Skip to main content

matrix_sdk/room/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! High-level room API
16
17use std::{
18    borrow::Borrow,
19    collections::{BTreeMap, HashMap},
20    future::Future,
21    ops::Deref,
22    sync::Arc,
23    time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30    StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{
39    IdentityStatusChange, RoomIdentityProvider, UserIdentity, types::events::CryptoContextInfo,
40};
41pub use matrix_sdk_base::store::StoredThreadSubscription;
42use matrix_sdk_base::{
43    ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
44    StateChanges, StateStoreDataKey, StateStoreDataValue,
45    deserialized_responses::{
46        RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
47    },
48    media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
49    serde_helpers::extract_relation,
50    store::{StateStoreExt, ThreadSubscriptionStatus},
51};
52#[cfg(feature = "e2e-encryption")]
53use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
54#[cfg(feature = "e2e-encryption")]
55use matrix_sdk_common::BoxFuture;
56use matrix_sdk_common::{
57    deserialized_responses::TimelineEvent,
58    executor::{JoinHandle, spawn},
59    timeout::timeout,
60};
61#[cfg(feature = "experimental-search")]
62use matrix_sdk_search::error::IndexError;
63#[cfg(feature = "experimental-search")]
64#[cfg(doc)]
65use matrix_sdk_search::index::RoomIndex;
66use mime::Mime;
67use reply::Reply;
68#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
69use ruma::events::AnySyncMessageLikeEvent;
70#[cfg(feature = "experimental-encrypted-state-events")]
71use ruma::events::AnySyncStateEvent;
72#[cfg(feature = "unstable-msc4274")]
73use ruma::events::room::message::GalleryItemType;
74#[cfg(feature = "e2e-encryption")]
75use ruma::events::{
76    AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
77};
78use ruma::{
79    EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
80    OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
81    api::client::{
82        config::{set_global_account_data, set_room_account_data},
83        context,
84        error::ErrorKind,
85        filter::LazyLoadOptions,
86        membership::{
87            Invite3pid, ban_user, forget_room, get_member_events,
88            invite_user::{
89                self,
90                v3::{InvitationRecipient, InviteUserId},
91            },
92            kick_user, leave_room, unban_user,
93        },
94        message::send_message_event,
95        read_marker::set_read_marker,
96        receipt::create_receipt,
97        redact::redact_event,
98        room::{get_room_event, report_content, report_room},
99        state::{get_state_event_for_key, send_state_event},
100        tag::{create_tag, delete_tag},
101        threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
102        typing::create_typing_event::{
103            self,
104            v3::{Typing, TypingInfo},
105        },
106    },
107    assign,
108    events::{
109        AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
110        Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
111        RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
112        RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
113        StaticStateEventContent, SyncStateEvent,
114        beacon::BeaconEventContent,
115        beacon_info::BeaconInfoEventContent,
116        direct::DirectEventContent,
117        marked_unread::MarkedUnreadEventContent,
118        receipt::{Receipt, ReceiptThread, ReceiptType},
119        relation::RelationType,
120        room::{
121            ImageInfo, MediaSource, ThumbnailInfo,
122            avatar::{self, RoomAvatarEventContent},
123            encryption::PossiblyRedactedRoomEncryptionEventContent,
124            history_visibility::HistoryVisibility,
125            member::{MembershipChange, RoomMemberEventContent, SyncRoomMemberEvent},
126            message::{
127                AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
128                ImageMessageEventContent, MessageType, RoomMessageEventContent,
129                TextMessageEventContent, UnstableAmplitude, UnstableAudioDetailsContentBlock,
130                UnstableVoiceContentBlock, VideoInfo, VideoMessageEventContent,
131            },
132            name::RoomNameEventContent,
133            pinned_events::RoomPinnedEventsEventContent,
134            power_levels::{
135                RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
136            },
137            server_acl::RoomServerAclEventContent,
138            topic::RoomTopicEventContent,
139        },
140        space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
141        tag::{TagInfo, TagName},
142        typing::SyncTypingEvent,
143    },
144    int,
145    push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
146    serde::Raw,
147    time::Instant,
148    uint,
149};
150#[cfg(feature = "experimental-encrypted-state-events")]
151use ruma::{
152    events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
153    serde::JsonCastable,
154};
155use serde::de::DeserializeOwned;
156use thiserror::Error;
157use tokio::{join, sync::broadcast};
158use tracing::{debug, error, info, instrument, trace, warn};
159
160use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
161pub use self::{
162    member::{RoomMember, RoomMemberRole},
163    messages::{
164        EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
165        Relations, RelationsOptions, ThreadRoots,
166    },
167};
168#[cfg(feature = "e2e-encryption")]
169use crate::encryption::backups::BackupState;
170#[cfg(doc)]
171use crate::event_cache::EventCache;
172#[cfg(feature = "experimental-encrypted-state-events")]
173use crate::room::futures::{SendRawStateEvent, SendStateEvent};
174use crate::{
175    BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
176    attachment::{AttachmentConfig, AttachmentInfo},
177    client::WeakClient,
178    config::RequestConfig,
179    error::{BeaconError, WrongRoomState},
180    event_cache::{self, EventCacheDropHandles, RoomEventCache},
181    event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
182    live_location_share::ObservableLiveLocation,
183    media::{MediaFormat, MediaRequestParameters},
184    notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
185    room::{
186        knock_requests::{KnockRequest, KnockRequestMemberInfo},
187        power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
188        privacy_settings::RoomPrivacySettings,
189    },
190    sync::RoomUpdate,
191    utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
192};
193
194pub mod edit;
195pub mod futures;
196pub mod identity_status_changes;
197/// Contains code related to requests to join a room.
198pub mod knock_requests;
199mod member;
200mod messages;
201pub mod power_levels;
202pub mod reply;
203
204pub mod calls;
205
206/// Contains all the functionality for modifying the privacy settings in a room.
207pub mod privacy_settings;
208
209#[cfg(feature = "e2e-encryption")]
210pub(crate) mod shared_room_history;
211
212/// A struct containing methods that are common for Joined, Invited and Left
213/// Rooms
214#[derive(Debug, Clone)]
215pub struct Room {
216    inner: BaseRoom,
217    pub(crate) client: Client,
218}
219
220impl Deref for Room {
221    type Target = BaseRoom;
222
223    fn deref(&self) -> &Self::Target {
224        &self.inner
225    }
226}
227
228const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
229const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
230
231/// A thread subscription, according to the semantics of MSC4306.
232#[derive(Debug, Clone, Copy, PartialEq, Eq)]
233pub struct ThreadSubscription {
234    /// Whether the subscription was made automatically by a client, not by
235    /// manual user choice.
236    pub automatic: bool,
237}
238
239/// Context allowing to compute the push actions for a given event.
240#[derive(Debug)]
241pub struct PushContext {
242    /// The Ruma context used to compute the push actions.
243    push_condition_room_ctx: PushConditionRoomCtx,
244
245    /// Push rules for this room, based on the push rules state event, or the
246    /// global server default as defined by [`Ruleset::server_default`].
247    push_rules: Ruleset,
248}
249
250impl PushContext {
251    /// Create a new [`PushContext`] from its inner components.
252    pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
253        Self { push_condition_room_ctx, push_rules }
254    }
255
256    /// Compute the push rules for a given event.
257    pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
258        self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
259    }
260
261    /// Compute the push rules for a given event, with extra logging to help
262    /// debugging.
263    #[doc(hidden)]
264    #[instrument(skip_all)]
265    pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
266        let rules = self
267            .push_rules
268            .iter()
269            .filter_map(|r| {
270                if !r.enabled() {
271                    return None;
272                }
273
274                let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
275
276                let conditions = match r {
277                    AnyPushRuleRef::Override(r) => {
278                        format!("{:?}", r.conditions)
279                    }
280                    AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
281                    AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
282                    AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
283                    AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
284                    _ => "<unknown push rule kind>".to_owned(),
285                };
286
287                Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
288            })
289            .collect::<Vec<_>>()
290            .join("\n");
291        trace!("rules:\n\n{rules}\n\n");
292
293        let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
294
295        if let Some(found) = found {
296            trace!("rule {} matched", found.rule_id());
297            found.actions().to_owned()
298        } else {
299            trace!("no match");
300            Vec::new()
301        }
302    }
303}
304
305macro_rules! make_media_type {
306    ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $info: ident, $thumbnail: ident) => {{
307        // If caption is set, use it as body, and filename as the file name; otherwise,
308        // body is the filename, and the filename is not set.
309        // https://github.com/matrix-org/matrix-spec-proposals/blob/main/proposals/2530-body-as-caption.md
310        let (body, formatted, filename) = match $caption {
311            Some(TextMessageEventContent { body, formatted, .. }) => (body, formatted, Some($filename)),
312            None => ($filename, None, None),
313        };
314
315        let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();
316
317        match $content_type.type_() {
318            mime::IMAGE => {
319                let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
320                    mimetype: Some($content_type.as_ref().to_owned()),
321                    thumbnail_source,
322                    thumbnail_info
323                });
324                let content = assign!(ImageMessageEventContent::new(body, $source), {
325                    info: Some(Box::new(info)),
326                    formatted,
327                    filename
328                });
329                <$t>::Image(content)
330            }
331
332            mime::AUDIO => {
333                let mut content = assign!(AudioMessageEventContent::new(body, $source), {
334                    formatted,
335                    filename
336                });
337
338                if let Some(AttachmentInfo::Audio(audio_info) | AttachmentInfo::Voice(audio_info)) = &$info &&
339                 let Some(duration) = audio_info.duration && let Some(waveform_vec) = &audio_info.waveform {
340                    let waveform = waveform_vec
341                        .iter()
342                        .map(|v| ((*v).clamp(0.0, 1.0) * UnstableAmplitude::MAX as f32) as u16)
343                        .map(Into::into)
344                        .collect();
345                    content.audio =
346                        Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
347                }
348
349                if matches!($info, Some(AttachmentInfo::Voice(_))) {
350                    content.voice = Some(UnstableVoiceContentBlock::new());
351                }
352
353                let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
354                audio_info.mimetype = Some($content_type.as_ref().to_owned());
355                let content = content.info(Box::new(audio_info));
356
357                <$t>::Audio(content)
358            }
359
360            mime::VIDEO => {
361                let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
362                    mimetype: Some($content_type.as_ref().to_owned()),
363                    thumbnail_source,
364                    thumbnail_info
365                });
366                let content = assign!(VideoMessageEventContent::new(body, $source), {
367                    info: Some(Box::new(info)),
368                    formatted,
369                    filename
370                });
371                <$t>::Video(content)
372            }
373
374            _ => {
375                let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
376                    mimetype: Some($content_type.as_ref().to_owned()),
377                    thumbnail_source,
378                    thumbnail_info
379                });
380                let content = assign!(FileMessageEventContent::new(body, $source), {
381                    info: Some(Box::new(info)),
382                    formatted,
383                    filename,
384                });
385                <$t>::File(content)
386            }
387        }
388    }};
389}
390
391impl Room {
392    /// Create a new `Room`
393    ///
394    /// # Arguments
395    /// * `client` - The client used to make requests.
396    ///
397    /// * `room` - The underlying room.
398    pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
399        Self { inner: room, client }
400    }
401
402    /// Leave this room.
403    /// If the room was in [`RoomState::Invited`] state, it'll also be forgotten
404    /// automatically.
405    ///
406    /// Only invited and joined rooms can be left.
407    #[doc(alias = "reject_invitation")]
408    #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
409    async fn leave_impl(&self) -> (Result<()>, &Room) {
410        let state = self.state();
411        if state == RoomState::Left {
412            return (
413                Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
414                    "Joined or Invited",
415                    state,
416                )))),
417                self,
418            );
419        }
420
421        // If the room was in Invited state we should also forget it when declining the
422        // invite.
423        let should_forget = matches!(self.state(), RoomState::Invited);
424
425        let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
426        let response = self.client.send(request).await;
427
428        // The server can return with an error that is acceptable to ignore. Let's find
429        // which one.
430        if let Err(error) = response {
431            #[allow(clippy::collapsible_match)]
432            let ignore_error = if let Some(error) = error.client_api_error_kind() {
433                match error {
434                    // The user is trying to leave a room but doesn't have permissions to do so.
435                    // Let's consider the user has left the room.
436                    ErrorKind::Forbidden => true,
437                    _ => false,
438                }
439            } else {
440                false
441            };
442
443            error!(?error, ignore_error, should_forget, "Failed to leave the room");
444
445            if !ignore_error {
446                return (Err(error.into()), self);
447            }
448        }
449
450        if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
451            return (Err(e.into()), self);
452        }
453
454        if should_forget {
455            trace!("Trying to forget the room");
456
457            if let Err(error) = self.forget().await {
458                error!(?error, "Failed to forget the room");
459            }
460        }
461
462        (Ok(()), self)
463    }
464
465    /// Leave this room and all predecessors.
466    /// If any room was in [`RoomState::Invited`] state, it'll also be forgotten
467    /// automatically.
468    ///
469    /// Only invited and joined rooms can be left.
470    /// Will return an error if the current room fails to leave but
471    /// will only warn if a predecessor fails to leave.
472    pub async fn leave(&self) -> Result<()> {
473        let mut rooms: Vec<Room> = vec![self.clone()];
474        let mut current_room = self;
475
476        while let Some(predecessor) = current_room.predecessor_room() {
477            let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
478
479            if let Some(predecessor_room) = maybe_predecessor_room {
480                rooms.push(predecessor_room.clone());
481                current_room = rooms.last().expect("Room just pushed so can't be empty");
482            } else {
483                warn!("Cannot find predecessor room");
484                break;
485            }
486        }
487
488        let batch_size = 5;
489
490        let rooms_futures: Vec<_> = rooms
491            .iter()
492            .filter_map(|room| match room.state() {
493                RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
494                    Some(room.leave_impl())
495                }
496                RoomState::Banned | RoomState::Left => None,
497            })
498            .collect();
499
500        let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
501
502        let mut maybe_this_room_failed_with: Option<Error> = None;
503
504        while let Some(result) = futures_stream.next().await {
505            if let (Err(e), room) = result {
506                if room.room_id() == self.room_id() {
507                    maybe_this_room_failed_with = Some(e);
508                } else {
509                    warn!("Failure while attempting to leave predecessor room: {e:?}");
510                }
511            }
512        }
513
514        maybe_this_room_failed_with.map_or(Ok(()), Err)
515    }
516
517    /// Join this room.
518    ///
519    /// Only invited and left rooms can be joined via this method.
520    #[doc(alias = "accept_invitation")]
521    pub async fn join(&self) -> Result<()> {
522        let prev_room_state = self.inner.state();
523
524        if prev_room_state == RoomState::Joined {
525            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
526                "Invited or Left",
527                prev_room_state,
528            ))));
529        }
530
531        self.client.join_room_by_id(self.room_id()).await?;
532
533        Ok(())
534    }
535
536    /// Get the inner client saved in this room instance.
537    ///
538    /// Returns the client this room is part of.
539    pub fn client(&self) -> Client {
540        self.client.clone()
541    }
542
543    /// Get the sync state of this room, i.e. whether it was fully synced with
544    /// the server.
545    pub fn is_synced(&self) -> bool {
546        self.inner.is_state_fully_synced()
547    }
548
549    /// Gets the avatar of this room, if set.
550    ///
551    /// Returns the avatar.
552    /// If a thumbnail is requested no guarantee on the size of the image is
553    /// given.
554    ///
555    /// # Arguments
556    ///
557    /// * `format` - The desired format of the avatar.
558    ///
559    /// # Examples
560    ///
561    /// ```no_run
562    /// # use matrix_sdk::Client;
563    /// # use matrix_sdk::ruma::room_id;
564    /// # use matrix_sdk::media::MediaFormat;
565    /// # use url::Url;
566    /// # let homeserver = Url::parse("http://example.com").unwrap();
567    /// # async {
568    /// # let user = "example";
569    /// let client = Client::new(homeserver).await.unwrap();
570    /// client.matrix_auth().login_username(user, "password").send().await.unwrap();
571    /// let room_id = room_id!("!roomid:example.com");
572    /// let room = client.get_room(&room_id).unwrap();
573    /// if let Some(avatar) = room.avatar(MediaFormat::File).await.unwrap() {
574    ///     std::fs::write("avatar.png", avatar);
575    /// }
576    /// # };
577    /// ```
578    pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
579        let Some(url) = self.avatar_url() else { return Ok(None) };
580        let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
581        Ok(Some(self.client.media().get_media_content(&request, true).await?))
582    }
583
584    /// Sends a request to `/_matrix/client/r0/rooms/{room_id}/messages` and
585    /// returns a `Messages` struct that contains a chunk of room and state
586    /// events (`RoomEvent` and `AnyStateEvent`).
587    ///
588    /// With the encryption feature, messages are decrypted if possible. If
589    /// decryption fails for an individual message, that message is returned
590    /// undecrypted.
591    ///
592    /// # Examples
593    ///
594    /// ```no_run
595    /// use matrix_sdk::{Client, room::MessagesOptions};
596    /// # use matrix_sdk::ruma::{
597    /// #     api::client::filter::RoomEventFilter,
598    /// #     room_id,
599    /// # };
600    /// # use url::Url;
601    ///
602    /// # let homeserver = Url::parse("http://example.com").unwrap();
603    /// # async {
604    /// let options =
605    ///     MessagesOptions::backward().from("t47429-4392820_219380_26003_2265");
606    ///
607    /// let mut client = Client::new(homeserver).await.unwrap();
608    /// let room = client.get_room(room_id!("!roomid:example.com")).unwrap();
609    /// assert!(room.messages(options).await.is_ok());
610    /// # };
611    /// ```
612    #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
613    pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
614        let room_id = self.inner.room_id();
615        let request = options.into_request(room_id);
616        let http_response = self.client.send(request).await?;
617
618        let push_ctx = self.push_context().await?;
619        let chunk = join_all(
620            http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
621        )
622        .await;
623
624        // Save the loaded events into the event cache, if it's set up.
625        if let Ok((cache, _handles)) = self.event_cache().await {
626            cache.save_events(chunk.clone()).await;
627        }
628
629        Ok(Messages {
630            start: http_response.start,
631            end: http_response.end,
632            chunk,
633            state: http_response.state,
634        })
635    }
636
637    /// Register a handler for events of a specific type, within this room.
638    ///
639    /// This method works the same way as [`Client::add_event_handler`], except
640    /// that the handler will only be called for events within this room. See
641    /// that method for more details on event handler functions.
642    ///
643    /// `room.add_event_handler(hdl)` is equivalent to
644    /// `client.add_room_event_handler(room_id, hdl)`. Use whichever one is more
645    /// convenient in your use case.
646    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
647    where
648        Ev: SyncEvent + DeserializeOwned + Send + 'static,
649        H: EventHandler<Ev, Ctx>,
650    {
651        self.client.add_room_event_handler(self.room_id(), handler)
652    }
653
654    /// Subscribe to all updates for this room.
655    ///
656    /// The returned receiver will receive a new message for each sync response
657    /// that contains updates for this room.
658    pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
659        self.client.subscribe_to_room_updates(self.room_id())
660    }
661
662    /// Subscribe to typing notifications for this room.
663    ///
664    /// The returned receiver will receive a new vector of user IDs for each
665    /// sync response that contains 'm.typing' event. The current user ID will
666    /// be filtered out.
667    pub fn subscribe_to_typing_notifications(
668        &self,
669    ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
670        let (sender, receiver) = broadcast::channel(16);
671        let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
672            let own_user_id = self.own_user_id().to_owned();
673            move |event: SyncTypingEvent| async move {
674                // Ignore typing notifications from own user.
675                let typing_user_ids = event
676                    .content
677                    .user_ids
678                    .into_iter()
679                    .filter(|user_id| *user_id != own_user_id)
680                    .collect();
681                // Ignore the result. It can only fail if there are no listeners.
682                let _ = sender.send(typing_user_ids);
683            }
684        });
685        let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
686        (drop_guard, receiver)
687    }
688
689    /// Subscribe to updates about users who are in "pin violation" i.e. their
690    /// identity has changed and the user has not yet acknowledged this.
691    ///
692    /// The returned receiver will receive a new vector of
693    /// [`IdentityStatusChange`] each time a /keys/query response shows a
694    /// changed identity for a member of this room, or a sync shows a change
695    /// to the membership of an affected user. (Changes to the current user are
696    /// not directly included, but some changes to the current user's identity
697    /// can trigger changes to how we see other users' identities, which
698    /// will be included.)
699    ///
700    /// The first item in the stream provides the current state of the room:
701    /// each member of the room who is not in "pinned" or "verified" state will
702    /// be included (except the current user).
703    ///
704    /// If the `changed_to` property of an [`IdentityStatusChange`] is set to
705    /// `PinViolation` then a warning should be displayed to the user. If it is
706    /// set to `Pinned` then no warning should be displayed.
707    ///
708    /// Note that if a user who is in pin violation leaves the room, a `Pinned`
709    /// update is sent, to indicate that the warning should be removed, even
710    /// though the user's identity is not necessarily pinned.
711    #[cfg(feature = "e2e-encryption")]
712    pub async fn subscribe_to_identity_status_changes(
713        &self,
714    ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
715        IdentityStatusChanges::create_stream(self.clone()).await
716    }
717
718    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
719    /// decrypted if needs be.
720    ///
721    /// Only logs from the crypto crate will indicate a failure to decrypt.
722    #[cfg(not(feature = "experimental-encrypted-state-events"))]
723    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
724    async fn try_decrypt_event(
725        &self,
726        event: Raw<AnyTimelineEvent>,
727        push_ctx: Option<&PushContext>,
728    ) -> TimelineEvent {
729        #[cfg(feature = "e2e-encryption")]
730        if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
731            SyncMessageLikeEvent::Original(_),
732        ))) = event.deserialize_as::<AnySyncTimelineEvent>()
733            && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
734        {
735            return event;
736        }
737
738        let mut event = TimelineEvent::from_plaintext(event.cast());
739        if let Some(push_ctx) = push_ctx {
740            event.set_push_actions(push_ctx.for_event(event.raw()).await);
741        }
742
743        event
744    }
745
746    /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
747    /// decrypted if needs be.
748    ///
749    /// Only logs from the crypto crate will indicate a failure to decrypt.
750    #[cfg(feature = "experimental-encrypted-state-events")]
751    #[allow(clippy::unused_async)] // Used only in e2e-encryption.
752    async fn try_decrypt_event(
753        &self,
754        event: Raw<AnyTimelineEvent>,
755        push_ctx: Option<&PushContext>,
756    ) -> TimelineEvent {
757        // If we have either an encrypted message-like or state event, try to decrypt.
758        match event.deserialize_as::<AnySyncTimelineEvent>() {
759            Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
760                SyncMessageLikeEvent::Original(_),
761            ))) => {
762                if let Ok(event) = self
763                    .decrypt_event(
764                        event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
765                        push_ctx,
766                    )
767                    .await
768                {
769                    return event;
770                }
771            }
772            Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
773                SyncStateEvent::Original(_),
774            ))) => {
775                if let Ok(event) = self
776                    .decrypt_event(
777                        event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
778                        push_ctx,
779                    )
780                    .await
781                {
782                    return event;
783                }
784            }
785            _ => {}
786        }
787
788        let mut event = TimelineEvent::from_plaintext(event.cast());
789        if let Some(push_ctx) = push_ctx {
790            event.set_push_actions(push_ctx.for_event(event.raw()).await);
791        }
792
793        event
794    }
795
796    /// Fetch the event with the given `EventId` in this room.
797    ///
798    /// It uses the given [`RequestConfig`] if provided, or the client's default
799    /// one otherwise.
800    pub async fn event(
801        &self,
802        event_id: &EventId,
803        request_config: Option<RequestConfig>,
804    ) -> Result<TimelineEvent> {
805        let request =
806            get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
807
808        let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
809        let push_ctx = self.push_context().await?;
810        let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
811
812        // Save the event into the event cache, if it's set up.
813        if let Ok((cache, _handles)) = self.event_cache().await {
814            cache.save_events([event.clone()]).await;
815        }
816
817        Ok(event)
818    }
819
820    /// Try to load the event from the [`EventCache`][crate::event_cache], if
821    /// it's enabled, or fetch it from the homeserver.
822    ///
823    /// When running the request against the homeserver, it uses the given
824    /// [`RequestConfig`] if provided, or the client's default one
825    /// otherwise.
826    pub async fn load_or_fetch_event(
827        &self,
828        event_id: &EventId,
829        request_config: Option<RequestConfig>,
830    ) -> Result<TimelineEvent> {
831        match self.event_cache().await {
832            Ok((event_cache, _drop_handles)) => {
833                if let Some(event) = event_cache.find_event(event_id).await? {
834                    return Ok(event);
835                }
836                // Fallthrough: try with a request.
837            }
838            Err(err) => {
839                debug!("error when getting the event cache: {err}");
840            }
841        }
842        self.event(event_id, request_config).await
843    }
844
845    /// Try to load the event and its relations from the
846    /// [`EventCache`][crate::event_cache], if it's enabled, or fetch it
847    /// from the homeserver.
848    ///
849    /// You can control which types of related events are retrieved using
850    /// `filter`. A `None` value will retrieve any type of related event.
851    ///
852    /// If the event is found in the event cache, but we can't find any
853    /// relations for it there, then we will still attempt to fetch the
854    /// relations from the homeserver.
855    ///
856    /// When running any request against the homeserver, it uses the given
857    /// [`RequestConfig`] if provided, or the client's default one
858    /// otherwise.
859    ///
860    /// Returns a tuple formed of the event and a vector of its relations (that
861    /// can be empty).
862    pub async fn load_or_fetch_event_with_relations(
863        &self,
864        event_id: &EventId,
865        filter: Option<Vec<RelationType>>,
866        request_config: Option<RequestConfig>,
867    ) -> Result<(TimelineEvent, Vec<TimelineEvent>)> {
868        let fetch_relations = async || {
869            // If there's only a single filter, we can use a more efficient request,
870            // specialized on the filter type.
871            //
872            // Otherwise, we need to get all the relations:
873            // - either because no filters implies we fetch all relations,
874            // - or because there are multiple filters and we must filter out manually.
875            let include_relations = if let Some(filter) = &filter
876                && filter.len() == 1
877            {
878                IncludeRelations::RelationsOfType(filter[0].clone())
879            } else {
880                IncludeRelations::AllRelations
881            };
882
883            let mut opts = RelationsOptions {
884                include_relations,
885                recurse: true,
886                limit: Some(uint!(256)),
887                ..Default::default()
888            };
889
890            let mut events = Vec::new();
891            loop {
892                match self.relations(event_id.to_owned(), opts.clone()).await {
893                    Ok(relations) => {
894                        if let Some(filter) = filter.as_ref() {
895                            // Manually filter out the relation types we're interested in.
896                            events.extend(relations.chunk.into_iter().filter_map(|ev| {
897                                let (rel_type, _) = extract_relation(ev.raw())?;
898                                filter
899                                    .iter()
900                                    .any(|ruma_filter| ruma_filter == &rel_type)
901                                    .then_some(ev)
902                            }));
903                        } else {
904                            // No filter: include all events from the response.
905                            events.extend(relations.chunk);
906                        }
907
908                        if let Some(next_from) = relations.next_batch_token {
909                            opts.from = Some(next_from);
910                        } else {
911                            break events;
912                        }
913                    }
914
915                    Err(err) => {
916                        warn!(%event_id, "error when loading relations of pinned event from server: {err}");
917                        break events;
918                    }
919                }
920            }
921        };
922
923        // First, try to load the event *and* its relations from the event cache, all at
924        // once.
925        let event_cache = match self.event_cache().await {
926            Ok((event_cache, drop_handles)) => {
927                if let Some((event, mut relations)) =
928                    event_cache.find_event_with_relations(event_id, filter.clone()).await?
929                {
930                    if relations.is_empty() {
931                        // The event cache doesn't have any relations for this event, try to fetch
932                        // them from the server instead.
933                        relations = fetch_relations().await;
934                    }
935
936                    return Ok((event, relations));
937                }
938
939                // Otherwise, get the event from the server.
940                Some((event_cache, drop_handles))
941            }
942
943            Err(err) => {
944                debug!("error when getting the event cache: {err}");
945                // Fallthrough: try with a request.
946                None
947            }
948        };
949
950        // Fetch the event from the server. A failure here is fatal, as we must return
951        // the target event.
952        let event = self.event(event_id, request_config).await?;
953
954        // Try to get the relations from the event cache (if we have one).
955        if let Some((event_cache, _drop_handles)) = event_cache
956            && let Some(relations) =
957                event_cache.find_event_relations(event_id, filter.clone()).await.ok()
958            && !relations.is_empty()
959        {
960            return Ok((event, relations));
961        }
962
963        // We couldn't find the relations in the event cache; fetch them from the
964        // server.
965        Ok((event, fetch_relations().await))
966    }
967
968    /// Fetch the event with the given `EventId` in this room, using the
969    /// `/context` endpoint to get more information.
970    pub async fn event_with_context(
971        &self,
972        event_id: &EventId,
973        lazy_load_members: bool,
974        context_size: UInt,
975        request_config: Option<RequestConfig>,
976    ) -> Result<EventWithContextResponse> {
977        let mut request =
978            context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
979
980        request.limit = context_size;
981
982        if lazy_load_members {
983            request.filter.lazy_load_options =
984                LazyLoadOptions::Enabled { include_redundant_members: false };
985        }
986
987        let response = self.client.send(request).with_request_config(request_config).await?;
988
989        let push_ctx = self.push_context().await?;
990        let push_ctx = push_ctx.as_ref();
991        let target_event = if let Some(event) = response.event {
992            Some(self.try_decrypt_event(event, push_ctx).await)
993        } else {
994            None
995        };
996
997        // Note: the joined future will fail if any future failed, but
998        // [`Self::try_decrypt_event`] doesn't hard-fail when there's a
999        // decryption error, so we should prevent against most bad cases here.
1000        let (events_before, events_after) = join!(
1001            join_all(
1002                response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
1003            ),
1004            join_all(
1005                response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
1006            ),
1007        );
1008
1009        // Save the loaded events into the event cache, if it's set up.
1010        if let Ok((cache, _handles)) = self.event_cache().await {
1011            let mut events_to_save: Vec<TimelineEvent> = Vec::new();
1012            if let Some(event) = &target_event {
1013                events_to_save.push(event.clone());
1014            }
1015
1016            for event in &events_before {
1017                events_to_save.push(event.clone());
1018            }
1019
1020            for event in &events_after {
1021                events_to_save.push(event.clone());
1022            }
1023
1024            cache.save_events(events_to_save).await;
1025        }
1026
1027        Ok(EventWithContextResponse {
1028            event: target_event,
1029            events_before,
1030            events_after,
1031            state: response.state,
1032            prev_batch_token: response.start,
1033            next_batch_token: response.end,
1034        })
1035    }
1036
1037    pub(crate) async fn request_members(&self) -> Result<()> {
1038        self.client
1039            .locks()
1040            .members_request_deduplicated_handler
1041            .run(self.room_id().to_owned(), async move {
1042                let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
1043                let response = self
1044                    .client
1045                    .send(request.clone())
1046                    .with_request_config(
1047                        // In some cases it can take longer than 30s to load:
1048                        // https://github.com/element-hq/synapse/issues/16872
1049                        RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
1050                    )
1051                    .await?;
1052
1053                // That's a large `Future`. Let's `Box::pin` to reduce its size on the stack.
1054                Box::pin(self.client.base_client().receive_all_members(
1055                    self.room_id(),
1056                    &request,
1057                    &response,
1058                ))
1059                .await?;
1060
1061                Ok(())
1062            })
1063            .await
1064    }
1065
1066    /// Request to update the encryption state for this room.
1067    ///
1068    /// It does nothing if the encryption state is already
1069    /// [`EncryptionState::Encrypted`] or [`EncryptionState::NotEncrypted`].
1070    pub async fn request_encryption_state(&self) -> Result<()> {
1071        if !self.inner.encryption_state().is_unknown() {
1072            return Ok(());
1073        }
1074
1075        self.client
1076            .locks()
1077            .encryption_state_deduplicated_handler
1078            .run(self.room_id().to_owned(), async move {
1079                // Request the event from the server.
1080                let request = get_state_event_for_key::v3::Request::new(
1081                    self.room_id().to_owned(),
1082                    StateEventType::RoomEncryption,
1083                    "".to_owned(),
1084                );
1085                let response = match self.client.send(request).await {
1086                    Ok(response) => Some(
1087                        response
1088                            .into_content()
1089                            .deserialize_as_unchecked::<PossiblyRedactedRoomEncryptionEventContent>(
1090                            )?,
1091                    ),
1092                    Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
1093                    Err(err) => return Err(err.into()),
1094                };
1095
1096                let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
1097
1098                // Persist the event and the fact that we requested it from the server in
1099                // `RoomInfo`.
1100                let mut room_info = self.clone_info();
1101                room_info.mark_encryption_state_synced();
1102                room_info.set_encryption_event(response);
1103                let mut changes = StateChanges::default();
1104                changes.add_room(room_info.clone());
1105
1106                self.client.state_store().save_changes(&changes).await?;
1107                self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
1108
1109                Ok(())
1110            })
1111            .await
1112    }
1113
1114    /// Check the encryption state of this room.
1115    ///
1116    /// If the result is [`EncryptionState::Unknown`], one might want to call
1117    /// [`Room::request_encryption_state`].
1118    pub fn encryption_state(&self) -> EncryptionState {
1119        self.inner.encryption_state()
1120    }
1121
1122    /// Force to update the encryption state by calling
1123    /// [`Room::request_encryption_state`], and then calling
1124    /// [`Room::encryption_state`].
1125    ///
1126    /// This method is useful to ensure the encryption state is up-to-date.
1127    pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
1128        self.request_encryption_state().await?;
1129
1130        Ok(self.encryption_state())
1131    }
1132
1133    /// Gets additional context info about the client crypto.
1134    #[cfg(feature = "e2e-encryption")]
1135    pub async fn crypto_context_info(&self) -> CryptoContextInfo {
1136        let encryption = self.client.encryption();
1137
1138        let this_device_is_verified = match encryption.get_own_device().await {
1139            Ok(Some(device)) => device.is_verified_with_cross_signing(),
1140
1141            // Should not happen, there will always be an own device
1142            _ => true,
1143        };
1144
1145        let backup_exists_on_server =
1146            encryption.backups().exists_on_server().await.unwrap_or(false);
1147
1148        CryptoContextInfo {
1149            device_creation_ts: encryption.device_creation_timestamp().await,
1150            this_device_is_verified,
1151            is_backup_configured: encryption.backups().state() == BackupState::Enabled,
1152            backup_exists_on_server,
1153        }
1154    }
1155
1156    fn are_events_visible(&self) -> bool {
1157        if let RoomState::Invited = self.inner.state() {
1158            return matches!(
1159                self.inner.history_visibility_or_default(),
1160                HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1161            );
1162        }
1163
1164        true
1165    }
1166
1167    /// Sync the member list with the server.
1168    ///
1169    /// This method will de-duplicate requests if it is called multiple times in
1170    /// quick succession, in that case the return value will be `None`. This
1171    /// method does nothing if the members are already synced.
1172    pub async fn sync_members(&self) -> Result<()> {
1173        if !self.are_events_visible() {
1174            return Ok(());
1175        }
1176
1177        if !self.are_members_synced() { self.request_members().await } else { Ok(()) }
1178    }
1179
1180    /// Get a specific member of this room.
1181    ///
1182    /// *Note*: This method will fetch the members from the homeserver if the
1183    /// member list isn't synchronized due to member lazy loading. Because of
1184    /// that it might panic if it isn't run on a tokio thread.
1185    ///
1186    /// Use [get_member_no_sync()](#method.get_member_no_sync) if you want a
1187    /// method that doesn't do any requests.
1188    ///
1189    /// # Arguments
1190    ///
1191    /// * `user_id` - The ID of the user that should be fetched out of the
1192    ///   store.
1193    pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1194        self.sync_members().await?;
1195        self.get_member_no_sync(user_id).await
1196    }
1197
1198    /// Get a specific member of this room.
1199    ///
1200    /// *Note*: This method will not fetch the members from the homeserver if
1201    /// the member list isn't synchronized due to member lazy loading. Thus,
1202    /// members could be missing.
1203    ///
1204    /// Use [get_member()](#method.get_member) if you want to ensure to always
1205    /// have the full member list to chose from.
1206    ///
1207    /// # Arguments
1208    ///
1209    /// * `user_id` - The ID of the user that should be fetched out of the
1210    ///   store.
1211    pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1212        Ok(self
1213            .inner
1214            .get_member(user_id)
1215            .await?
1216            .map(|member| RoomMember::new(self.client.clone(), member)))
1217    }
1218
1219    /// Get members for this room, with the given memberships.
1220    ///
1221    /// *Note*: This method will fetch the members from the homeserver if the
1222    /// member list isn't synchronized due to member lazy loading. Because of
1223    /// that it might panic if it isn't run on a tokio thread.
1224    ///
1225    /// Use [members_no_sync()](#method.members_no_sync) if you want a
1226    /// method that doesn't do any requests.
1227    pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1228        self.sync_members().await?;
1229        self.members_no_sync(memberships).await
1230    }
1231
1232    /// Get members for this room, with the given memberships.
1233    ///
1234    /// *Note*: This method will not fetch the members from the homeserver if
1235    /// the member list isn't synchronized due to member lazy loading. Thus,
1236    /// members could be missing.
1237    ///
1238    /// Use [members()](#method.members) if you want to ensure to always get
1239    /// the full member list.
1240    pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1241        Ok(self
1242            .inner
1243            .members(memberships)
1244            .await?
1245            .into_iter()
1246            .map(|member| RoomMember::new(self.client.clone(), member))
1247            .collect())
1248    }
1249
1250    /// Sets the display name of the current user within this room.
1251    ///
1252    /// *Note*: This is different to [`crate::Account::set_display_name`] which
1253    /// updates the user's display name across all of their rooms.
1254    pub async fn set_own_member_display_name(
1255        &self,
1256        display_name: Option<String>,
1257    ) -> Result<send_state_event::v3::Response> {
1258        let user_id = self.own_user_id();
1259        let member_event =
1260            self.get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id).await?;
1261
1262        let Some(RawSyncOrStrippedState::Sync(raw_event)) = member_event else {
1263            return Err(Error::InsufficientData);
1264        };
1265
1266        let event = raw_event.deserialize()?;
1267
1268        let mut content = match event {
1269            SyncStateEvent::Original(original_event) => original_event.content,
1270            SyncStateEvent::Redacted(redacted_event) => {
1271                RoomMemberEventContent::new(redacted_event.content.membership)
1272            }
1273        };
1274
1275        content.displayname = display_name;
1276        self.send_state_event_for_key(user_id, content).await
1277    }
1278
1279    /// Get all state events of a given type in this room.
1280    pub async fn get_state_events(
1281        &self,
1282        event_type: StateEventType,
1283    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1284        self.client
1285            .state_store()
1286            .get_state_events(self.room_id(), event_type)
1287            .await
1288            .map_err(Into::into)
1289    }
1290
1291    /// Get all state events of a given statically-known type in this room.
1292    ///
1293    /// # Examples
1294    ///
1295    /// ```no_run
1296    /// # async {
1297    /// # let room: matrix_sdk::Room = todo!();
1298    /// use matrix_sdk::ruma::{
1299    ///     events::room::member::RoomMemberEventContent, serde::Raw,
1300    /// };
1301    ///
1302    /// let room_members =
1303    ///     room.get_state_events_static::<RoomMemberEventContent>().await?;
1304    /// # anyhow::Ok(())
1305    /// # };
1306    /// ```
1307    pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1308    where
1309        C: StaticEventContent<IsPrefix = ruma::events::False>
1310            + StaticStateEventContent
1311            + RedactContent,
1312        C::Redacted: RedactedStateEventContent,
1313    {
1314        Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1315    }
1316
1317    /// Get the state events of a given type with the given state keys in this
1318    /// room.
1319    pub async fn get_state_events_for_keys(
1320        &self,
1321        event_type: StateEventType,
1322        state_keys: &[&str],
1323    ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1324        self.client
1325            .state_store()
1326            .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1327            .await
1328            .map_err(Into::into)
1329    }
1330
1331    /// Get the state events of a given statically-known type with the given
1332    /// state keys in this room.
1333    ///
1334    /// # Examples
1335    ///
1336    /// ```no_run
1337    /// # async {
1338    /// # let room: matrix_sdk::Room = todo!();
1339    /// # let user_ids: &[matrix_sdk::ruma::OwnedUserId] = &[];
1340    /// use matrix_sdk::ruma::events::room::member::RoomMemberEventContent;
1341    ///
1342    /// let room_members = room
1343    ///     .get_state_events_for_keys_static::<RoomMemberEventContent, _, _>(
1344    ///         user_ids,
1345    ///     )
1346    ///     .await?;
1347    /// # anyhow::Ok(())
1348    /// # };
1349    /// ```
1350    pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1351        &self,
1352        state_keys: I,
1353    ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1354    where
1355        C: StaticEventContent<IsPrefix = ruma::events::False>
1356            + StaticStateEventContent
1357            + RedactContent,
1358        C::StateKey: Borrow<K>,
1359        C::Redacted: RedactedStateEventContent,
1360        K: AsRef<str> + Sized + Sync + 'a,
1361        I: IntoIterator<Item = &'a K> + Send,
1362        I::IntoIter: Send,
1363    {
1364        Ok(self
1365            .client
1366            .state_store()
1367            .get_state_events_for_keys_static(self.room_id(), state_keys)
1368            .await?)
1369    }
1370
1371    /// Get a specific state event in this room.
1372    pub async fn get_state_event(
1373        &self,
1374        event_type: StateEventType,
1375        state_key: &str,
1376    ) -> Result<Option<RawAnySyncOrStrippedState>> {
1377        self.client
1378            .state_store()
1379            .get_state_event(self.room_id(), event_type, state_key)
1380            .await
1381            .map_err(Into::into)
1382    }
1383
1384    /// Get a specific state event of statically-known type with an empty state
1385    /// key in this room.
1386    ///
1387    /// # Examples
1388    ///
1389    /// ```no_run
1390    /// # async {
1391    /// # let room: matrix_sdk::Room = todo!();
1392    /// use matrix_sdk::ruma::events::room::power_levels::RoomPowerLevelsEventContent;
1393    ///
1394    /// let power_levels = room
1395    ///     .get_state_event_static::<RoomPowerLevelsEventContent>()
1396    ///     .await?
1397    ///     .expect("every room has a power_levels event")
1398    ///     .deserialize()?;
1399    /// # anyhow::Ok(())
1400    /// # };
1401    /// ```
1402    pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1403    where
1404        C: StaticEventContent<IsPrefix = ruma::events::False>
1405            + StaticStateEventContent<StateKey = EmptyStateKey>
1406            + RedactContent,
1407        C::Redacted: RedactedStateEventContent,
1408    {
1409        self.get_state_event_static_for_key(&EmptyStateKey).await
1410    }
1411
1412    /// Get a specific state event of statically-known type in this room.
1413    ///
1414    /// # Examples
1415    ///
1416    /// ```no_run
1417    /// # async {
1418    /// # let room: matrix_sdk::Room = todo!();
1419    /// use matrix_sdk::ruma::{
1420    ///     events::room::member::RoomMemberEventContent, serde::Raw, user_id,
1421    /// };
1422    ///
1423    /// let member_event = room
1424    ///     .get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id!(
1425    ///         "@alice:example.org"
1426    ///     ))
1427    ///     .await?;
1428    /// # anyhow::Ok(())
1429    /// # };
1430    /// ```
1431    pub async fn get_state_event_static_for_key<C, K>(
1432        &self,
1433        state_key: &K,
1434    ) -> Result<Option<RawSyncOrStrippedState<C>>>
1435    where
1436        C: StaticEventContent<IsPrefix = ruma::events::False>
1437            + StaticStateEventContent
1438            + RedactContent,
1439        C::StateKey: Borrow<K>,
1440        C::Redacted: RedactedStateEventContent,
1441        K: AsRef<str> + ?Sized + Sync,
1442    {
1443        Ok(self
1444            .client
1445            .state_store()
1446            .get_state_event_static_for_key(self.room_id(), state_key)
1447            .await?)
1448    }
1449
1450    /// Returns the parents this room advertises as its parents.
1451    ///
1452    /// Results are in no particular order.
1453    pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1454        // Implements this algorithm:
1455        // https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships
1456
1457        // Get all m.space.parent events for this room
1458        Ok(self
1459            .get_state_events_static::<SpaceParentEventContent>()
1460            .await?
1461            .into_iter()
1462            // Extract state key (ie. the parent's id) and sender
1463            .filter_map(|parent_event| match parent_event.deserialize() {
1464                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1465                    Some((e.state_key.to_owned(), e.sender))
1466                }
1467                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1468                Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1469                Err(e) => {
1470                    info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
1471                    None
1472                }
1473            })
1474            // Check whether the parent recognizes this room as its child
1475            .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1476                let Some(parent_room) = self.client.get_room(&state_key) else {
1477                    // We are not in the room, cannot check if the relationship is reciprocal
1478                    // TODO: try peeking into the room
1479                    return Ok(ParentSpace::Unverifiable(state_key));
1480                };
1481                // Get the m.space.child state of the parent with this room's id
1482                // as state key.
1483                if let Some(child_event) = parent_room
1484                    .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1485                    .await?
1486                {
1487                    match child_event.deserialize() {
1488                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1489                            // There is a valid m.space.child in the parent pointing to
1490                            // this room
1491                            return Ok(ParentSpace::Reciprocal(parent_room));
1492                        }
1493                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1494                        Ok(SyncOrStrippedState::Stripped(_)) => {}
1495                        Err(e) => {
1496                            info!(
1497                                room_id = ?self.room_id(), parent_room_id = ?state_key,
1498                                "Could not deserialize m.space.child: {e}"
1499                            );
1500                        }
1501                    }
1502                    // Otherwise the event is either invalid or redacted. If
1503                    // redacted it would be missing the
1504                    // `via` key, thereby invalidating that end of the
1505                    // relationship: https://spec.matrix.org/v1.8/client-server-api/#mspacechild
1506                }
1507
1508                // No reciprocal m.space.child found, let's check if the sender has the
1509                // power to set it
1510                let Some(member) = parent_room.get_member(&sender).await? else {
1511                    // Sender is not even in the parent room
1512                    return Ok(ParentSpace::Illegitimate(parent_room));
1513                };
1514
1515                if member.can_send_state(StateEventType::SpaceChild) {
1516                    // Sender does have the power to set m.room.child
1517                    Ok(ParentSpace::WithPowerlevel(parent_room))
1518                } else {
1519                    Ok(ParentSpace::Illegitimate(parent_room))
1520                }
1521            })
1522            .collect::<FuturesUnordered<_>>())
1523    }
1524
1525    /// Read account data in this room, from storage.
1526    pub async fn account_data(
1527        &self,
1528        data_type: RoomAccountDataEventType,
1529    ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1530        self.client
1531            .state_store()
1532            .get_room_account_data_event(self.room_id(), data_type)
1533            .await
1534            .map_err(Into::into)
1535    }
1536
1537    /// Get account data of a statically-known type in this room, from storage.
1538    ///
1539    /// # Examples
1540    ///
1541    /// ```no_run
1542    /// # async {
1543    /// # let room: matrix_sdk::Room = todo!();
1544    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1545    ///
1546    /// match room.account_data_static::<FullyReadEventContent>().await? {
1547    ///     Some(fully_read) => {
1548    ///         println!("Found read marker: {:?}", fully_read.deserialize()?)
1549    ///     }
1550    ///     None => println!("No read marker for this room"),
1551    /// }
1552    /// # anyhow::Ok(())
1553    /// # };
1554    /// ```
1555    pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1556    where
1557        C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1558    {
1559        Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1560    }
1561
1562    /// Check if all members of this room are verified and all their devices are
1563    /// verified.
1564    ///
1565    /// Returns true if all devices in the room are verified, otherwise false.
1566    #[cfg(feature = "e2e-encryption")]
1567    pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1568        let user_ids = self
1569            .client
1570            .state_store()
1571            .get_user_ids(self.room_id(), RoomMemberships::empty())
1572            .await?;
1573
1574        for user_id in user_ids {
1575            let devices = self.client.encryption().get_user_devices(&user_id).await?;
1576            let any_unverified = devices.devices().any(|d| !d.is_verified());
1577
1578            if any_unverified {
1579                return Ok(false);
1580            }
1581        }
1582
1583        Ok(true)
1584    }
1585
1586    /// Set the given account data event for this room.
1587    ///
1588    /// # Example
1589    /// ```
1590    /// # async {
1591    /// # let room: matrix_sdk::Room = todo!();
1592    /// # let event_id: ruma::OwnedEventId = todo!();
1593    /// use matrix_sdk::ruma::events::fully_read::FullyReadEventContent;
1594    /// let content = FullyReadEventContent::new(event_id);
1595    ///
1596    /// room.set_account_data(content).await?;
1597    /// # anyhow::Ok(())
1598    /// # };
1599    /// ```
1600    pub async fn set_account_data<T>(
1601        &self,
1602        content: T,
1603    ) -> Result<set_room_account_data::v3::Response>
1604    where
1605        T: RoomAccountDataEventContent,
1606    {
1607        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1608
1609        let request = set_room_account_data::v3::Request::new(
1610            own_user.to_owned(),
1611            self.room_id().to_owned(),
1612            &content,
1613        )?;
1614
1615        Ok(self.client.send(request).await?)
1616    }
1617
1618    /// Set the given raw account data event in this room.
1619    ///
1620    /// # Example
1621    /// ```
1622    /// # async {
1623    /// # let room: matrix_sdk::Room = todo!();
1624    /// use matrix_sdk::ruma::{
1625    ///     events::{
1626    ///         AnyRoomAccountDataEventContent, RoomAccountDataEventContent,
1627    ///         marked_unread::MarkedUnreadEventContent,
1628    ///     },
1629    ///     serde::Raw,
1630    /// };
1631    /// let marked_unread_content = MarkedUnreadEventContent::new(true);
1632    /// let full_event: AnyRoomAccountDataEventContent =
1633    ///     marked_unread_content.clone().into();
1634    /// room.set_account_data_raw(
1635    ///     marked_unread_content.event_type(),
1636    ///     Raw::new(&full_event).unwrap(),
1637    /// )
1638    /// .await?;
1639    /// # anyhow::Ok(())
1640    /// # };
1641    /// ```
1642    pub async fn set_account_data_raw(
1643        &self,
1644        event_type: RoomAccountDataEventType,
1645        content: Raw<AnyRoomAccountDataEventContent>,
1646    ) -> Result<set_room_account_data::v3::Response> {
1647        let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1648
1649        let request = set_room_account_data::v3::Request::new_raw(
1650            own_user.to_owned(),
1651            self.room_id().to_owned(),
1652            event_type,
1653            content,
1654        );
1655
1656        Ok(self.client.send(request).await?)
1657    }
1658
1659    /// Adds a tag to the room, or updates it if it already exists.
1660    ///
1661    /// Returns the [`create_tag::v3::Response`] from the server.
1662    ///
1663    /// # Arguments
1664    /// * `tag` - The tag to add or update.
1665    ///
1666    /// * `tag_info` - Information about the tag, generally containing the
1667    ///   `order` parameter.
1668    ///
1669    /// # Examples
1670    ///
1671    /// ```no_run
1672    /// # use std::str::FromStr;
1673    /// # use ruma::events::tag::{TagInfo, TagName, UserTagName};
1674    /// # async {
1675    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
1676    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
1677    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
1678    /// use matrix_sdk::ruma::events::tag::TagInfo;
1679    ///
1680    /// if let Some(room) = client.get_room(&room_id) {
1681    ///     let mut tag_info = TagInfo::new();
1682    ///     tag_info.order = Some(0.9);
1683    ///     let user_tag = UserTagName::from_str("u.work")?;
1684    ///
1685    ///     room.set_tag(TagName::User(user_tag), tag_info).await?;
1686    /// }
1687    /// # anyhow::Ok(()) };
1688    /// ```
1689    pub async fn set_tag(
1690        &self,
1691        tag: TagName,
1692        tag_info: TagInfo,
1693    ) -> Result<create_tag::v3::Response> {
1694        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1695        let request = create_tag::v3::Request::new(
1696            user_id.to_owned(),
1697            self.inner.room_id().to_owned(),
1698            tag.to_string(),
1699            tag_info,
1700        );
1701        Ok(self.client.send(request).await?)
1702    }
1703
1704    /// Removes a tag from the room.
1705    ///
1706    /// Returns the [`delete_tag::v3::Response`] from the server.
1707    ///
1708    /// # Arguments
1709    /// * `tag` - The tag to remove.
1710    pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1711        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1712        let request = delete_tag::v3::Request::new(
1713            user_id.to_owned(),
1714            self.inner.room_id().to_owned(),
1715            tag.to_string(),
1716        );
1717        Ok(self.client.send(request).await?)
1718    }
1719
1720    /// Add or remove the `m.favourite` flag for this room.
1721    ///
1722    /// If `is_favourite` is `true`, and the `m.low_priority` tag is set on the
1723    /// room, the tag will be removed too.
1724    ///
1725    /// # Arguments
1726    ///
1727    /// * `is_favourite` - Whether to mark this room as favourite.
1728    /// * `tag_order` - The order of the tag if any.
1729    pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1730        if is_favourite {
1731            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1732
1733            self.set_tag(TagName::Favorite, tag_info).await?;
1734
1735            if self.is_low_priority() {
1736                self.remove_tag(TagName::LowPriority).await?;
1737            }
1738        } else {
1739            self.remove_tag(TagName::Favorite).await?;
1740        }
1741        Ok(())
1742    }
1743
1744    /// Add or remove the `m.lowpriority` flag for this room.
1745    ///
1746    /// If `is_low_priority` is `true`, and the `m.favourite` tag is set on the
1747    /// room, the tag will be removed too.
1748    ///
1749    /// # Arguments
1750    ///
1751    /// * `is_low_priority` - Whether to mark this room as low_priority or not.
1752    /// * `tag_order` - The order of the tag if any.
1753    pub async fn set_is_low_priority(
1754        &self,
1755        is_low_priority: bool,
1756        tag_order: Option<f64>,
1757    ) -> Result<()> {
1758        if is_low_priority {
1759            let tag_info = assign!(TagInfo::new(), { order: tag_order });
1760
1761            self.set_tag(TagName::LowPriority, tag_info).await?;
1762
1763            if self.is_favourite() {
1764                self.remove_tag(TagName::Favorite).await?;
1765            }
1766        } else {
1767            self.remove_tag(TagName::LowPriority).await?;
1768        }
1769        Ok(())
1770    }
1771
1772    /// Sets whether this room is a DM.
1773    ///
1774    /// When setting this room as DM, it will be marked as DM for all active
1775    /// members of the room. When unsetting this room as DM, it will be
1776    /// unmarked as DM for all users, not just the members.
1777    ///
1778    /// # Arguments
1779    /// * `is_direct` - Whether to mark this room as direct.
1780    pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1781        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1782
1783        let mut content = self
1784            .client
1785            .account()
1786            .account_data::<DirectEventContent>()
1787            .await?
1788            .map(|c| c.deserialize())
1789            .transpose()?
1790            .unwrap_or_default();
1791
1792        let this_room_id = self.inner.room_id();
1793
1794        if is_direct {
1795            let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1796            room_members.retain(|member| member.user_id() != self.own_user_id());
1797
1798            for member in room_members {
1799                let entry = content.entry(member.user_id().into()).or_default();
1800                if !entry.iter().any(|room_id| room_id == this_room_id) {
1801                    entry.push(this_room_id.to_owned());
1802                }
1803            }
1804        } else {
1805            for (_, list) in content.iter_mut() {
1806                list.retain(|room_id| *room_id != this_room_id);
1807            }
1808
1809            // Remove user ids that don't have any room marked as DM
1810            content.retain(|_, list| !list.is_empty());
1811        }
1812
1813        let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1814
1815        self.client.send(request).await?;
1816        Ok(())
1817    }
1818
1819    /// Tries to decrypt a room event.
1820    ///
1821    /// # Arguments
1822    /// * `event` - The room event to be decrypted.
1823    ///
1824    /// Returns the decrypted event. In the case of a decryption error, returns
1825    /// a `TimelineEvent` representing the decryption error.
1826    #[cfg(feature = "e2e-encryption")]
1827    #[cfg(not(feature = "experimental-encrypted-state-events"))]
1828    pub async fn decrypt_event(
1829        &self,
1830        event: &Raw<OriginalSyncRoomEncryptedEvent>,
1831        push_ctx: Option<&PushContext>,
1832    ) -> Result<TimelineEvent> {
1833        let machine = self.client.olm_machine().await;
1834        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1835
1836        match machine
1837            .try_decrypt_room_event(
1838                event.cast_ref(),
1839                self.inner.room_id(),
1840                self.client.decryption_settings(),
1841            )
1842            .await?
1843        {
1844            RoomEventDecryptionResult::Decrypted(decrypted) => {
1845                let push_actions = if let Some(push_ctx) = push_ctx {
1846                    Some(push_ctx.for_event(&decrypted.event).await)
1847                } else {
1848                    None
1849                };
1850                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1851            }
1852            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1853                self.client
1854                    .encryption()
1855                    .backups()
1856                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1857                Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
1858            }
1859        }
1860    }
1861
1862    /// Tries to decrypt a room event.
1863    ///
1864    /// # Arguments
1865    /// * `event` - The room event to be decrypted.
1866    ///
1867    /// Returns the decrypted event. In the case of a decryption error, returns
1868    /// a `TimelineEvent` representing the decryption error.
1869    #[cfg(feature = "experimental-encrypted-state-events")]
1870    pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
1871        &self,
1872        event: &Raw<T>,
1873        push_ctx: Option<&PushContext>,
1874    ) -> Result<TimelineEvent> {
1875        let machine = self.client.olm_machine().await;
1876        let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1877
1878        match machine
1879            .try_decrypt_room_event(
1880                event.cast_ref(),
1881                self.inner.room_id(),
1882                self.client.decryption_settings(),
1883            )
1884            .await?
1885        {
1886            RoomEventDecryptionResult::Decrypted(decrypted) => {
1887                let push_actions = if let Some(push_ctx) = push_ctx {
1888                    Some(push_ctx.for_event(&decrypted.event).await)
1889                } else {
1890                    None
1891                };
1892                Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1893            }
1894            RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1895                self.client
1896                    .encryption()
1897                    .backups()
1898                    .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1899                // Cast safety: Anything that can be cast to EncryptedEvent must be a timeline
1900                // event.
1901                Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
1902            }
1903        }
1904    }
1905
1906    /// Fetches the [`EncryptionInfo`] for an event decrypted with the supplied
1907    /// session_id.
1908    ///
1909    /// This may be used when we receive an update for a session, and we want to
1910    /// reflect the changes in messages we have received that were encrypted
1911    /// with that session, e.g. to remove a warning shield because a device is
1912    /// now verified.
1913    ///
1914    /// # Arguments
1915    /// * `session_id` - The ID of the Megolm session to get information for.
1916    /// * `sender` - The (claimed) sender of the event where the session was
1917    ///   used.
1918    #[cfg(feature = "e2e-encryption")]
1919    pub async fn get_encryption_info(
1920        &self,
1921        session_id: &str,
1922        sender: &UserId,
1923    ) -> Option<Arc<EncryptionInfo>> {
1924        let machine = self.client.olm_machine().await;
1925        let machine = machine.as_ref()?;
1926        machine.get_session_encryption_info(self.room_id(), session_id, sender).await.ok()
1927    }
1928
1929    /// Forces the currently active room key, which is used to encrypt messages,
1930    /// to be rotated.
1931    ///
1932    /// A new room key will be crated and shared with all the room members the
1933    /// next time a message will be sent. You don't have to call this method,
1934    /// room keys will be rotated automatically when necessary. This method is
1935    /// still useful for debugging purposes.
1936    ///
1937    /// For more info please take a look a the [`encryption`] module
1938    /// documentation.
1939    ///
1940    /// [`encryption`]: crate::encryption
1941    #[cfg(feature = "e2e-encryption")]
1942    pub async fn discard_room_key(&self) -> Result<()> {
1943        let machine = self.client.olm_machine().await;
1944        if let Some(machine) = machine.as_ref() {
1945            machine.discard_room_key(self.inner.room_id()).await?;
1946            Ok(())
1947        } else {
1948            Err(Error::NoOlmMachine)
1949        }
1950    }
1951
1952    /// Ban the user with `UserId` from this room.
1953    ///
1954    /// # Arguments
1955    ///
1956    /// * `user_id` - The user to ban with `UserId`.
1957    ///
1958    /// * `reason` - The reason for banning this user.
1959    #[instrument(skip_all)]
1960    pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1961        let request = assign!(
1962            ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1963            { reason: reason.map(ToOwned::to_owned) }
1964        );
1965        self.client.send(request).await?;
1966        Ok(())
1967    }
1968
1969    /// Unban the user with `UserId` from this room.
1970    ///
1971    /// # Arguments
1972    ///
1973    /// * `user_id` - The user to unban with `UserId`.
1974    ///
1975    /// * `reason` - The reason for unbanning this user.
1976    #[instrument(skip_all)]
1977    pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1978        let request = assign!(
1979            unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1980            { reason: reason.map(ToOwned::to_owned) }
1981        );
1982        self.client.send(request).await?;
1983        Ok(())
1984    }
1985
1986    /// Kick a user out of this room.
1987    ///
1988    /// # Arguments
1989    ///
1990    /// * `user_id` - The `UserId` of the user that should be kicked out of the
1991    ///   room.
1992    ///
1993    /// * `reason` - Optional reason why the room member is being kicked out.
1994    #[instrument(skip_all)]
1995    pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1996        let request = assign!(
1997            kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1998            { reason: reason.map(ToOwned::to_owned) }
1999        );
2000        self.client.send(request).await?;
2001        Ok(())
2002    }
2003
2004    /// Invite the specified user by `UserId` to this room.
2005    ///
2006    /// # Arguments
2007    ///
2008    /// * `user_id` - The `UserId` of the user to invite to the room.
2009    #[instrument(skip_all)]
2010    pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
2011        #[cfg(feature = "e2e-encryption")]
2012        if self.client.inner.enable_share_history_on_invite {
2013            shared_room_history::share_room_history(self, user_id.to_owned()).await?;
2014        }
2015
2016        let recipient = InvitationRecipient::UserId(InviteUserId::new(user_id.to_owned()));
2017        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2018        self.client.send(request).await?;
2019
2020        // Force a future room members reload before sending any event to prevent UTDs
2021        // that can happen when some event is sent after a room member has been invited
2022        // but before the /sync request could fetch the membership change event.
2023        self.mark_members_missing();
2024
2025        Ok(())
2026    }
2027
2028    /// Invite the specified user by third party id to this room.
2029    ///
2030    /// # Arguments
2031    ///
2032    /// * `invite_id` - A third party id of a user to invite to the room.
2033    #[instrument(skip_all)]
2034    pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
2035        let recipient = InvitationRecipient::ThirdPartyId(invite_id);
2036        let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2037        self.client.send(request).await?;
2038
2039        // Force a future room members reload before sending any event to prevent UTDs
2040        // that can happen when some event is sent after a room member has been invited
2041        // but before the /sync request could fetch the membership change event.
2042        self.mark_members_missing();
2043
2044        Ok(())
2045    }
2046
2047    /// Activate typing notice for this room.
2048    ///
2049    /// The typing notice remains active for 4s. It can be deactivate at any
2050    /// point by setting typing to `false`. If this method is called while
2051    /// the typing notice is active nothing will happen. This method can be
2052    /// called on every key stroke, since it will do nothing while typing is
2053    /// active.
2054    ///
2055    /// # Arguments
2056    ///
2057    /// * `typing` - Whether the user is typing or has stopped typing.
2058    ///
2059    /// # Examples
2060    ///
2061    /// ```no_run
2062    /// use std::time::Duration;
2063    ///
2064    /// use matrix_sdk::ruma::api::client::typing::create_typing_event::v3::Typing;
2065    /// # use matrix_sdk::{
2066    /// #     Client, config::SyncSettings,
2067    /// #     ruma::room_id,
2068    /// # };
2069    /// # use url::Url;
2070    ///
2071    /// # async {
2072    /// # let homeserver = Url::parse("http://localhost:8080")?;
2073    /// # let client = Client::new(homeserver).await?;
2074    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2075    ///
2076    /// if let Some(room) = client.get_room(&room_id) {
2077    ///     room.typing_notice(true).await?
2078    /// }
2079    /// # anyhow::Ok(()) };
2080    /// ```
2081    pub async fn typing_notice(&self, typing: bool) -> Result<()> {
2082        self.ensure_room_joined()?;
2083
2084        // Only send a request to the homeserver if the old timeout has elapsed
2085        // or the typing notice changed state within the `TYPING_NOTICE_TIMEOUT`
2086        let send = if let Some(typing_time) =
2087            self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
2088        {
2089            if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
2090                // We always reactivate the typing notice if typing is true or
2091                // we may need to deactivate it if it's
2092                // currently active if typing is false
2093                typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
2094            } else {
2095                // Only send a request when we need to deactivate typing
2096                !typing
2097            }
2098        } else {
2099            // Typing notice is currently deactivated, therefore, send a request
2100            // only when it's about to be activated
2101            typing
2102        };
2103
2104        if send {
2105            self.send_typing_notice(typing).await?;
2106        }
2107
2108        Ok(())
2109    }
2110
2111    #[instrument(name = "typing_notice", skip(self))]
2112    async fn send_typing_notice(&self, typing: bool) -> Result<()> {
2113        let typing = if typing {
2114            self.client
2115                .inner
2116                .typing_notice_times
2117                .write()
2118                .unwrap()
2119                .insert(self.room_id().to_owned(), Instant::now());
2120            Typing::Yes(TypingInfo::new(TYPING_NOTICE_TIMEOUT))
2121        } else {
2122            self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
2123            Typing::No
2124        };
2125
2126        let request = create_typing_event::v3::Request::new(
2127            self.own_user_id().to_owned(),
2128            self.room_id().to_owned(),
2129            typing,
2130        );
2131
2132        self.client.send(request).await?;
2133
2134        Ok(())
2135    }
2136
2137    /// Send a request to set a single receipt.
2138    ///
2139    /// If an unthreaded receipt is sent, this will also unset the unread flag
2140    /// of the room if necessary.
2141    ///
2142    /// # Arguments
2143    ///
2144    /// * `receipt_type` - The type of the receipt to set. Note that it is
2145    ///   possible to set the fully-read marker although it is technically not a
2146    ///   receipt.
2147    ///
2148    /// * `thread` - The thread where this receipt should apply, if any. Note
2149    ///   that this must be [`ReceiptThread::Unthreaded`] when sending a
2150    ///   [`ReceiptType::FullyRead`][create_receipt::v3::ReceiptType::FullyRead].
2151    ///
2152    /// * `event_id` - The `EventId` of the event to set the receipt on.
2153    #[instrument(skip_all)]
2154    pub async fn send_single_receipt(
2155        &self,
2156        receipt_type: create_receipt::v3::ReceiptType,
2157        thread: ReceiptThread,
2158        event_id: OwnedEventId,
2159    ) -> Result<()> {
2160        // Since the receipt type and the thread aren't Hash/Ord, flatten then as a
2161        // string key.
2162        let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
2163
2164        self.client
2165            .inner
2166            .locks
2167            .read_receipt_deduplicated_handler
2168            .run((request_key, event_id.clone()), async {
2169                // We will unset the unread flag if we send an unthreaded receipt.
2170                let is_unthreaded = thread == ReceiptThread::Unthreaded;
2171
2172                let mut request = create_receipt::v3::Request::new(
2173                    self.room_id().to_owned(),
2174                    receipt_type,
2175                    event_id,
2176                );
2177                request.thread = thread;
2178
2179                self.client.send(request).await?;
2180
2181                if is_unthreaded {
2182                    self.set_unread_flag(false).await?;
2183                }
2184
2185                Ok(())
2186            })
2187            .await
2188    }
2189
2190    /// Send a request to set multiple receipts at once.
2191    ///
2192    /// This will also unset the unread flag of the room if necessary.
2193    ///
2194    /// # Arguments
2195    ///
2196    /// * `receipts` - The `Receipts` to send.
2197    ///
2198    /// If `receipts` is empty, this is a no-op.
2199    #[instrument(skip_all)]
2200    pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2201        if receipts.is_empty() {
2202            return Ok(());
2203        }
2204
2205        let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2206        let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2207            fully_read,
2208            read_receipt: public_read_receipt,
2209            private_read_receipt,
2210        });
2211
2212        self.client.send(request).await?;
2213
2214        self.set_unread_flag(false).await?;
2215
2216        Ok(())
2217    }
2218
2219    /// Helper function to enable End-to-end encryption in this room.
2220    /// `encrypted_state_events` is not used unless the
2221    /// `experimental-encrypted-state-events` feature is enabled.
2222    #[allow(unused_variables, unused_mut)]
2223    async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
2224        use ruma::{
2225            EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
2226        };
2227        const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
2228
2229        if !self.latest_encryption_state().await?.is_encrypted() {
2230            let mut content =
2231                RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
2232            #[cfg(feature = "experimental-encrypted-state-events")]
2233            if encrypted_state_events {
2234                content = content.with_encrypted_state();
2235            }
2236            self.send_state_event(content).await?;
2237
2238            // Spin on the sync beat event, since the first sync we receive might not
2239            // include the encryption event.
2240            //
2241            // TODO do we want to return an error here if we time out? This
2242            // could be quite useful if someone wants to enable encryption and
2243            // send a message right after it's enabled.
2244            let res = timeout(
2245                async {
2246                    loop {
2247                        // Listen for sync events, then check if the encryption state is known.
2248                        self.client.inner.sync_beat.listen().await;
2249                        let _state_store_lock =
2250                            self.client.base_client().state_store_lock().lock().await;
2251
2252                        if !self.inner.encryption_state().is_unknown() {
2253                            break;
2254                        }
2255                    }
2256                },
2257                SYNC_WAIT_TIME,
2258            )
2259            .await;
2260
2261            let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
2262
2263            // If encryption was enabled, return.
2264            #[cfg(not(feature = "experimental-encrypted-state-events"))]
2265            if res.is_ok() && self.inner.encryption_state().is_encrypted() {
2266                debug!("room successfully marked as encrypted");
2267                return Ok(());
2268            }
2269
2270            // If encryption with state event encryption was enabled, return.
2271            #[cfg(feature = "experimental-encrypted-state-events")]
2272            if res.is_ok() && {
2273                if encrypted_state_events {
2274                    self.inner.encryption_state().is_state_encrypted()
2275                } else {
2276                    self.inner.encryption_state().is_encrypted()
2277                }
2278            } {
2279                debug!("room successfully marked as encrypted");
2280                return Ok(());
2281            }
2282
2283            // If after waiting for multiple syncs, we don't have the encryption state we
2284            // expect, assume the local encryption state is incorrect; this will
2285            // cause the SDK to re-request it later for confirmation, instead of
2286            // assuming it's sync'd and correct (and not encrypted).
2287            debug!("still not marked as encrypted, marking encryption state as missing");
2288
2289            let mut room_info = self.clone_info();
2290            room_info.mark_encryption_state_missing();
2291            let mut changes = StateChanges::default();
2292            changes.add_room(room_info.clone());
2293
2294            self.client.state_store().save_changes(&changes).await?;
2295            self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
2296        }
2297
2298        Ok(())
2299    }
2300
2301    /// Enable End-to-end encryption in this room.
2302    ///
2303    /// This method will be a noop if encryption is already enabled, otherwise
2304    /// sends a `m.room.encryption` state event to the room. This might fail if
2305    /// you don't have the appropriate power level to enable end-to-end
2306    /// encryption.
2307    ///
2308    /// A sync needs to be received to update the local room state. This method
2309    /// will wait for a sync to be received, this might time out if no
2310    /// sync loop is running or if the server is slow.
2311    ///
2312    /// # Examples
2313    ///
2314    /// ```no_run
2315    /// # use matrix_sdk::{
2316    /// #     Client, config::SyncSettings,
2317    /// #     ruma::room_id,
2318    /// # };
2319    /// # use url::Url;
2320    /// #
2321    /// # async {
2322    /// # let homeserver = Url::parse("http://localhost:8080")?;
2323    /// # let client = Client::new(homeserver).await?;
2324    /// # let room_id = room_id!("!test:localhost");
2325    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2326    ///
2327    /// if let Some(room) = client.get_room(&room_id) {
2328    ///     room.enable_encryption().await?
2329    /// }
2330    /// # anyhow::Ok(()) };
2331    /// ```
2332    #[instrument(skip_all)]
2333    pub async fn enable_encryption(&self) -> Result<()> {
2334        self.enable_encryption_inner(false).await
2335    }
2336
2337    /// Enable End-to-end encryption in this room, opting into experimental
2338    /// state event encryption.
2339    ///
2340    /// This method will be a noop if encryption is already enabled, otherwise
2341    /// sends a `m.room.encryption` state event to the room. This might fail if
2342    /// you don't have the appropriate power level to enable end-to-end
2343    /// encryption.
2344    ///
2345    /// A sync needs to be received to update the local room state. This method
2346    /// will wait for a sync to be received, this might time out if no
2347    /// sync loop is running or if the server is slow.
2348    ///
2349    /// # Examples
2350    ///
2351    /// ```no_run
2352    /// # use matrix_sdk::{
2353    /// #     Client, config::SyncSettings,
2354    /// #     ruma::room_id,
2355    /// # };
2356    /// # use url::Url;
2357    /// #
2358    /// # async {
2359    /// # let homeserver = Url::parse("http://localhost:8080")?;
2360    /// # let client = Client::new(homeserver).await?;
2361    /// # let room_id = room_id!("!test:localhost");
2362    /// let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
2363    ///
2364    /// if let Some(room) = client.get_room(&room_id) {
2365    ///     room.enable_encryption_with_state_event_encryption().await?
2366    /// }
2367    /// # anyhow::Ok(()) };
2368    /// ```
2369    #[instrument(skip_all)]
2370    #[cfg(feature = "experimental-encrypted-state-events")]
2371    pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
2372        self.enable_encryption_inner(true).await
2373    }
2374
2375    /// Share a room key with users in the given room.
2376    ///
2377    /// This will create Olm sessions with all the users/device pairs in the
2378    /// room if necessary and share a room key that can be shared with them.
2379    ///
2380    /// Does nothing if no room key needs to be shared.
2381    // TODO: expose this publicly so people can pre-share a group session if
2382    // e.g. a user starts to type a message for a room.
2383    #[cfg(feature = "e2e-encryption")]
2384    #[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
2385    async fn preshare_room_key(&self) -> Result<()> {
2386        self.ensure_room_joined()?;
2387
2388        // Take and release the lock on the store, if needs be.
2389        let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
2390        tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));
2391
2392        self.client
2393            .locks()
2394            .group_session_deduplicated_handler
2395            .run(self.room_id().to_owned(), async move {
2396                {
2397                    let members = self
2398                        .client
2399                        .state_store()
2400                        .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
2401                        .await?;
2402                    self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
2403                };
2404
2405                let response = self.share_room_key().await;
2406
2407                // If one of the responses failed invalidate the group
2408                // session as using it would end up in undecryptable
2409                // messages.
2410                if let Err(r) = response {
2411                    let machine = self.client.olm_machine().await;
2412                    if let Some(machine) = machine.as_ref() {
2413                        machine.discard_room_key(self.room_id()).await?;
2414                    }
2415                    return Err(r);
2416                }
2417
2418                Ok(())
2419            })
2420            .await
2421    }
2422
2423    /// Share a group session for a room.
2424    ///
2425    /// # Panics
2426    ///
2427    /// Panics if the client isn't logged in.
2428    #[cfg(feature = "e2e-encryption")]
2429    #[instrument(skip_all)]
2430    async fn share_room_key(&self) -> Result<()> {
2431        self.ensure_room_joined()?;
2432
2433        let requests = self.client.base_client().share_room_key(self.room_id()).await?;
2434
2435        for request in requests {
2436            let response = self.client.send_to_device(&request).await?;
2437            self.client.mark_request_as_sent(&request.txn_id, &response).await?;
2438        }
2439
2440        Ok(())
2441    }
2442
2443    /// Wait for the room to be fully synced.
2444    ///
2445    /// This method makes sure the room that was returned when joining a room
2446    /// has been echoed back in the sync.
2447    ///
2448    /// Warning: This waits until a sync happens and does not return if no sync
2449    /// is happening. It can also return early when the room is not a joined
2450    /// room anymore.
2451    #[instrument(skip_all)]
2452    pub async fn sync_up(&self) {
2453        while !self.is_synced() && self.state() == RoomState::Joined {
2454            let wait_for_beat = self.client.inner.sync_beat.listen();
2455            // We don't care whether it's a timeout or a sync beat.
2456            let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2457        }
2458    }
2459
2460    /// Send a message-like event to this room.
2461    ///
2462    /// Returns the parsed response from the server.
2463    ///
2464    /// If the encryption feature is enabled this method will transparently
2465    /// encrypt the event if this room is encrypted (except for `m.reaction`
2466    /// events, which are never encrypted).
2467    ///
2468    /// **Note**: If you just want to send an event with custom JSON content to
2469    /// a room, you can use the [`send_raw()`][Self::send_raw] method for that.
2470    ///
2471    /// If you want to set a transaction ID for the event, use
2472    /// [`.with_transaction_id()`][SendMessageLikeEvent::with_transaction_id]
2473    /// on the returned value before `.await`ing it.
2474    ///
2475    /// # Arguments
2476    ///
2477    /// * `content` - The content of the message event.
2478    ///
2479    /// # Examples
2480    ///
2481    /// ```no_run
2482    /// # use std::sync::{Arc, RwLock};
2483    /// # use matrix_sdk::{Client, config::SyncSettings};
2484    /// # use url::Url;
2485    /// # use matrix_sdk::ruma::room_id;
2486    /// # use serde::{Deserialize, Serialize};
2487    /// use matrix_sdk::ruma::{
2488    ///     MilliSecondsSinceUnixEpoch, TransactionId,
2489    ///     events::{
2490    ///         macros::EventContent,
2491    ///         room::message::{RoomMessageEventContent, TextMessageEventContent},
2492    ///     },
2493    ///     uint,
2494    /// };
2495    ///
2496    /// # async {
2497    /// # let homeserver = Url::parse("http://localhost:8080")?;
2498    /// # let mut client = Client::new(homeserver).await?;
2499    /// # let room_id = room_id!("!test:localhost");
2500    /// let content = RoomMessageEventContent::text_plain("Hello world");
2501    /// let txn_id = TransactionId::new();
2502    ///
2503    /// if let Some(room) = client.get_room(&room_id) {
2504    ///     room.send(content).with_transaction_id(txn_id).await?;
2505    /// }
2506    ///
2507    /// // Custom events work too:
2508    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
2509    /// #[ruma_event(type = "org.shiny_new_2fa.token", kind = MessageLike)]
2510    /// struct TokenEventContent {
2511    ///     token: String,
2512    ///     #[serde(rename = "exp")]
2513    ///     expires_at: MilliSecondsSinceUnixEpoch,
2514    /// }
2515    ///
2516    /// # fn generate_token() -> String { todo!() }
2517    /// let content = TokenEventContent {
2518    ///     token: generate_token(),
2519    ///     expires_at: {
2520    ///         let now = MilliSecondsSinceUnixEpoch::now();
2521    ///         MilliSecondsSinceUnixEpoch(now.0 + uint!(30_000))
2522    ///     },
2523    /// };
2524    ///
2525    /// if let Some(room) = client.get_room(&room_id) {
2526    ///     room.send(content).await?;
2527    /// }
2528    /// # anyhow::Ok(()) };
2529    /// ```
2530    pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2531        SendMessageLikeEvent::new(self, content)
2532    }
2533
2534    /// Run /keys/query requests for all the non-tracked users, and for users
2535    /// with an out-of-date device list.
2536    #[cfg(feature = "e2e-encryption")]
2537    async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
2538        let olm = self.client.olm_machine().await;
2539        let olm = olm.as_ref().expect("Olm machine wasn't started");
2540
2541        let members =
2542            self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;
2543
2544        let tracked: HashMap<_, _> = olm
2545            .store()
2546            .load_tracked_users()
2547            .await?
2548            .into_iter()
2549            .map(|tracked| (tracked.user_id, tracked.dirty))
2550            .collect();
2551
2552        // A member has no unknown devices iff it was tracked *and* the tracking is
2553        // not considered dirty.
2554        let members_with_unknown_devices =
2555            members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));
2556
2557        let (req_id, request) =
2558            olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));
2559
2560        if !request.device_keys.is_empty() {
2561            self.client.keys_query(&req_id, request.device_keys).await?;
2562        }
2563
2564        Ok(())
2565    }
2566
2567    /// Send a message-like event with custom JSON content to this room.
2568    ///
2569    /// Returns the parsed response from the server.
2570    ///
2571    /// If the encryption feature is enabled this method will transparently
2572    /// encrypt the event if this room is encrypted (except for `m.reaction`
2573    /// events, which are never encrypted).
2574    ///
2575    /// This method is equivalent to the [`send()`][Self::send] method but
2576    /// allows sending custom JSON payloads, e.g. constructed using the
2577    /// [`serde_json::json!()`] macro.
2578    ///
2579    /// If you want to set a transaction ID for the event, use
2580    /// [`.with_transaction_id()`][SendRawMessageLikeEvent::with_transaction_id]
2581    /// on the returned value before `.await`ing it.
2582    ///
2583    /// # Arguments
2584    ///
2585    /// * `event_type` - The type of the event.
2586    ///
2587    /// * `content` - The content of the event as a raw JSON value. The argument
2588    ///   type can be `serde_json::Value`, but also other raw JSON types; for
2589    ///   the full list check the documentation of
2590    ///   [`IntoRawMessageLikeEventContent`].
2591    ///
2592    /// # Examples
2593    ///
2594    /// ```no_run
2595    /// # use std::sync::{Arc, RwLock};
2596    /// # use matrix_sdk::{Client, config::SyncSettings};
2597    /// # use url::Url;
2598    /// # use matrix_sdk::ruma::room_id;
2599    /// # async {
2600    /// # let homeserver = Url::parse("http://localhost:8080")?;
2601    /// # let mut client = Client::new(homeserver).await?;
2602    /// # let room_id = room_id!("!test:localhost");
2603    /// use serde_json::json;
2604    ///
2605    /// if let Some(room) = client.get_room(&room_id) {
2606    ///     room.send_raw("m.room.message", json!({ "body": "Hello world" })).await?;
2607    /// }
2608    /// # anyhow::Ok(()) };
2609    /// ```
2610    #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2611    pub fn send_raw<'a>(
2612        &'a self,
2613        event_type: &'a str,
2614        content: impl IntoRawMessageLikeEventContent,
2615    ) -> SendRawMessageLikeEvent<'a> {
2616        // Note: the recorded instrument fields are saved in
2617        // `SendRawMessageLikeEvent::into_future`.
2618        SendRawMessageLikeEvent::new(self, event_type, content)
2619    }
2620
2621    /// Send an attachment to this room.
2622    ///
2623    /// This will upload the given data that the reader produces using the
2624    /// [`upload()`] method and post an event to the given room.
2625    /// If the room is encrypted and the encryption feature is enabled the
2626    /// upload will be encrypted.
2627    ///
2628    /// This is a convenience method that calls the
2629    /// [`upload()`] and afterwards the [`send()`].
2630    ///
2631    /// # Arguments
2632    /// * `filename` - The file name.
2633    ///
2634    /// * `content_type` - The type of the media, this will be used as the
2635    /// content-type header.
2636    ///
2637    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2638    /// media.
2639    ///
2640    /// * `config` - Metadata and configuration for the attachment.
2641    ///
2642    /// # Examples
2643    ///
2644    /// ```no_run
2645    /// # use std::fs;
2646    /// # use matrix_sdk::{Client, ruma::room_id, attachment::AttachmentConfig};
2647    /// # use url::Url;
2648    /// # use mime;
2649    /// # async {
2650    /// # let homeserver = Url::parse("http://localhost:8080")?;
2651    /// # let mut client = Client::new(homeserver).await?;
2652    /// # let room_id = room_id!("!test:localhost");
2653    /// let mut image = fs::read("/home/example/my-cat.jpg")?;
2654    ///
2655    /// if let Some(room) = client.get_room(&room_id) {
2656    ///     room.send_attachment(
2657    ///         "my_favorite_cat.jpg",
2658    ///         &mime::IMAGE_JPEG,
2659    ///         image,
2660    ///         AttachmentConfig::new(),
2661    ///     ).await?;
2662    /// }
2663    /// # anyhow::Ok(()) };
2664    /// ```
2665    ///
2666    /// [`upload()`]: crate::Media::upload
2667    /// [`send()`]: Self::send
2668    #[instrument(skip_all)]
2669    pub fn send_attachment<'a>(
2670        &'a self,
2671        filename: impl Into<String>,
2672        content_type: &'a Mime,
2673        data: Vec<u8>,
2674        config: AttachmentConfig,
2675    ) -> SendAttachment<'a> {
2676        SendAttachment::new(self, filename.into(), content_type, data, config)
2677    }
2678
2679    /// Prepare and send an attachment to this room.
2680    ///
2681    /// This will upload the given data that the reader produces using the
2682    /// [`upload()`](#method.upload) method and post an event to the given room.
2683    /// If the room is encrypted and the encryption feature is enabled the
2684    /// upload will be encrypted.
2685    ///
2686    /// This is a convenience method that calls the
2687    /// [`Client::upload()`](#Client::method.upload) and afterwards the
2688    /// [`send()`](#method.send).
2689    ///
2690    /// # Arguments
2691    /// * `filename` - The file name.
2692    ///
2693    /// * `content_type` - The type of the media, this will be used as the
2694    ///   content-type header.
2695    ///
2696    /// * `reader` - A `Reader` that will be used to fetch the raw bytes of the
2697    ///   media.
2698    ///
2699    /// * `config` - Metadata and configuration for the attachment.
2700    ///
2701    /// * `send_progress` - An observable to transmit forward progress about the
2702    ///   upload.
2703    ///
2704    /// * `store_in_cache` - A boolean defining whether the uploaded media will
2705    ///   be stored in the cache immediately after a successful upload.
2706    #[instrument(skip_all)]
2707    pub(super) async fn prepare_and_send_attachment<'a>(
2708        &'a self,
2709        filename: String,
2710        content_type: &'a Mime,
2711        data: Vec<u8>,
2712        mut config: AttachmentConfig,
2713        send_progress: SharedObservable<TransmissionProgress>,
2714        store_in_cache: bool,
2715    ) -> Result<send_message_event::v3::Response> {
2716        self.ensure_room_joined()?;
2717
2718        let txn_id = config.txn_id.take();
2719        let mentions = config.mentions.take();
2720
2721        let thumbnail = config.thumbnail.take();
2722
2723        // If necessary, store caching data for the thumbnail ahead of time.
2724        let thumbnail_cache_info = if store_in_cache {
2725            thumbnail
2726                .as_ref()
2727                .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2728        } else {
2729            None
2730        };
2731
2732        #[cfg(feature = "e2e-encryption")]
2733        let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2734            self.client
2735                .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2736                .await?
2737        } else {
2738            self.client
2739                .media()
2740                .upload_plain_media_and_thumbnail(
2741                    content_type,
2742                    // TODO: get rid of this clone; wait for Ruma to use `Bytes` or something
2743                    // similar.
2744                    data.clone(),
2745                    thumbnail,
2746                    send_progress,
2747                )
2748                .await?
2749        };
2750
2751        #[cfg(not(feature = "e2e-encryption"))]
2752        let (media_source, thumbnail) = self
2753            .client
2754            .media()
2755            .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2756            .await?;
2757
2758        if store_in_cache {
2759            let media_store_lock_guard = self.client.media_store().lock().await?;
2760
2761            // A failure to cache shouldn't prevent the whole upload from finishing
2762            // properly, so only log errors during caching.
2763
2764            debug!("caching the media");
2765            let request =
2766                MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2767
2768            if let Err(err) = media_store_lock_guard
2769                .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2770                .await
2771            {
2772                warn!("unable to cache the media after uploading it: {err}");
2773            }
2774
2775            if let Some(((data, height, width), source)) =
2776                thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2777            {
2778                debug!("caching the thumbnail");
2779
2780                let request = MediaRequestParameters {
2781                    source: source.clone(),
2782                    format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2783                };
2784
2785                if let Err(err) = media_store_lock_guard
2786                    .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2787                    .await
2788                {
2789                    warn!("unable to cache the media after uploading it: {err}");
2790                }
2791            }
2792        }
2793
2794        let content = self
2795            .make_media_event(
2796                Room::make_attachment_type(
2797                    content_type,
2798                    filename,
2799                    media_source,
2800                    config.caption,
2801                    config.info,
2802                    thumbnail,
2803                ),
2804                mentions,
2805                config.reply,
2806            )
2807            .await?;
2808
2809        let mut fut = self.send(content);
2810        if let Some(txn_id) = txn_id {
2811            fut = fut.with_transaction_id(txn_id);
2812        }
2813
2814        fut.await.map(|result| result.response)
2815    }
2816
2817    /// Creates the inner [`MessageType`] for an already-uploaded media file
2818    /// provided by its source.
2819    #[allow(clippy::too_many_arguments)]
2820    pub(crate) fn make_attachment_type(
2821        content_type: &Mime,
2822        filename: String,
2823        source: MediaSource,
2824        caption: Option<TextMessageEventContent>,
2825        info: Option<AttachmentInfo>,
2826        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2827    ) -> MessageType {
2828        make_media_type!(MessageType, content_type, filename, source, caption, info, thumbnail)
2829    }
2830
2831    /// Creates the [`RoomMessageEventContent`] based on the message type,
2832    /// mentions and reply information.
2833    pub(crate) async fn make_media_event(
2834        &self,
2835        msg_type: MessageType,
2836        mentions: Option<Mentions>,
2837        reply: Option<Reply>,
2838    ) -> Result<RoomMessageEventContent> {
2839        let mut content = RoomMessageEventContent::new(msg_type);
2840        if let Some(mentions) = mentions {
2841            content = content.add_mentions(mentions);
2842        }
2843        if let Some(reply) = reply {
2844            // Since we just created the event, there is no relation attached to it. Thus,
2845            // it is safe to add the reply relation without overriding anything.
2846            content = self.make_reply_event(content.into(), reply).await?;
2847        }
2848        Ok(content)
2849    }
2850
2851    /// Creates the inner [`GalleryItemType`] for an already-uploaded media file
2852    /// provided by its source.
2853    #[cfg(feature = "unstable-msc4274")]
2854    #[allow(clippy::too_many_arguments)]
2855    pub(crate) fn make_gallery_item_type(
2856        content_type: &Mime,
2857        filename: String,
2858        source: MediaSource,
2859        caption: Option<TextMessageEventContent>,
2860        info: Option<AttachmentInfo>,
2861        thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2862    ) -> GalleryItemType {
2863        make_media_type!(GalleryItemType, content_type, filename, source, caption, info, thumbnail)
2864    }
2865
2866    /// Update the power levels of a select set of users of this room.
2867    ///
2868    /// Issue a `power_levels` state event request to the server, changing the
2869    /// given UserId -> Int levels. May fail if the `power_levels` aren't
2870    /// locally known yet or the server rejects the state event update, e.g.
2871    /// because of insufficient permissions. Neither permissions to update
2872    /// nor whether the data might be stale is checked prior to issuing the
2873    /// request.
2874    pub async fn update_power_levels(
2875        &self,
2876        updates: Vec<(&UserId, Int)>,
2877    ) -> Result<send_state_event::v3::Response> {
2878        let mut power_levels = self.power_levels().await?;
2879
2880        for (user_id, new_level) in updates {
2881            if new_level == power_levels.users_default {
2882                power_levels.users.remove(user_id);
2883            } else {
2884                power_levels.users.insert(user_id.to_owned(), new_level);
2885            }
2886        }
2887
2888        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2889    }
2890
2891    /// Applies a set of power level changes to this room.
2892    ///
2893    /// Any values that are `None` in the given `RoomPowerLevelChanges` will
2894    /// remain unchanged.
2895    pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2896        let mut power_levels = self.power_levels().await?;
2897        power_levels.apply(changes)?;
2898        self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2899        Ok(())
2900    }
2901
2902    /// Resets the room's power levels to the default values
2903    ///
2904    /// [spec]: https://spec.matrix.org/v1.9/client-server-api/#mroompower_levels
2905    pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2906        let creators = self.creators().unwrap_or_default();
2907        let rules = self.clone_info().room_version_rules_or_default();
2908
2909        let default_power_levels =
2910            RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2911        let changes = RoomPowerLevelChanges::from(default_power_levels);
2912        self.apply_power_level_changes(changes).await?;
2913        Ok(self.power_levels().await?)
2914    }
2915
2916    /// Gets the suggested role for the user with the provided `user_id`.
2917    ///
2918    /// This method checks the `RoomPowerLevels` events instead of loading the
2919    /// member list and looking for the member.
2920    pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2921        let power_level = self.get_user_power_level(user_id).await?;
2922        Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2923    }
2924
2925    /// Gets the power level the user with the provided `user_id`.
2926    ///
2927    /// This method checks the `RoomPowerLevels` events instead of loading the
2928    /// member list and looking for the member.
2929    pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2930        let event = self.power_levels().await?;
2931        Ok(event.for_user(user_id))
2932    }
2933
2934    /// Gets a map with the `UserId` of users with power levels other than `0`
2935    /// and this power level.
2936    pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2937        let power_levels = self.power_levels().await.ok();
2938        let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2939        if let Some(power_levels) = power_levels {
2940            for (id, level) in power_levels.users.into_iter() {
2941                user_power_levels.insert(id, level.into());
2942            }
2943        }
2944        user_power_levels
2945    }
2946
2947    /// Sets the name of this room.
2948    pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2949        self.send_state_event(RoomNameEventContent::new(name)).await
2950    }
2951
2952    /// Sets a new topic for this room.
2953    pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2954        self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2955    }
2956
2957    /// Sets the new avatar url for this room.
2958    ///
2959    /// # Arguments
2960    /// * `avatar_url` - The owned Matrix uri that represents the avatar
2961    /// * `info` - The optional image info that can be provided for the avatar
2962    pub async fn set_avatar_url(
2963        &self,
2964        url: &MxcUri,
2965        info: Option<avatar::ImageInfo>,
2966    ) -> Result<send_state_event::v3::Response> {
2967        self.ensure_room_joined()?;
2968
2969        let mut room_avatar_event = RoomAvatarEventContent::new();
2970        room_avatar_event.url = Some(url.to_owned());
2971        room_avatar_event.info = info.map(Box::new);
2972
2973        self.send_state_event(room_avatar_event).await
2974    }
2975
2976    /// Removes the avatar from the room
2977    pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2978        self.send_state_event(RoomAvatarEventContent::new()).await
2979    }
2980
2981    /// Uploads a new avatar for this room.
2982    ///
2983    /// # Arguments
2984    /// * `mime` - The mime type describing the data
2985    /// * `data` - The data representation of the avatar
2986    /// * `info` - The optional image info provided for the avatar, the blurhash
2987    ///   and the mimetype will always be updated
2988    pub async fn upload_avatar(
2989        &self,
2990        mime: &Mime,
2991        data: Vec<u8>,
2992        info: Option<avatar::ImageInfo>,
2993    ) -> Result<send_state_event::v3::Response> {
2994        self.ensure_room_joined()?;
2995
2996        let upload_response = self.client.media().upload(mime, data, None).await?;
2997        let mut info = info.unwrap_or_default();
2998        info.blurhash = upload_response.blurhash;
2999        info.mimetype = Some(mime.to_string());
3000
3001        self.set_avatar_url(&upload_response.content_uri, Some(info)).await
3002    }
3003
3004    /// Send a state event with an empty state key to the homeserver.
3005    ///
3006    /// For state events with a non-empty state key, see
3007    /// [`send_state_event_for_key`][Self::send_state_event_for_key].
3008    ///
3009    /// Returns the parsed response from the server.
3010    ///
3011    /// # Arguments
3012    ///
3013    /// * `content` - The content of the state event.
3014    ///
3015    /// # Examples
3016    ///
3017    /// ```no_run
3018    /// # use serde::{Deserialize, Serialize};
3019    /// # async {
3020    /// # let joined_room: matrix_sdk::Room = todo!();
3021    /// use matrix_sdk::ruma::{
3022    ///     EventEncryptionAlgorithm,
3023    ///     events::{
3024    ///         EmptyStateKey, macros::EventContent,
3025    ///         room::encryption::RoomEncryptionEventContent,
3026    ///     },
3027    /// };
3028    ///
3029    /// let encryption_event_content = RoomEncryptionEventContent::new(
3030    ///     EventEncryptionAlgorithm::MegolmV1AesSha2,
3031    /// );
3032    /// joined_room.send_state_event(encryption_event_content).await?;
3033    ///
3034    /// // Custom event:
3035    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3036    /// #[ruma_event(
3037    ///     type = "org.matrix.msc_9000.xxx",
3038    ///     kind = State,
3039    ///     state_key_type = EmptyStateKey,
3040    /// )]
3041    /// struct XxxStateEventContent {/* fields... */}
3042    ///
3043    /// let content: XxxStateEventContent = todo!();
3044    /// joined_room.send_state_event(content).await?;
3045    /// # anyhow::Ok(()) };
3046    /// ```
3047    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3048    #[instrument(skip_all)]
3049    pub async fn send_state_event(
3050        &self,
3051        content: impl StateEventContent<StateKey = EmptyStateKey>,
3052    ) -> Result<send_state_event::v3::Response> {
3053        self.send_state_event_for_key(&EmptyStateKey, content).await
3054    }
3055
3056    /// Send a state event with an empty state key to the homeserver.
3057    ///
3058    /// For state events with a non-empty state key, see
3059    /// [`send_state_event_for_key`][Self::send_state_event_for_key].
3060    ///
3061    /// If the experimental state event encryption feature is enabled, this
3062    /// method will transparently encrypt the event if this room is
3063    /// encrypted (except if the event type is considered critical for the room
3064    /// to function, as outlined in [MSC4362][msc4362]).
3065    ///
3066    /// Returns the parsed response from the server.
3067    ///
3068    /// # Arguments
3069    ///
3070    /// * `content` - The content of the state event.
3071    ///
3072    /// # Examples
3073    ///
3074    /// ```no_run
3075    /// # use serde::{Deserialize, Serialize};
3076    /// # async {
3077    /// # let joined_room: matrix_sdk::Room = todo!();
3078    /// use matrix_sdk::ruma::{
3079    ///     EventEncryptionAlgorithm,
3080    ///     events::{
3081    ///         EmptyStateKey, macros::EventContent,
3082    ///         room::encryption::RoomEncryptionEventContent,
3083    ///     },
3084    /// };
3085    ///
3086    /// let encryption_event_content = RoomEncryptionEventContent::new(
3087    ///     EventEncryptionAlgorithm::MegolmV1AesSha2,
3088    /// );
3089    /// joined_room.send_state_event(encryption_event_content).await?;
3090    ///
3091    /// // Custom event:
3092    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3093    /// #[ruma_event(
3094    ///     type = "org.matrix.msc_9000.xxx",
3095    ///     kind = State,
3096    ///     state_key_type = EmptyStateKey,
3097    /// )]
3098    /// struct XxxStateEventContent {/* fields... */}
3099    ///
3100    /// let content: XxxStateEventContent = todo!();
3101    /// joined_room.send_state_event(content).await?;
3102    /// # anyhow::Ok(()) };
3103    /// ```
3104    ///
3105    /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/blob/travis/msc/encrypted-state/proposals/4362-encrypted-state.md
3106    #[cfg(feature = "experimental-encrypted-state-events")]
3107    #[instrument(skip_all)]
3108    pub fn send_state_event<'a>(
3109        &'a self,
3110        content: impl StateEventContent<StateKey = EmptyStateKey>,
3111    ) -> SendStateEvent<'a> {
3112        self.send_state_event_for_key(&EmptyStateKey, content)
3113    }
3114
3115    /// Send a state event to the homeserver.
3116    ///
3117    /// Returns the parsed response from the server.
3118    ///
3119    /// # Arguments
3120    ///
3121    /// * `content` - The content of the state event.
3122    ///
3123    /// * `state_key` - A unique key which defines the overwriting semantics for
3124    ///   this piece of room state.
3125    ///
3126    /// # Examples
3127    ///
3128    /// ```no_run
3129    /// # use serde::{Deserialize, Serialize};
3130    /// # async {
3131    /// # let joined_room: matrix_sdk::Room = todo!();
3132    /// use matrix_sdk::ruma::{
3133    ///     events::{
3134    ///         macros::EventContent,
3135    ///         room::member::{RoomMemberEventContent, MembershipState},
3136    ///     },
3137    ///     mxc_uri,
3138    /// };
3139    ///
3140    /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3141    /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3142    /// content.avatar_url = Some(avatar_url);
3143    ///
3144    /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3145    ///
3146    /// // Custom event:
3147    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3148    /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3149    /// struct XxxStateEventContent { /* fields... */ }
3150    ///
3151    /// let content: XxxStateEventContent = todo!();
3152    /// joined_room.send_state_event_for_key("foo", content).await?;
3153    /// # anyhow::Ok(()) };
3154    /// ```
3155    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3156    pub async fn send_state_event_for_key<C, K>(
3157        &self,
3158        state_key: &K,
3159        content: C,
3160    ) -> Result<send_state_event::v3::Response>
3161    where
3162        C: StateEventContent,
3163        C::StateKey: Borrow<K>,
3164        K: AsRef<str> + ?Sized,
3165    {
3166        self.ensure_room_joined()?;
3167        let request =
3168            send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3169        let response = self.client.send(request).await?;
3170        Ok(response)
3171    }
3172
3173    /// Send a state event to the homeserver. If state encryption is enabled in
3174    /// this room, the event will be encrypted.
3175    ///
3176    /// If the experimental state event encryption feature is enabled, this
3177    /// method will transparently encrypt the event if this room is
3178    /// encrypted (except if the event type is considered critical for the room
3179    /// to function, as outlined in [MSC4362][msc4362]).
3180    ///
3181    /// Returns the parsed response from the server.
3182    ///
3183    /// # Arguments
3184    ///
3185    /// * `content` - The content of the state event.
3186    ///
3187    /// * `state_key` - A unique key which defines the overwriting semantics for
3188    ///   this piece of room state.
3189    ///
3190    /// # Examples
3191    ///
3192    /// ```no_run
3193    /// # use serde::{Deserialize, Serialize};
3194    /// # async {
3195    /// # let joined_room: matrix_sdk::Room = todo!();
3196    /// use matrix_sdk::ruma::{
3197    ///     events::{
3198    ///         macros::EventContent,
3199    ///         room::member::{RoomMemberEventContent, MembershipState},
3200    ///     },
3201    ///     mxc_uri,
3202    /// };
3203    ///
3204    /// let avatar_url = mxc_uri!("mxc://example.org/avatar").to_owned();
3205    /// let mut content = RoomMemberEventContent::new(MembershipState::Join);
3206    /// content.avatar_url = Some(avatar_url);
3207    ///
3208    /// joined_room.send_state_event_for_key(ruma::user_id!("@foo:bar.com"), content).await?;
3209    ///
3210    /// // Custom event:
3211    /// #[derive(Clone, Debug, Deserialize, Serialize, EventContent)]
3212    /// #[ruma_event(type = "org.matrix.msc_9000.xxx", kind = State, state_key_type = String)]
3213    /// struct XxxStateEventContent { /* fields... */ }
3214    ///
3215    /// let content: XxxStateEventContent = todo!();
3216    /// joined_room.send_state_event_for_key("foo", content).await?;
3217    /// # anyhow::Ok(()) };
3218    /// ```
3219    ///
3220    /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
3221    #[cfg(feature = "experimental-encrypted-state-events")]
3222    pub fn send_state_event_for_key<'a, C, K>(
3223        &'a self,
3224        state_key: &K,
3225        content: C,
3226    ) -> SendStateEvent<'a>
3227    where
3228        C: StateEventContent,
3229        C::StateKey: Borrow<K>,
3230        K: AsRef<str> + ?Sized,
3231    {
3232        SendStateEvent::new(self, state_key, content)
3233    }
3234
3235    /// Send a raw room state event to the homeserver.
3236    ///
3237    /// Returns the parsed response from the server.
3238    ///
3239    /// # Arguments
3240    ///
3241    /// * `event_type` - The type of the event that we're sending out.
3242    ///
3243    /// * `state_key` - A unique key which defines the overwriting semantics for
3244    /// this piece of room state. This value is often a zero-length string.
3245    ///
3246    /// * `content` - The content of the event as a raw JSON value. The argument
3247    ///   type can be `serde_json::Value`, but also other raw JSON types; for
3248    ///   the full list check the documentation of [`IntoRawStateEventContent`].
3249    ///
3250    /// # Examples
3251    ///
3252    /// ```no_run
3253    /// use serde_json::json;
3254    ///
3255    /// # async {
3256    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3257    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3258    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3259    ///
3260    /// if let Some(room) = client.get_room(&room_id) {
3261    ///     room.send_state_event_raw("m.room.member", "", json!({
3262    ///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3263    ///         "displayname": "Alice Margatroid",
3264    ///         "membership": "join",
3265    ///     })).await?;
3266    /// }
3267    /// # anyhow::Ok(()) };
3268    /// ```
3269    #[cfg(not(feature = "experimental-encrypted-state-events"))]
3270    #[instrument(skip_all)]
3271    pub async fn send_state_event_raw(
3272        &self,
3273        event_type: &str,
3274        state_key: &str,
3275        content: impl IntoRawStateEventContent,
3276    ) -> Result<send_state_event::v3::Response> {
3277        self.ensure_room_joined()?;
3278
3279        let request = send_state_event::v3::Request::new_raw(
3280            self.room_id().to_owned(),
3281            event_type.into(),
3282            state_key.to_owned(),
3283            content.into_raw_state_event_content(),
3284        );
3285
3286        Ok(self.client.send(request).await?)
3287    }
3288
3289    /// Send a raw room state event to the homeserver.
3290    ///
3291    /// If the experimental state event encryption feature is enabled, this
3292    /// method will transparently encrypt the event if this room is
3293    /// encrypted (except if the event type is considered critical for the room
3294    /// to function, as outlined in [MSC4362][msc4362]).
3295    ///
3296    /// Returns the parsed response from the server.
3297    ///
3298    /// # Arguments
3299    ///
3300    /// * `event_type` - The type of the event that we're sending out.
3301    ///
3302    /// * `state_key` - A unique key which defines the overwriting semantics for
3303    /// this piece of room state. This value is often a zero-length string.
3304    ///
3305    /// * `content` - The content of the event as a raw JSON value. The argument
3306    ///   type can be `serde_json::Value`, but also other raw JSON types; for
3307    ///   the full list check the documentation of [`IntoRawStateEventContent`].
3308    ///
3309    /// # Examples
3310    ///
3311    /// ```no_run
3312    /// use serde_json::json;
3313    ///
3314    /// # async {
3315    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3316    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3317    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3318    ///
3319    /// if let Some(room) = client.get_room(&room_id) {
3320    ///     room.send_state_event_raw("m.room.member", "", json!({
3321    ///         "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
3322    ///         "displayname": "Alice Margatroid",
3323    ///         "membership": "join",
3324    ///     })).await?;
3325    /// }
3326    /// # anyhow::Ok(()) };
3327    /// ```
3328    ///
3329    /// [msc4362]: https://github.com/matrix-org/matrix-spec-proposals/pull/4362
3330    #[cfg(feature = "experimental-encrypted-state-events")]
3331    #[instrument(skip_all)]
3332    pub fn send_state_event_raw<'a>(
3333        &'a self,
3334        event_type: &'a str,
3335        state_key: &'a str,
3336        content: impl IntoRawStateEventContent,
3337    ) -> SendRawStateEvent<'a> {
3338        SendRawStateEvent::new(self, event_type, state_key, content)
3339    }
3340
3341    /// Strips all information out of an event of the room.
3342    ///
3343    /// Returns the [`redact_event::v3::Response`] from the server.
3344    ///
3345    /// This cannot be undone. Users may redact their own events, and any user
3346    /// with a power level greater than or equal to the redact power level of
3347    /// the room may redact events there.
3348    ///
3349    /// # Arguments
3350    ///
3351    /// * `event_id` - The ID of the event to redact
3352    ///
3353    /// * `reason` - The reason for the event being redacted.
3354    ///
3355    /// * `txn_id` - A unique ID that can be attached to this event as
3356    /// its transaction ID. If not given one is created for the message.
3357    ///
3358    /// # Examples
3359    ///
3360    /// ```no_run
3361    /// use matrix_sdk::ruma::event_id;
3362    ///
3363    /// # async {
3364    /// # let homeserver = url::Url::parse("http://localhost:8080")?;
3365    /// # let mut client = matrix_sdk::Client::new(homeserver).await?;
3366    /// # let room_id = matrix_sdk::ruma::room_id!("!test:localhost");
3367    /// #
3368    /// if let Some(room) = client.get_room(&room_id) {
3369    ///     let event_id = event_id!("$xxxxxx:example.org");
3370    ///     let reason = Some("Indecent material");
3371    ///     room.redact(&event_id, reason, None).await?;
3372    /// }
3373    /// # anyhow::Ok(()) };
3374    /// ```
3375    #[instrument(skip_all)]
3376    pub async fn redact(
3377        &self,
3378        event_id: &EventId,
3379        reason: Option<&str>,
3380        txn_id: Option<OwnedTransactionId>,
3381    ) -> HttpResult<redact_event::v3::Response> {
3382        let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3383        let request = assign!(
3384            redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3385            { reason: reason.map(ToOwned::to_owned) }
3386        );
3387
3388        self.client.send(request).await
3389    }
3390
3391    /// Get a list of servers that should know this room.
3392    ///
3393    /// Uses the synced members of the room and the suggested [routing
3394    /// algorithm] from the Matrix spec.
3395    ///
3396    /// Returns at most three servers.
3397    ///
3398    /// [routing algorithm]: https://spec.matrix.org/v1.3/appendices/#routing
3399    pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
3400        let acl_ev = self
3401            .get_state_event_static::<RoomServerAclEventContent>()
3402            .await?
3403            .and_then(|ev| ev.deserialize().ok());
3404        let acl = acl_ev.as_ref().and_then(|ev| match ev {
3405            SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
3406            SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
3407        });
3408
3409        // Filter out server names that:
3410        // - Are blocked due to server ACLs
3411        // - Are IP addresses
3412        let members: Vec<_> = self
3413            .members_no_sync(RoomMemberships::JOIN)
3414            .await?
3415            .into_iter()
3416            .filter(|member| {
3417                let server = member.user_id().server_name();
3418                acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
3419            })
3420            .collect();
3421
3422        // Get the server of the highest power level user in the room, provided
3423        // they are at least power level 50.
3424        let max = members
3425            .iter()
3426            .max_by_key(|member| member.power_level())
3427            .filter(|max| max.power_level() >= int!(50))
3428            .map(|member| member.user_id().server_name());
3429
3430        // Sort the servers by population.
3431        let servers = members
3432            .iter()
3433            .map(|member| member.user_id().server_name())
3434            .filter(|server| max.filter(|max| max == server).is_none())
3435            .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
3436                *servers.entry(server).or_default() += 1;
3437                servers
3438            });
3439        let mut servers: Vec<_> = servers.into_iter().collect();
3440        servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
3441
3442        Ok(max
3443            .into_iter()
3444            .chain(servers.into_iter().map(|(name, _)| name))
3445            .take(3)
3446            .map(ToOwned::to_owned)
3447            .collect())
3448    }
3449
3450    /// Get a `matrix.to` permalink to this room.
3451    ///
3452    /// If this room has an alias, we use it. Otherwise, we try to use the
3453    /// synced members in the room for [routing] the room ID.
3454    ///
3455    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3456    pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3457        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3458            return Ok(alias.matrix_to_uri());
3459        }
3460
3461        let via = self.route().await?;
3462        Ok(self.room_id().matrix_to_uri_via(via))
3463    }
3464
3465    /// Get a `matrix:` permalink to this room.
3466    ///
3467    /// If this room has an alias, we use it. Otherwise, we try to use the
3468    /// synced members in the room for [routing] the room ID.
3469    ///
3470    /// # Arguments
3471    ///
3472    /// * `join` - Whether the user should join the room.
3473    ///
3474    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3475    pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3476        if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3477            return Ok(alias.matrix_uri(join));
3478        }
3479
3480        let via = self.route().await?;
3481        Ok(self.room_id().matrix_uri_via(via, join))
3482    }
3483
3484    /// Get a `matrix.to` permalink to an event in this room.
3485    ///
3486    /// We try to use the synced members in the room for [routing] the room ID.
3487    ///
3488    /// *Note*: This method does not check if the given event ID is actually
3489    /// part of this room. It needs to be checked before calling this method
3490    /// otherwise the permalink won't work.
3491    ///
3492    /// # Arguments
3493    ///
3494    /// * `event_id` - The ID of the event.
3495    ///
3496    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3497    pub async fn matrix_to_event_permalink(
3498        &self,
3499        event_id: impl Into<OwnedEventId>,
3500    ) -> Result<MatrixToUri> {
3501        // Don't use the alias because an event is tied to a room ID, but an
3502        // alias might point to another room, e.g. after a room upgrade.
3503        let via = self.route().await?;
3504        Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3505    }
3506
3507    /// Get a `matrix:` permalink to an event in this room.
3508    ///
3509    /// We try to use the synced members in the room for [routing] the room ID.
3510    ///
3511    /// *Note*: This method does not check if the given event ID is actually
3512    /// part of this room. It needs to be checked before calling this method
3513    /// otherwise the permalink won't work.
3514    ///
3515    /// # Arguments
3516    ///
3517    /// * `event_id` - The ID of the event.
3518    ///
3519    /// [routing]: https://spec.matrix.org/v1.3/appendices/#routing
3520    pub async fn matrix_event_permalink(
3521        &self,
3522        event_id: impl Into<OwnedEventId>,
3523    ) -> Result<MatrixUri> {
3524        // Don't use the alias because an event is tied to a room ID, but an
3525        // alias might point to another room, e.g. after a room upgrade.
3526        let via = self.route().await?;
3527        Ok(self.room_id().matrix_event_uri_via(event_id, via))
3528    }
3529
3530    /// Get the latest receipt of a user in this room.
3531    ///
3532    /// # Arguments
3533    ///
3534    /// * `receipt_type` - The type of receipt to get.
3535    ///
3536    /// * `thread` - The thread containing the event of the receipt, if any.
3537    ///
3538    /// * `user_id` - The ID of the user.
3539    ///
3540    /// Returns the ID of the event on which the receipt applies and the
3541    /// receipt.
3542    pub async fn load_user_receipt(
3543        &self,
3544        receipt_type: ReceiptType,
3545        thread: ReceiptThread,
3546        user_id: &UserId,
3547    ) -> Result<Option<(OwnedEventId, Receipt)>> {
3548        self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3549    }
3550
3551    /// Load the receipts for an event in this room from storage.
3552    ///
3553    /// # Arguments
3554    ///
3555    /// * `receipt_type` - The type of receipt to get.
3556    ///
3557    /// * `thread` - The thread containing the event of the receipt, if any.
3558    ///
3559    /// * `event_id` - The ID of the event.
3560    ///
3561    /// Returns a list of IDs of users who have sent a receipt for the event and
3562    /// the corresponding receipts.
3563    pub async fn load_event_receipts(
3564        &self,
3565        receipt_type: ReceiptType,
3566        thread: ReceiptThread,
3567        event_id: &EventId,
3568    ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3569        self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3570    }
3571
3572    /// Get the push-condition context for this room.
3573    ///
3574    /// Returns `None` if some data couldn't be found. This should only happen
3575    /// in brand new rooms, while we process its state.
3576    pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3577        self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions().await?)
3578            .await
3579    }
3580
3581    /// Get the push-condition context for this room, with a choice to include
3582    /// thread subscriptions or not, based on the extra
3583    /// `with_threads_subscriptions` parameter.
3584    ///
3585    /// Returns `None` if some data couldn't be found. This should only happen
3586    /// in brand new rooms, while we process its state.
3587    pub(crate) async fn push_condition_room_ctx_internal(
3588        &self,
3589        with_threads_subscriptions: bool,
3590    ) -> Result<Option<PushConditionRoomCtx>> {
3591        let room_id = self.room_id();
3592        let user_id = self.own_user_id();
3593        let room_info = self.clone_info();
3594        let member_count = room_info.active_members_count();
3595
3596        let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3597            member.name().to_owned()
3598        } else {
3599            return Ok(None);
3600        };
3601
3602        let power_levels = match self.power_levels().await {
3603            Ok(power_levels) => Some(power_levels.into()),
3604            Err(error) => {
3605                if matches!(room_info.state(), RoomState::Joined) {
3606                    // It's normal to not have the power levels in a non-joined room, so don't log
3607                    // the error if the room is not joined
3608                    error!("Could not compute power levels for push conditions: {error}");
3609                }
3610                None
3611            }
3612        };
3613
3614        let mut ctx = assign!(PushConditionRoomCtx::new(
3615            room_id.to_owned(),
3616            UInt::new(member_count).unwrap_or(UInt::MAX),
3617            user_id.to_owned(),
3618            user_display_name,
3619        ),
3620        {
3621            power_levels,
3622        });
3623
3624        if with_threads_subscriptions {
3625            let this = self.clone();
3626            ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3627                let room = this.clone();
3628                Box::pin(async move {
3629                    if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3630                        maybe_sub.is_some()
3631                    } else {
3632                        false
3633                    }
3634                })
3635            });
3636        }
3637
3638        Ok(Some(ctx))
3639    }
3640
3641    /// Retrieves a [`PushContext`] that can be used to compute the push
3642    /// actions for events.
3643    pub async fn push_context(&self) -> Result<Option<PushContext>> {
3644        self.push_context_internal(self.client.enabled_thread_subscriptions().await?).await
3645    }
3646
3647    /// Retrieves a [`PushContext`] that can be used to compute the push actions
3648    /// for events, with a choice to include thread subscriptions or not,
3649    /// based on the extra `with_threads_subscriptions` parameter.
3650    #[instrument(skip(self))]
3651    pub(crate) async fn push_context_internal(
3652        &self,
3653        with_threads_subscriptions: bool,
3654    ) -> Result<Option<PushContext>> {
3655        let Some(push_condition_room_ctx) =
3656            self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3657        else {
3658            debug!("Could not aggregate push context");
3659            return Ok(None);
3660        };
3661        let push_rules = self.client().account().push_rules().await?;
3662        Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3663    }
3664
3665    /// Get the push actions for the given event with the current room state.
3666    ///
3667    /// Note that it is possible that no push action is returned because the
3668    /// current room state does not have all the required state events.
3669    pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3670        if let Some(ctx) = self.push_context().await? {
3671            Ok(Some(ctx.for_event(event).await))
3672        } else {
3673            Ok(None)
3674        }
3675    }
3676
3677    /// The membership details of the (latest) invite for the logged-in user in
3678    /// this room.
3679    pub async fn invite_details(&self) -> Result<Invite> {
3680        let state = self.state();
3681
3682        if state != RoomState::Invited {
3683            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3684        }
3685
3686        let invitee = self
3687            .get_member_no_sync(self.own_user_id())
3688            .await?
3689            .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3690        let event = invitee.event();
3691
3692        let inviter_id = event.sender().to_owned();
3693        let inviter = self.get_member_no_sync(&inviter_id).await?;
3694
3695        Ok(Invite { invitee, inviter_id, inviter })
3696    }
3697
3698    /// Get the membership details for the current user.
3699    ///
3700    /// Returns:
3701    ///     - If the user was present in the room, a
3702    ///       [`RoomMemberWithSenderInfo`] containing both the user info and the
3703    ///       member info of the sender of the `m.room.member` event.
3704    ///     - If the current user is not present, an error.
3705    pub async fn member_with_sender_info(
3706        &self,
3707        user_id: &UserId,
3708    ) -> Result<RoomMemberWithSenderInfo> {
3709        let Some(member) = self.get_member_no_sync(user_id).await? else {
3710            return Err(Error::InsufficientData);
3711        };
3712
3713        let sender_member =
3714            if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3715                // If the sender room member info is already available, return it
3716                Some(member)
3717            } else if self.are_members_synced() {
3718                // The room members are synced and we couldn't find the sender info
3719                None
3720            } else if self.sync_members().await.is_ok() {
3721                // Try getting the sender room member info again after syncing
3722                self.get_member_no_sync(member.event().sender()).await?
3723            } else {
3724                None
3725            };
3726
3727        Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3728    }
3729
3730    /// Forget this room.
3731    ///
3732    /// This communicates to the homeserver that it should forget the room.
3733    ///
3734    /// Only left or banned-from rooms can be forgotten.
3735    pub async fn forget(&self) -> Result<()> {
3736        let state = self.state();
3737        match state {
3738            RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3739                return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3740                    "Left / Banned",
3741                    state,
3742                ))));
3743            }
3744            RoomState::Left | RoomState::Banned => {}
3745        }
3746
3747        let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3748        let _response = self.client.send(request).await?;
3749
3750        // If it was a DM, remove the room from the `m.direct` global account data.
3751        if self.inner.direct_targets_length() != 0
3752            && let Err(e) = self.set_is_direct(false).await
3753        {
3754            // It is not important whether we managed to remove the room, it will not have
3755            // any consequences, so just log the error.
3756            warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3757        }
3758
3759        self.client.base_client().forget_room(self.inner.room_id()).await?;
3760
3761        Ok(())
3762    }
3763
3764    fn ensure_room_joined(&self) -> Result<()> {
3765        let state = self.state();
3766        if state == RoomState::Joined {
3767            Ok(())
3768        } else {
3769            Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3770        }
3771    }
3772
3773    /// Get the notification mode.
3774    pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3775        if !matches!(self.state(), RoomState::Joined) {
3776            return None;
3777        }
3778
3779        let notification_settings = self.client().notification_settings().await;
3780
3781        // Get the user-defined mode if available
3782        let notification_mode =
3783            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3784
3785        if notification_mode.is_some() {
3786            notification_mode
3787        } else if let Ok(is_encrypted) =
3788            self.latest_encryption_state().await.map(|state| state.is_encrypted())
3789        {
3790            // Otherwise, if encrypted status is available, get the default mode for this
3791            // type of room.
3792            // From the point of view of notification settings, a `one-to-one` room is one
3793            // that involves exactly two people.
3794            let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3795            let default_mode = notification_settings
3796                .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3797                .await;
3798            Some(default_mode)
3799        } else {
3800            None
3801        }
3802    }
3803
3804    /// Get the user-defined notification mode.
3805    ///
3806    /// The result is cached for fast and non-async call. To read the cached
3807    /// result, use
3808    /// [`matrix_sdk_base::Room::cached_user_defined_notification_mode`].
3809    //
3810    // Note for maintainers:
3811    //
3812    // The fact the result is cached is an important property. If you change that in
3813    // the future, please review all calls to this method.
3814    pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3815        if !matches!(self.state(), RoomState::Joined) {
3816            return None;
3817        }
3818
3819        let notification_settings = self.client().notification_settings().await;
3820
3821        // Get the user-defined mode if available.
3822        let mode =
3823            notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3824
3825        if let Some(mode) = mode {
3826            self.update_cached_user_defined_notification_mode(mode);
3827        }
3828
3829        mode
3830    }
3831
3832    /// Report an event as inappropriate to the homeserver's administrator.
3833    ///
3834    /// # Arguments
3835    ///
3836    /// * `event_id` - The ID of the event to report.
3837    /// * `score` - The score to rate this content.
3838    /// * `reason` - The reason the content is being reported.
3839    ///
3840    /// # Errors
3841    ///
3842    /// Returns an error if the room is not joined or if an error occurs with
3843    /// the request.
3844    pub async fn report_content(
3845        &self,
3846        event_id: OwnedEventId,
3847        reason: Option<String>,
3848    ) -> Result<report_content::v3::Response> {
3849        let state = self.state();
3850        if state != RoomState::Joined {
3851            return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3852        }
3853
3854        let request = assign!(
3855            report_content::v3::Request::new(
3856                self.inner.room_id().to_owned(),
3857                event_id,
3858            ), {
3859                reason: reason
3860            }
3861        );
3862        Ok(self.client.send(request).await?)
3863    }
3864
3865    /// Reports a room as inappropriate to the server.
3866    /// The caller is not required to be joined to the room to report it.
3867    ///
3868    /// # Arguments
3869    ///
3870    /// * `reason` - The reason the room is being reported.
3871    ///
3872    /// # Errors
3873    ///
3874    /// Returns an error if the room is not found or on rate limit
3875    pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3876        let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3877
3878        Ok(self.client.send(request).await?)
3879    }
3880
3881    /// Set a flag on the room to indicate that the user has explicitly marked
3882    /// it as (un)read.
3883    ///
3884    /// This is a no-op if [`BaseRoom::is_marked_unread()`] returns the same
3885    /// value as `unread`.
3886    pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3887        if self.is_marked_unread() == unread {
3888            // The request is not necessary.
3889            return Ok(());
3890        }
3891
3892        let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3893
3894        let content = MarkedUnreadEventContent::new(unread);
3895
3896        let request = set_room_account_data::v3::Request::new(
3897            user_id.to_owned(),
3898            self.inner.room_id().to_owned(),
3899            &content,
3900        )?;
3901
3902        self.client.send(request).await?;
3903        Ok(())
3904    }
3905
3906    /// Returns the [`RoomEventCache`] associated to this room, assuming the
3907    /// global [`EventCache`] has been enabled for subscription.
3908    pub async fn event_cache(
3909        &self,
3910    ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3911        self.client.event_cache().for_room(self.room_id()).await
3912    }
3913
3914    /// Get the beacon information event in the room for the `user_id`.
3915    ///
3916    /// # Errors
3917    ///
3918    /// Returns an error if the event is redacted, stripped, not found or could
3919    /// not be deserialized.
3920    pub(crate) async fn get_user_beacon_info(
3921        &self,
3922        user_id: &UserId,
3923    ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3924        let raw_event = self
3925            .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3926            .await?
3927            .ok_or(BeaconError::NotFound)?;
3928
3929        match raw_event.deserialize()? {
3930            SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3931            SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3932            SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3933        }
3934    }
3935
3936    /// Start sharing live location in the room.
3937    ///
3938    /// # Arguments
3939    ///
3940    /// * `duration_millis` - The duration for which the live location is
3941    ///   shared, in milliseconds.
3942    /// * `description` - An optional description for the live location share.
3943    ///
3944    /// # Errors
3945    ///
3946    /// Returns an error if the room is not joined or if the state event could
3947    /// not be sent.
3948    pub async fn start_live_location_share(
3949        &self,
3950        duration_millis: u64,
3951        description: Option<String>,
3952    ) -> Result<send_state_event::v3::Response> {
3953        self.ensure_room_joined()?;
3954
3955        self.send_state_event_for_key(
3956            self.own_user_id(),
3957            BeaconInfoEventContent::new(
3958                description,
3959                Duration::from_millis(duration_millis),
3960                true,
3961                None,
3962            ),
3963        )
3964        .await
3965    }
3966
3967    /// Stop sharing live location in the room.
3968    ///
3969    /// # Errors
3970    ///
3971    /// Returns an error if the room is not joined, if the beacon information
3972    /// is redacted or stripped, or if the state event is not found.
3973    pub async fn stop_live_location_share(
3974        &self,
3975    ) -> Result<send_state_event::v3::Response, BeaconError> {
3976        self.ensure_room_joined()?;
3977
3978        let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3979        beacon_info_event.content.stop();
3980        Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3981    }
3982
3983    /// Send a location beacon event in the current room.
3984    ///
3985    /// # Arguments
3986    ///
3987    /// * `geo_uri` - The geo URI of the location beacon.
3988    ///
3989    /// # Errors
3990    ///
3991    /// Returns an error if the room is not joined, if the beacon information
3992    /// is redacted or stripped, if the location share is no longer live,
3993    /// or if the state event is not found.
3994    pub async fn send_location_beacon(
3995        &self,
3996        geo_uri: String,
3997    ) -> Result<send_message_event::v3::Response, BeaconError> {
3998        self.ensure_room_joined()?;
3999
4000        let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
4001
4002        if beacon_info_event.content.is_live() {
4003            let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
4004            Ok(self.send(content).await?.response)
4005        } else {
4006            Err(BeaconError::NotLive)
4007        }
4008    }
4009
4010    /// Store the given `ComposerDraft` in the state store using the current
4011    /// room id and optional thread root id as identifier.
4012    pub async fn save_composer_draft(
4013        &self,
4014        draft: ComposerDraft,
4015        thread_root: Option<&EventId>,
4016    ) -> Result<()> {
4017        self.client
4018            .state_store()
4019            .set_kv_data(
4020                StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
4021                StateStoreDataValue::ComposerDraft(draft),
4022            )
4023            .await?;
4024        Ok(())
4025    }
4026
4027    /// Retrieve the `ComposerDraft` stored in the state store for this room
4028    /// and given thread, if any.
4029    pub async fn load_composer_draft(
4030        &self,
4031        thread_root: Option<&EventId>,
4032    ) -> Result<Option<ComposerDraft>> {
4033        let data = self
4034            .client
4035            .state_store()
4036            .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4037            .await?;
4038        Ok(data.and_then(|d| d.into_composer_draft()))
4039    }
4040
4041    /// Remove the `ComposerDraft` stored in the state store for this room
4042    /// and given thread, if any.
4043    pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
4044        self.client
4045            .state_store()
4046            .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4047            .await?;
4048        Ok(())
4049    }
4050
4051    /// Load pinned state events for a room from the `/state` endpoint in the
4052    /// home server.
4053    pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
4054        let response = self
4055            .client
4056            .send(get_state_event_for_key::v3::Request::new(
4057                self.room_id().to_owned(),
4058                StateEventType::RoomPinnedEvents,
4059                "".to_owned(),
4060            ))
4061            .await;
4062
4063        match response {
4064            Ok(response) => Ok(Some(
4065                response
4066                    .into_content()
4067                    .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
4068                    .pinned,
4069            )),
4070            Err(http_error) => match http_error.as_client_api_error() {
4071                Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
4072                _ => Err(http_error.into()),
4073            },
4074        }
4075    }
4076
4077    /// Observe live location sharing events for this room.
4078    ///
4079    /// The returned observable will receive the newest event for each sync
4080    /// response that contains an `m.beacon` event.
4081    ///
4082    /// Returns a stream of [`ObservableLiveLocation`] events from other users
4083    /// in the room, excluding the live location events of the room's own user.
4084    pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
4085        ObservableLiveLocation::new(&self.client, self.room_id())
4086    }
4087
4088    /// Subscribe to knock requests in this `Room`.
4089    ///
4090    /// The current requests to join the room will be emitted immediately
4091    /// when subscribing.
4092    ///
4093    /// A new set of knock requests will be emitted whenever:
4094    /// - A new member event is received.
4095    /// - A knock request is marked as seen.
4096    /// - A sync is gappy (limited), so room membership information may be
4097    ///   outdated.
4098    ///
4099    /// Returns both a stream of knock requests and a handle for a task that
4100    /// will clean up the seen knock request ids when possible.
4101    pub async fn subscribe_to_knock_requests(
4102        &self,
4103    ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
4104        let this = Arc::new(self.clone());
4105
4106        let room_member_events_observer =
4107            self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
4108
4109        let current_seen_ids = self.get_seen_knock_request_ids().await?;
4110        let mut seen_request_ids_stream = self
4111            .seen_knock_request_ids_map
4112            .subscribe()
4113            .await
4114            .map(|values| values.unwrap_or_default());
4115
4116        let mut room_info_stream = self.subscribe_info();
4117
4118        // Spawn a task that will clean up the seen knock request ids when updated room
4119        // members are received
4120        let clear_seen_ids_handle = spawn({
4121            let this = self.clone();
4122            async move {
4123                let mut member_updates_stream = this.room_member_updates_sender.subscribe();
4124                while member_updates_stream.recv().await.is_ok() {
4125                    // If room members were updated, try to remove outdated seen knock request ids
4126                    if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
4127                        warn!("Failed to remove seen knock requests: {err}")
4128                    }
4129                }
4130            }
4131        });
4132
4133        let combined_stream = stream! {
4134            // Emit current requests to join
4135            match this.get_current_join_requests(&current_seen_ids).await {
4136                Ok(initial_requests) => yield initial_requests,
4137                Err(err) => warn!("Failed to get initial requests to join: {err}")
4138            }
4139
4140            let mut requests_stream = room_member_events_observer.subscribe();
4141            let mut seen_ids = current_seen_ids.clone();
4142
4143            loop {
4144                // This is equivalent to a combine stream operation, triggering a new emission
4145                // when any of the branches changes
4146                tokio::select! {
4147                    Some((event, _)) = requests_stream.next() => {
4148                        if let Some(event) = event.as_original() {
4149                            // If we can calculate the membership change, try to emit only when needed
4150                            let emit = if event.prev_content().is_some() {
4151                                matches!(event.membership_change(),
4152                                    MembershipChange::Banned |
4153                                    MembershipChange::Knocked |
4154                                    MembershipChange::KnockAccepted |
4155                                    MembershipChange::KnockDenied |
4156                                    MembershipChange::KnockRetracted
4157                                )
4158                            } else {
4159                                // If we can't calculate the membership change, assume we need to
4160                                // emit updated values
4161                                true
4162                            };
4163
4164                            if emit {
4165                                match this.get_current_join_requests(&seen_ids).await {
4166                                    Ok(requests) => yield requests,
4167                                    Err(err) => {
4168                                        warn!("Failed to get updated knock requests on new member event: {err}")
4169                                    }
4170                                }
4171                            }
4172                        }
4173                    }
4174
4175                    Some(new_seen_ids) = seen_request_ids_stream.next() => {
4176                        // Update the current seen ids
4177                        seen_ids = new_seen_ids;
4178
4179                        // If seen requests have changed we need to recalculate
4180                        // all the knock requests
4181                        match this.get_current_join_requests(&seen_ids).await {
4182                            Ok(requests) => yield requests,
4183                            Err(err) => {
4184                                warn!("Failed to get updated knock requests on seen ids changed: {err}")
4185                            }
4186                        }
4187                    }
4188
4189                    Some(room_info) = room_info_stream.next() => {
4190                        // We need to emit new items when we may have missing room members:
4191                        // this usually happens after a gappy (limited) sync
4192                        if !room_info.are_members_synced() {
4193                            match this.get_current_join_requests(&seen_ids).await {
4194                                Ok(requests) => yield requests,
4195                                Err(err) => {
4196                                    warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4197                                }
4198                            }
4199                        }
4200                    }
4201                    // If the streams in all branches are closed, stop the loop
4202                    else => break,
4203                }
4204            }
4205        };
4206
4207        Ok((combined_stream, clear_seen_ids_handle))
4208    }
4209
4210    async fn get_current_join_requests(
4211        &self,
4212        seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4213    ) -> Result<Vec<KnockRequest>> {
4214        Ok(self
4215            .members(RoomMemberships::KNOCK)
4216            .await?
4217            .into_iter()
4218            .filter_map(|member| {
4219                let event_id = member.event().event_id()?;
4220                Some(KnockRequest::new(
4221                    self,
4222                    event_id,
4223                    member.event().timestamp(),
4224                    KnockRequestMemberInfo::from_member(&member),
4225                    seen_request_ids.contains_key(event_id),
4226                ))
4227            })
4228            .collect())
4229    }
4230
4231    /// Access the room settings related to privacy and visibility.
4232    pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4233        RoomPrivacySettings::new(&self.inner, &self.client)
4234    }
4235
4236    /// Retrieve a list of all the threads for the current room.
4237    ///
4238    /// Since this client-server API is paginated, the return type may include a
4239    /// token used to resuming back-pagination into the list of results, in
4240    /// [`ThreadRoots::prev_batch_token`]. This token can be fed back into
4241    /// [`ListThreadsOptions::from`] to continue the pagination
4242    /// from the previous position.
4243    pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4244        let request = opts.into_request(self.room_id());
4245
4246        let response = self.client.send(request).await?;
4247
4248        let push_ctx = self.push_context().await?;
4249        let chunk = join_all(
4250            response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4251        )
4252        .await;
4253
4254        Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4255    }
4256
4257    /// Retrieve a list of relations for the given event, according to the given
4258    /// options, using the network.
4259    ///
4260    /// Since this client-server API is paginated, the return type may include a
4261    /// token used to resuming back-pagination into the list of results, in
4262    /// [`Relations::prev_batch_token`]. This token can be fed back into
4263    /// [`RelationsOptions::from`] to continue the pagination from the previous
4264    /// position.
4265    ///
4266    /// **Note**: if [`RelationsOptions::from`] is set for a subsequent request,
4267    /// then it must be used with the same
4268    /// [`RelationsOptions::include_relations`] value as the request that
4269    /// returns the `from` token, otherwise the server behavior is undefined.
4270    pub async fn relations(
4271        &self,
4272        event_id: OwnedEventId,
4273        opts: RelationsOptions,
4274    ) -> Result<Relations> {
4275        let relations = opts.send(self, event_id).await;
4276
4277        // Save any new related events to the cache.
4278        if let Ok(Relations { chunk, .. }) = &relations
4279            && let Ok((cache, _handles)) = self.event_cache().await
4280        {
4281            cache.save_events(chunk.clone()).await;
4282        }
4283
4284        relations
4285    }
4286
4287    /// Search this room's [`RoomIndex`] for query and return at most
4288    /// max_number_of_results results.
4289    #[cfg(feature = "experimental-search")]
4290    pub async fn search(
4291        &self,
4292        query: &str,
4293        max_number_of_results: usize,
4294        pagination_offset: Option<usize>,
4295    ) -> Result<Vec<OwnedEventId>, IndexError> {
4296        let mut search_index_guard = self.client.search_index().lock().await;
4297        search_index_guard.search(query, max_number_of_results, pagination_offset, self.room_id())
4298    }
4299
4300    /// Subscribe to a given thread in this room.
4301    ///
4302    /// This will subscribe the user to the thread, so that they will receive
4303    /// notifications for that thread specifically.
4304    ///
4305    /// # Arguments
4306    ///
4307    /// - `thread_root`: The ID of the thread root event to subscribe to.
4308    /// - `automatic`: Whether the subscription was made automatically by a
4309    ///   client, not by manual user choice. If set, must include the latest
4310    ///   event ID that's known in the thread and that is causing the automatic
4311    ///   subscription. If unset (i.e. we're now subscribing manually) and there
4312    ///   was a previous automatic subscription, the subscription will be
4313    ///   overridden to a manual one instead.
4314    ///
4315    /// # Returns
4316    ///
4317    /// - A 404 error if the event isn't known, or isn't a thread root.
4318    /// - An `Ok` result if the subscription was successful, or if the server
4319    ///   skipped an automatic subscription (as the user unsubscribed from the
4320    ///   thread after the event causing the automatic subscription).
4321    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4322    pub async fn subscribe_thread(
4323        &self,
4324        thread_root: OwnedEventId,
4325        automatic: Option<OwnedEventId>,
4326    ) -> Result<()> {
4327        let is_automatic = automatic.is_some();
4328
4329        match self
4330            .client
4331            .send(subscribe_thread::unstable::Request::new(
4332                self.room_id().to_owned(),
4333                thread_root.clone(),
4334                automatic,
4335            ))
4336            .await
4337        {
4338            Ok(_response) => {
4339                trace!("Server acknowledged the thread subscription; saving in db");
4340
4341                // Immediately save the result into the database.
4342                self.client
4343                    .state_store()
4344                    .upsert_thread_subscriptions(vec![(
4345                        self.room_id(),
4346                        &thread_root,
4347                        StoredThreadSubscription {
4348                            status: ThreadSubscriptionStatus::Subscribed {
4349                                automatic: is_automatic,
4350                            },
4351                            bump_stamp: None,
4352                        },
4353                    )])
4354                    .await?;
4355
4356                Ok(())
4357            }
4358
4359            Err(err) => {
4360                if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4361                    // In this case: the server indicates that the user unsubscribed *after* the
4362                    // event ID we've used in an automatic subscription; don't
4363                    // save the subscription state in the database, as the
4364                    // previous one should be more correct.
4365                    trace!("Thread subscription skipped: {err}");
4366                    Ok(())
4367                } else {
4368                    // Forward the error to the caller.
4369                    Err(err.into())
4370                }
4371            }
4372        }
4373    }
4374
4375    /// Subscribe to a thread if needed, based on a current subscription to it.
4376    ///
4377    /// This is like [`Self::subscribe_thread`], but it first checks if the user
4378    /// has already subscribed to a thread, so as to minimize sending
4379    /// unnecessary subscriptions which would be ignored by the server.
4380    pub async fn subscribe_thread_if_needed(
4381        &self,
4382        thread_root: &EventId,
4383        automatic: Option<OwnedEventId>,
4384    ) -> Result<()> {
4385        if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4386            // If we have a previous subscription, we should only send the new one if it's
4387            // manual and the previous one was automatic.
4388            if !prev_sub.automatic || automatic.is_some() {
4389                // Either we had already a manual subscription, or we had an automatic one and
4390                // the new one is automatic too: nothing to do!
4391                return Ok(());
4392            }
4393        }
4394        self.subscribe_thread(thread_root.to_owned(), automatic).await
4395    }
4396
4397    /// Unsubscribe from a given thread in this room.
4398    ///
4399    /// # Arguments
4400    ///
4401    /// - `thread_root`: The ID of the thread root event to unsubscribe to.
4402    ///
4403    /// # Returns
4404    ///
4405    /// - An `Ok` result if the unsubscription was successful, or the thread was
4406    ///   already unsubscribed.
4407    /// - A 404 error if the event isn't known, or isn't a thread root.
4408    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4409    pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4410        self.client
4411            .send(unsubscribe_thread::unstable::Request::new(
4412                self.room_id().to_owned(),
4413                thread_root.clone(),
4414            ))
4415            .await?;
4416
4417        trace!("Server acknowledged the thread subscription removal; removed it from db too");
4418
4419        // Immediately save the result into the database.
4420        self.client
4421            .state_store()
4422            .upsert_thread_subscriptions(vec![(
4423                self.room_id(),
4424                &thread_root,
4425                StoredThreadSubscription {
4426                    status: ThreadSubscriptionStatus::Unsubscribed,
4427                    bump_stamp: None,
4428                },
4429            )])
4430            .await?;
4431
4432        Ok(())
4433    }
4434
4435    /// Return the current thread subscription for the given thread root in this
4436    /// room.
4437    ///
4438    /// # Arguments
4439    ///
4440    /// - `thread_root`: The ID of the thread root event to get the subscription
4441    ///   for.
4442    ///
4443    /// # Returns
4444    ///
4445    /// - An `Ok` result with `Some(ThreadSubscription)` if we have some
4446    ///   subscription information.
4447    /// - An `Ok` result with `None` if the subscription does not exist, or the
4448    ///   event couldn't be found, or the event isn't a thread.
4449    /// - An error if the request fails for any other reason, such as a network
4450    ///   error.
4451    #[instrument(skip(self), fields(room_id = %self.room_id()))]
4452    pub async fn fetch_thread_subscription(
4453        &self,
4454        thread_root: OwnedEventId,
4455    ) -> Result<Option<ThreadSubscription>> {
4456        let result = self
4457            .client
4458            .send(get_thread_subscription::unstable::Request::new(
4459                self.room_id().to_owned(),
4460                thread_root.clone(),
4461            ))
4462            .await;
4463
4464        let subscription = match result {
4465            Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4466            Err(http_error) => match http_error.as_client_api_error() {
4467                Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4468                _ => return Err(http_error.into()),
4469            },
4470        };
4471
4472        // Keep the database in sync.
4473        if let Some(sub) = &subscription {
4474            self.client
4475                .state_store()
4476                .upsert_thread_subscriptions(vec![(
4477                    self.room_id(),
4478                    &thread_root,
4479                    StoredThreadSubscription {
4480                        status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4481                        bump_stamp: None,
4482                    },
4483                )])
4484                .await?;
4485        } else {
4486            // If the subscription was not found, remove it from the database.
4487            self.client
4488                .state_store()
4489                .remove_thread_subscription(self.room_id(), &thread_root)
4490                .await?;
4491        }
4492
4493        Ok(subscription)
4494    }
4495
4496    /// Return the current thread subscription for the given thread root in this
4497    /// room, by getting it from storage if possible, or fetching it from
4498    /// network otherwise.
4499    ///
4500    /// See also [`Self::fetch_thread_subscription`] for the exact semantics of
4501    /// this method.
4502    pub async fn load_or_fetch_thread_subscription(
4503        &self,
4504        thread_root: &EventId,
4505    ) -> Result<Option<ThreadSubscription>> {
4506        // If the thread subscriptions list is outdated, fetch from the server.
4507        if self.client.thread_subscription_catchup().is_outdated() {
4508            return self.fetch_thread_subscription(thread_root.to_owned()).await;
4509        }
4510
4511        // Otherwise, we can rely on the store information.
4512        Ok(self
4513            .client
4514            .state_store()
4515            .load_thread_subscription(self.room_id(), thread_root)
4516            .await
4517            .map(|maybe_sub| {
4518                maybe_sub.and_then(|stored| match stored.status {
4519                    ThreadSubscriptionStatus::Unsubscribed => None,
4520                    ThreadSubscriptionStatus::Subscribed { automatic } => {
4521                        Some(ThreadSubscription { automatic })
4522                    }
4523                })
4524            })?)
4525    }
4526
4527    /// Adds a new pinned event by sending an updated `m.room.pinned_events`
4528    /// event containing the new event id.
4529    ///
4530    /// This method will first try to get the pinned events from the current
4531    /// room's state and if it fails to do so it'll try to load them from the
4532    /// homeserver.
4533    ///
4534    /// Returns `true` if we pinned the event, `false` if the event was already
4535    /// pinned.
4536    pub async fn pin_event(&self, event_id: &EventId) -> Result<bool> {
4537        let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4538            event_ids
4539        } else {
4540            self.load_pinned_events().await?.unwrap_or_default()
4541        };
4542        let event_id = event_id.to_owned();
4543        if pinned_event_ids.contains(&event_id) {
4544            Ok(false)
4545        } else {
4546            pinned_event_ids.push(event_id);
4547            let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4548            self.send_state_event(content).await?;
4549            Ok(true)
4550        }
4551    }
4552
4553    /// Removes a pinned event by sending an updated `m.room.pinned_events`
4554    /// event without the event id we want to remove.
4555    ///
4556    /// This method will first try to get the pinned events from the current
4557    /// room's state and if it fails to do so it'll try to load them from the
4558    /// homeserver.
4559    ///
4560    /// Returns `true` if we unpinned the event, `false` if the event wasn't
4561    /// pinned before.
4562    pub async fn unpin_event(&self, event_id: &EventId) -> Result<bool> {
4563        let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4564            event_ids
4565        } else {
4566            self.load_pinned_events().await?.unwrap_or_default()
4567        };
4568        let event_id = event_id.to_owned();
4569        if let Some(idx) = pinned_event_ids.iter().position(|e| *e == *event_id) {
4570            pinned_event_ids.remove(idx);
4571            let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4572            self.send_state_event(content).await?;
4573            Ok(true)
4574        } else {
4575            Ok(false)
4576        }
4577    }
4578}
4579
4580#[cfg(feature = "e2e-encryption")]
4581impl RoomIdentityProvider for Room {
4582    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
4583        Box::pin(async { self.get_member(user_id).await.unwrap_or(None).is_some() })
4584    }
4585
4586    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
4587        Box::pin(async {
4588            let members = self
4589                .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
4590                .await
4591                .unwrap_or_else(|_| Default::default());
4592
4593            let mut ret: Vec<UserIdentity> = Vec::new();
4594            for member in members {
4595                if let Some(i) = self.user_identity(member.user_id()).await {
4596                    ret.push(i);
4597                }
4598            }
4599            ret
4600        })
4601    }
4602
4603    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
4604        Box::pin(async {
4605            self.client
4606                .encryption()
4607                .get_user_identity(user_id)
4608                .await
4609                .unwrap_or(None)
4610                .map(|u| u.underlying_identity())
4611        })
4612    }
4613}
4614
4615/// A wrapper for a weak client and a room id that allows to lazily retrieve a
4616/// room, only when needed.
4617#[derive(Clone, Debug)]
4618pub(crate) struct WeakRoom {
4619    client: WeakClient,
4620    room_id: OwnedRoomId,
4621}
4622
4623impl WeakRoom {
4624    /// Create a new `WeakRoom` given its weak components.
4625    pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4626        Self { client, room_id }
4627    }
4628
4629    /// Attempts to reconstruct the room.
4630    pub fn get(&self) -> Option<Room> {
4631        self.client.get().and_then(|client| client.get_room(&self.room_id))
4632    }
4633
4634    /// The room id for that room.
4635    pub fn room_id(&self) -> &RoomId {
4636        &self.room_id
4637    }
4638}
4639
4640/// Details of the (latest) invite.
4641#[derive(Debug, Clone)]
4642pub struct Invite {
4643    /// Who has been invited.
4644    pub invitee: RoomMember,
4645
4646    /// The user ID of who sent the invite.
4647    ///
4648    /// This is useful if `Self::inviter` is `None`.
4649    pub inviter_id: OwnedUserId,
4650
4651    /// Who sent the invite.
4652    ///
4653    /// If `None`, check `Self::inviter_id`, it might be useful as a fallback.
4654    pub inviter: Option<RoomMember>,
4655}
4656
4657#[derive(Error, Debug)]
4658enum InvitationError {
4659    #[error("No membership event found")]
4660    EventMissing,
4661}
4662
4663/// Receipts to send all at once.
4664#[derive(Debug, Clone, Default)]
4665#[non_exhaustive]
4666pub struct Receipts {
4667    /// Fully-read marker (room account data).
4668    pub fully_read: Option<OwnedEventId>,
4669    /// Read receipt (public ephemeral room event).
4670    pub public_read_receipt: Option<OwnedEventId>,
4671    /// Read receipt (private ephemeral room event).
4672    pub private_read_receipt: Option<OwnedEventId>,
4673}
4674
4675impl Receipts {
4676    /// Create an empty `Receipts`.
4677    pub fn new() -> Self {
4678        Self::default()
4679    }
4680
4681    /// Set the last event the user has read.
4682    ///
4683    /// It means that the user has read all the events before this event.
4684    ///
4685    /// This is a private marker only visible by the user.
4686    ///
4687    /// Note that this is technically not a receipt as it is persisted in the
4688    /// room account data.
4689    pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4690        self.fully_read = event_id.into();
4691        self
4692    }
4693
4694    /// Set the last event presented to the user and forward it to the other
4695    /// users in the room.
4696    ///
4697    /// This is used to reset the unread messages/notification count and
4698    /// advertise to other users the last event that the user has likely seen.
4699    pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4700        self.public_read_receipt = event_id.into();
4701        self
4702    }
4703
4704    /// Set the last event presented to the user and don't forward it.
4705    ///
4706    /// This is used to reset the unread messages/notification count.
4707    pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4708        self.private_read_receipt = event_id.into();
4709        self
4710    }
4711
4712    /// Whether this `Receipts` is empty.
4713    pub fn is_empty(&self) -> bool {
4714        self.fully_read.is_none()
4715            && self.public_read_receipt.is_none()
4716            && self.private_read_receipt.is_none()
4717    }
4718}
4719
4720/// [Parent space](https://spec.matrix.org/v1.8/client-server-api/#mspaceparent-relationships)
4721/// listed by a room, possibly validated by checking the space's state.
4722#[derive(Debug)]
4723pub enum ParentSpace {
4724    /// The room recognizes the given room as its parent, and the parent
4725    /// recognizes it as its child.
4726    Reciprocal(Room),
4727    /// The room recognizes the given room as its parent, but the parent does
4728    /// not recognizes it as its child. However, the author of the
4729    /// `m.space.parent` event in the room has a sufficient power level in the
4730    /// parent to create the child event.
4731    WithPowerlevel(Room),
4732    /// The room recognizes the given room as its parent, but the parent does
4733    /// not recognizes it as its child.
4734    Illegitimate(Room),
4735    /// The room recognizes the given id as its parent room, but we cannot check
4736    /// whether the parent recognizes it as its child.
4737    Unverifiable(OwnedRoomId),
4738}
4739
4740trait EventSource {
4741    fn get_event(
4742        &self,
4743        event_id: &EventId,
4744    ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
4745}
4746
4747impl EventSource for &Room {
4748    async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4749        self.load_or_fetch_event(event_id, None).await
4750    }
4751}
4752
4753/// Contains the current user's room member info and the optional room member
4754/// info of the sender of the `m.room.member` event that this info represents.
4755#[derive(Debug)]
4756pub struct RoomMemberWithSenderInfo {
4757    /// The actual room member.
4758    pub room_member: RoomMember,
4759    /// The info of the sender of the event `room_member` is based on, if
4760    /// available.
4761    pub sender_info: Option<RoomMember>,
4762}
4763
4764#[cfg(all(test, not(target_family = "wasm")))]
4765mod tests {
4766    use std::collections::BTreeMap;
4767
4768    use matrix_sdk_base::{ComposerDraft, DraftAttachment, store::ComposerDraftType};
4769    use matrix_sdk_test::{
4770        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
4771    };
4772    use ruma::{
4773        RoomVersionId, event_id,
4774        events::{relation::RelationType, room::member::MembershipState},
4775        owned_event_id, room_id, user_id,
4776    };
4777    use wiremock::{
4778        Mock, MockServer, ResponseTemplate,
4779        matchers::{header, method, path_regex},
4780    };
4781
4782    use crate::{
4783        Client,
4784        config::RequestConfig,
4785        room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4786        test_utils::{
4787            client::mock_matrix_session,
4788            logged_in_client,
4789            mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4790        },
4791    };
4792
4793    #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
4794    #[async_test]
4795    async fn test_cache_invalidation_while_encrypt() {
4796        use matrix_sdk_base::store::RoomLoadSettings;
4797        use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};
4798
4799        let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
4800        let session = mock_matrix_session();
4801
4802        let client = Client::builder()
4803            .homeserver_url("http://localhost:1234")
4804            .request_config(RequestConfig::new().disable_retry())
4805            .sqlite_store(&sqlite_path, None)
4806            .build()
4807            .await
4808            .unwrap();
4809        client
4810            .matrix_auth()
4811            .restore_session(session.clone(), RoomLoadSettings::default())
4812            .await
4813            .unwrap();
4814
4815        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
4816
4817        // Mock receiving an event to create an internal room.
4818        let server = MockServer::start().await;
4819        {
4820            Mock::given(method("GET"))
4821                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
4822                .and(header("authorization", "Bearer 1234"))
4823                .respond_with(
4824                    ResponseTemplate::new(200)
4825                        .set_body_json(EventFactory::new().room_encryption().into_content()),
4826                )
4827                .mount(&server)
4828                .await;
4829            let f = EventFactory::new().sender(user_id!("@example:localhost"));
4830            let response = SyncResponseBuilder::default()
4831                .add_joined_room(
4832                    JoinedRoomBuilder::default()
4833                        .add_state_event(
4834                            f.member(user_id!("@example:localhost")).display_name("example"),
4835                        )
4836                        .add_state_event(f.default_power_levels())
4837                        .add_state_event(f.room_encryption()),
4838                )
4839                .build_sync_response();
4840            client.base_client().receive_sync_response(response).await.unwrap();
4841        }
4842
4843        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4844
4845        // Step 1, preshare the room keys.
4846        room.preshare_room_key().await.unwrap();
4847
4848        // Step 2, force lock invalidation by pretending another client obtained the
4849        // lock.
4850        {
4851            let client = Client::builder()
4852                .homeserver_url("http://localhost:1234")
4853                .request_config(RequestConfig::new().disable_retry())
4854                .sqlite_store(&sqlite_path, None)
4855                .build()
4856                .await
4857                .unwrap();
4858            client
4859                .matrix_auth()
4860                .restore_session(session.clone(), RoomLoadSettings::default())
4861                .await
4862                .unwrap();
4863            client
4864                .encryption()
4865                .enable_cross_process_store_lock("client2".to_owned())
4866                .await
4867                .unwrap();
4868
4869            let guard = client.encryption().spin_lock_store(None).await.unwrap();
4870            assert!(guard.is_some());
4871        }
4872
4873        // Step 3, take the crypto-store lock.
4874        let guard = client.encryption().spin_lock_store(None).await.unwrap();
4875        assert!(guard.is_some());
4876
4877        // Step 4, try to encrypt a message.
4878        let olm = client.olm_machine().await;
4879        let olm = olm.as_ref().expect("Olm machine wasn't started");
4880
4881        // Now pretend we're encrypting an event; the olm machine shouldn't rely on
4882        // caching the outgoing session before.
4883        let _encrypted_content = olm
4884            .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
4885            .await
4886            .unwrap();
4887    }
4888
4889    #[async_test]
4890    async fn test_composer_draft() {
4891        use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4892
4893        let client = logged_in_client(None).await;
4894
4895        let response = SyncResponseBuilder::default()
4896            .add_joined_room(JoinedRoomBuilder::default())
4897            .build_sync_response();
4898        client.base_client().receive_sync_response(response).await.unwrap();
4899        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4900
4901        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4902
4903        // Save 2 drafts, one for the room and one for a thread.
4904
4905        let draft = ComposerDraft {
4906            plain_text: "Hello, world!".to_owned(),
4907            html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4908            draft_type: ComposerDraftType::NewMessage,
4909            attachments: vec![DraftAttachment {
4910                filename: "cat.txt".to_owned(),
4911                content: matrix_sdk_base::DraftAttachmentContent::File {
4912                    data: b"meow".to_vec(),
4913                    mimetype: Some("text/plain".to_owned()),
4914                    size: Some(5),
4915                },
4916            }],
4917        };
4918
4919        room.save_composer_draft(draft.clone(), None).await.unwrap();
4920
4921        let thread_root = owned_event_id!("$thread_root:b.c");
4922        let thread_draft = ComposerDraft {
4923            plain_text: "Hello, thread!".to_owned(),
4924            html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4925            draft_type: ComposerDraftType::NewMessage,
4926            attachments: vec![DraftAttachment {
4927                filename: "dog.txt".to_owned(),
4928                content: matrix_sdk_base::DraftAttachmentContent::File {
4929                    data: b"wuv".to_vec(),
4930                    mimetype: Some("text/plain".to_owned()),
4931                    size: Some(4),
4932                },
4933            }],
4934        };
4935
4936        room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4937
4938        // Check that the room draft was saved correctly
4939        assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4940
4941        // Check that the thread draft was saved correctly
4942        assert_eq!(
4943            room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4944            Some(thread_draft.clone())
4945        );
4946
4947        // Clear the room draft
4948        room.clear_composer_draft(None).await.unwrap();
4949        assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4950
4951        // Check that the thread one is still there
4952        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4953
4954        // Clear the thread draft as well
4955        room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4956        assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4957    }
4958
4959    #[async_test]
4960    async fn test_mark_join_requests_as_seen() {
4961        let server = MatrixMockServer::new().await;
4962        let client = server.client_builder().build().await;
4963        let event_id = event_id!("$a:b.c");
4964        let room_id = room_id!("!a:b.c");
4965        let user_id = user_id!("@alice:b.c");
4966
4967        let f = EventFactory::new().room(room_id);
4968        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4969            f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into(),
4970        ]);
4971        let room = server.sync_room(&client, joined_room_builder).await;
4972
4973        // When loading the initial seen ids, there are none
4974        let seen_ids =
4975            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4976        assert!(seen_ids.is_empty());
4977
4978        // We mark a random event id as seen
4979        room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4980            .await
4981            .expect("Couldn't mark join request as seen");
4982
4983        // Then we can check it was successfully marked as seen
4984        let seen_ids =
4985            room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4986        assert_eq!(seen_ids.len(), 1);
4987        assert_eq!(
4988            seen_ids.into_iter().next().expect("No next value"),
4989            (event_id.to_owned(), user_id.to_owned())
4990        )
4991    }
4992
4993    #[async_test]
4994    async fn test_own_room_membership_with_no_own_member_event() {
4995        let server = MatrixMockServer::new().await;
4996        let client = server.client_builder().build().await;
4997        let room_id = room_id!("!a:b.c");
4998
4999        let room = server.sync_joined_room(&client, room_id).await;
5000
5001        // Since there is no member event for the own user, the method fails.
5002        // This should never happen in an actual room.
5003        let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
5004        assert!(error.is_some());
5005    }
5006
5007    #[async_test]
5008    async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
5009        let server = MatrixMockServer::new().await;
5010        let client = server.client_builder().build().await;
5011        let room_id = room_id!("!a:b.c");
5012        let user_id = user_id!("@example:localhost");
5013
5014        let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
5015        let joined_room_builder =
5016            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5017        let room = server.sync_room(&client, joined_room_builder).await;
5018
5019        // When we load the membership details
5020        let ret = room
5021            .member_with_sender_info(client.user_id().unwrap())
5022            .await
5023            .expect("Room member info should be available");
5024
5025        // We get the member info for the current user
5026        assert_eq!(ret.room_member.event().user_id(), user_id);
5027
5028        // But there is no info for the sender
5029        assert!(ret.sender_info.is_none());
5030    }
5031
5032    #[async_test]
5033    async fn test_own_room_membership_with_own_member_event_and_own_sender() {
5034        let server = MatrixMockServer::new().await;
5035        let client = server.client_builder().build().await;
5036        let room_id = room_id!("!a:b.c");
5037        let user_id = user_id!("@example:localhost");
5038
5039        let f = EventFactory::new().room(room_id).sender(user_id);
5040        let joined_room_builder =
5041            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5042        let room = server.sync_room(&client, joined_room_builder).await;
5043
5044        // When we load the membership details
5045        let ret = room
5046            .member_with_sender_info(client.user_id().unwrap())
5047            .await
5048            .expect("Room member info should be available");
5049
5050        // We get the current user's member info
5051        assert_eq!(ret.room_member.event().user_id(), user_id);
5052
5053        // And the sender has the same info, since it's also the current user
5054        assert!(ret.sender_info.is_some());
5055        assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
5056    }
5057
5058    #[async_test]
5059    async fn test_own_room_membership_with_own_member_event_and_known_sender() {
5060        let server = MatrixMockServer::new().await;
5061        let client = server.client_builder().build().await;
5062        let room_id = room_id!("!a:b.c");
5063        let user_id = user_id!("@example:localhost");
5064        let sender_id = user_id!("@alice:b.c");
5065
5066        let f = EventFactory::new().room(room_id).sender(sender_id);
5067        let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5068            f.member(user_id).into(),
5069            // The sender info comes from the sync
5070            f.member(sender_id).into(),
5071        ]);
5072        let room = server.sync_room(&client, joined_room_builder).await;
5073
5074        // When we load the membership details
5075        let ret = room
5076            .member_with_sender_info(client.user_id().unwrap())
5077            .await
5078            .expect("Room member info should be available");
5079
5080        // We get the current user's member info
5081        assert_eq!(ret.room_member.event().user_id(), user_id);
5082
5083        // And also the sender info from the events received in the sync
5084        assert!(ret.sender_info.is_some());
5085        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5086    }
5087
5088    #[async_test]
5089    async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5090        let server = MatrixMockServer::new().await;
5091        let client = server.client_builder().build().await;
5092        let room_id = room_id!("!a:b.c");
5093        let user_id = user_id!("@example:localhost");
5094        let sender_id = user_id!("@alice:b.c");
5095
5096        let f = EventFactory::new().room(room_id).sender(sender_id);
5097        let joined_room_builder =
5098            JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5099        let room = server.sync_room(&client, joined_room_builder).await;
5100
5101        // We'll receive the member info through the /members endpoint
5102        server
5103            .mock_get_members()
5104            .ok(vec![f.member(sender_id).into_raw()])
5105            .mock_once()
5106            .mount()
5107            .await;
5108
5109        // We get the current user's member info
5110        let ret = room
5111            .member_with_sender_info(client.user_id().unwrap())
5112            .await
5113            .expect("Room member info should be available");
5114
5115        // We get the current user's member info
5116        assert_eq!(ret.room_member.event().user_id(), user_id);
5117
5118        // And also the sender info from the /members endpoint
5119        assert!(ret.sender_info.is_some());
5120        assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5121    }
5122
5123    #[async_test]
5124    async fn test_list_threads() {
5125        let server = MatrixMockServer::new().await;
5126        let client = server.client_builder().build().await;
5127
5128        let room_id = room_id!("!a:b.c");
5129        let sender_id = user_id!("@alice:b.c");
5130        let f = EventFactory::new().room(room_id).sender(sender_id);
5131
5132        let eid1 = event_id!("$1");
5133        let eid2 = event_id!("$2");
5134        let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5135        let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5136
5137        server
5138            .mock_room_threads()
5139            .ok(batch1.clone(), Some("prev_batch".to_owned()))
5140            .mock_once()
5141            .mount()
5142            .await;
5143        server
5144            .mock_room_threads()
5145            .match_from("prev_batch")
5146            .ok(batch2, None)
5147            .mock_once()
5148            .mount()
5149            .await;
5150
5151        let room = server.sync_joined_room(&client, room_id).await;
5152        let result =
5153            room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5154        assert_eq!(result.chunk.len(), 1);
5155        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5156        assert!(result.prev_batch_token.is_some());
5157
5158        let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5159        let result = room.list_threads(opts).await.expect("Failed to list threads");
5160        assert_eq!(result.chunk.len(), 1);
5161        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5162        assert!(result.prev_batch_token.is_none());
5163    }
5164
5165    #[async_test]
5166    async fn test_relations() {
5167        let server = MatrixMockServer::new().await;
5168        let client = server.client_builder().build().await;
5169
5170        let room_id = room_id!("!a:b.c");
5171        let sender_id = user_id!("@alice:b.c");
5172        let f = EventFactory::new().room(room_id).sender(sender_id);
5173
5174        let target_event_id = owned_event_id!("$target");
5175        let eid1 = event_id!("$1");
5176        let eid2 = event_id!("$2");
5177        let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5178        let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5179
5180        server
5181            .mock_room_relations()
5182            .match_target_event(target_event_id.clone())
5183            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5184            .mock_once()
5185            .mount()
5186            .await;
5187
5188        server
5189            .mock_room_relations()
5190            .match_target_event(target_event_id.clone())
5191            .match_from("next_batch")
5192            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5193            .mock_once()
5194            .mount()
5195            .await;
5196
5197        let room = server.sync_joined_room(&client, room_id).await;
5198
5199        // Main endpoint: no relation type filtered out.
5200        let mut opts = RelationsOptions {
5201            include_relations: IncludeRelations::AllRelations,
5202            ..Default::default()
5203        };
5204        let result = room
5205            .relations(target_event_id.clone(), opts.clone())
5206            .await
5207            .expect("Failed to list relations the first time");
5208        assert_eq!(result.chunk.len(), 1);
5209        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5210        assert!(result.prev_batch_token.is_none());
5211        assert!(result.next_batch_token.is_some());
5212        assert!(result.recursion_depth.is_none());
5213
5214        opts.from = result.next_batch_token;
5215        let result = room
5216            .relations(target_event_id, opts)
5217            .await
5218            .expect("Failed to list relations the second time");
5219        assert_eq!(result.chunk.len(), 1);
5220        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5221        assert!(result.prev_batch_token.is_none());
5222        assert!(result.next_batch_token.is_none());
5223        assert!(result.recursion_depth.is_none());
5224    }
5225
5226    #[async_test]
5227    async fn test_relations_with_reltype() {
5228        let server = MatrixMockServer::new().await;
5229        let client = server.client_builder().build().await;
5230
5231        let room_id = room_id!("!a:b.c");
5232        let sender_id = user_id!("@alice:b.c");
5233        let f = EventFactory::new().room(room_id).sender(sender_id);
5234
5235        let target_event_id = owned_event_id!("$target");
5236        let eid1 = event_id!("$1");
5237        let eid2 = event_id!("$2");
5238        let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5239        let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5240
5241        server
5242            .mock_room_relations()
5243            .match_target_event(target_event_id.clone())
5244            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5245            .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5246            .mock_once()
5247            .mount()
5248            .await;
5249
5250        server
5251            .mock_room_relations()
5252            .match_target_event(target_event_id.clone())
5253            .match_from("next_batch")
5254            .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5255            .ok(RoomRelationsResponseTemplate::default().events(batch2))
5256            .mock_once()
5257            .mount()
5258            .await;
5259
5260        let room = server.sync_joined_room(&client, room_id).await;
5261
5262        // Reltype-filtered endpoint, for threads \o/
5263        let mut opts = RelationsOptions {
5264            include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5265            ..Default::default()
5266        };
5267        let result = room
5268            .relations(target_event_id.clone(), opts.clone())
5269            .await
5270            .expect("Failed to list relations the first time");
5271        assert_eq!(result.chunk.len(), 1);
5272        assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5273        assert!(result.prev_batch_token.is_none());
5274        assert!(result.next_batch_token.is_some());
5275        assert!(result.recursion_depth.is_none());
5276
5277        opts.from = result.next_batch_token;
5278        let result = room
5279            .relations(target_event_id, opts)
5280            .await
5281            .expect("Failed to list relations the second time");
5282        assert_eq!(result.chunk.len(), 1);
5283        assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5284        assert!(result.prev_batch_token.is_none());
5285        assert!(result.next_batch_token.is_none());
5286        assert!(result.recursion_depth.is_none());
5287    }
5288
5289    #[async_test]
5290    async fn test_power_levels_computation() {
5291        let server = MatrixMockServer::new().await;
5292        let client = server.client_builder().build().await;
5293
5294        let room_id = room_id!("!a:b.c");
5295        let sender_id = client.user_id().expect("No session id");
5296        let f = EventFactory::new().room(room_id).sender(sender_id);
5297        let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5298
5299        // Computing the power levels will need these 3 state events:
5300        let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into();
5301        let power_levels_event = f.power_levels(&mut user_map).state_key("").into();
5302        let room_member_event = f.member(sender_id).into();
5303
5304        // With only the room member event
5305        let room = server
5306            .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event]))
5307            .await;
5308        let ctx = room
5309            .push_condition_room_ctx()
5310            .await
5311            .expect("Failed to get push condition context")
5312            .expect("Could not get push condition context");
5313
5314        // The internal power levels couldn't be computed
5315        assert!(ctx.power_levels.is_none());
5316
5317        // Adding the room creation event
5318        let room = server
5319            .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event]))
5320            .await;
5321        let ctx = room
5322            .push_condition_room_ctx()
5323            .await
5324            .expect("Failed to get push condition context")
5325            .expect("Could not get push condition context");
5326
5327        // The internal power levels still couldn't be computed
5328        assert!(ctx.power_levels.is_none());
5329
5330        // With the room member, room creation and the power levels events
5331        let room = server
5332            .sync_room(
5333                &client,
5334                JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5335            )
5336            .await;
5337        let ctx = room
5338            .push_condition_room_ctx()
5339            .await
5340            .expect("Failed to get push condition context")
5341            .expect("Could not get push condition context");
5342
5343        // The internal power levels can finally be computed
5344        assert!(ctx.power_levels.is_some());
5345    }
5346}