1use std::{
18 borrow::Borrow,
19 collections::{BTreeMap, HashMap},
20 future::Future,
21 ops::Deref,
22 sync::Arc,
23 time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30 StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{
39 IdentityStatusChange, RoomIdentityProvider, UserIdentity, types::events::CryptoContextInfo,
40};
41pub use matrix_sdk_base::store::StoredThreadSubscription;
42use matrix_sdk_base::{
43 ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
44 StateChanges, StateStoreDataKey, StateStoreDataValue,
45 deserialized_responses::{
46 RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
47 },
48 media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
49 serde_helpers::extract_relation,
50 store::{StateStoreExt, ThreadSubscriptionStatus},
51};
52#[cfg(feature = "e2e-encryption")]
53use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
54#[cfg(feature = "e2e-encryption")]
55use matrix_sdk_common::BoxFuture;
56use matrix_sdk_common::{
57 deserialized_responses::TimelineEvent,
58 executor::{JoinHandle, spawn},
59 timeout::timeout,
60};
61#[cfg(feature = "experimental-search")]
62use matrix_sdk_search::error::IndexError;
63#[cfg(feature = "experimental-search")]
64#[cfg(doc)]
65use matrix_sdk_search::index::RoomIndex;
66use mime::Mime;
67use reply::Reply;
68#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
69use ruma::events::AnySyncMessageLikeEvent;
70#[cfg(feature = "experimental-encrypted-state-events")]
71use ruma::events::AnySyncStateEvent;
72#[cfg(feature = "unstable-msc4274")]
73use ruma::events::room::message::GalleryItemType;
74#[cfg(feature = "e2e-encryption")]
75use ruma::events::{
76 AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
77};
78use ruma::{
79 EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
80 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
81 api::client::{
82 config::{set_global_account_data, set_room_account_data},
83 context,
84 error::ErrorKind,
85 filter::LazyLoadOptions,
86 membership::{
87 Invite3pid, ban_user, forget_room, get_member_events,
88 invite_user::{self, v3::InvitationRecipient},
89 kick_user, leave_room, unban_user,
90 },
91 message::send_message_event,
92 read_marker::set_read_marker,
93 receipt::create_receipt,
94 redact::redact_event,
95 room::{get_room_event, report_content, report_room},
96 state::{get_state_event_for_key, send_state_event},
97 tag::{create_tag, delete_tag},
98 threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
99 typing::create_typing_event::{self, v3::Typing},
100 },
101 assign,
102 events::{
103 AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
104 Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
105 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
106 RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
107 StaticStateEventContent, SyncStateEvent,
108 beacon::BeaconEventContent,
109 beacon_info::BeaconInfoEventContent,
110 direct::DirectEventContent,
111 marked_unread::MarkedUnreadEventContent,
112 receipt::{Receipt, ReceiptThread, ReceiptType},
113 relation::RelationType,
114 room::{
115 ImageInfo, MediaSource, ThumbnailInfo,
116 avatar::{self, RoomAvatarEventContent},
117 encryption::RoomEncryptionEventContent,
118 history_visibility::HistoryVisibility,
119 member::{MembershipChange, RoomMemberEventContent, SyncRoomMemberEvent},
120 message::{
121 AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
122 ImageMessageEventContent, MessageType, RoomMessageEventContent,
123 TextMessageEventContent, UnstableAmplitude, UnstableAudioDetailsContentBlock,
124 UnstableVoiceContentBlock, VideoInfo, VideoMessageEventContent,
125 },
126 name::RoomNameEventContent,
127 pinned_events::RoomPinnedEventsEventContent,
128 power_levels::{
129 RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
130 },
131 server_acl::RoomServerAclEventContent,
132 topic::RoomTopicEventContent,
133 },
134 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
135 tag::{TagInfo, TagName},
136 typing::SyncTypingEvent,
137 },
138 int,
139 push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
140 serde::Raw,
141 time::Instant,
142 uint,
143};
144#[cfg(feature = "experimental-encrypted-state-events")]
145use ruma::{
146 events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
147 serde::JsonCastable,
148};
149use serde::de::DeserializeOwned;
150use thiserror::Error;
151use tokio::{join, sync::broadcast};
152use tracing::{debug, error, info, instrument, trace, warn};
153
154use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
155pub use self::{
156 member::{RoomMember, RoomMemberRole},
157 messages::{
158 EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
159 Relations, RelationsOptions, ThreadRoots,
160 },
161};
162#[cfg(feature = "e2e-encryption")]
163use crate::encryption::backups::BackupState;
164#[cfg(doc)]
165use crate::event_cache::EventCache;
166#[cfg(feature = "experimental-encrypted-state-events")]
167use crate::room::futures::{SendRawStateEvent, SendStateEvent};
168use crate::{
169 BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
170 attachment::{AttachmentConfig, AttachmentInfo},
171 client::WeakClient,
172 config::RequestConfig,
173 error::{BeaconError, WrongRoomState},
174 event_cache::{self, EventCacheDropHandles, RoomEventCache},
175 event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
176 live_location_share::ObservableLiveLocation,
177 media::{MediaFormat, MediaRequestParameters},
178 notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
179 room::{
180 knock_requests::{KnockRequest, KnockRequestMemberInfo},
181 power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
182 privacy_settings::RoomPrivacySettings,
183 },
184 sync::RoomUpdate,
185 utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
186};
187
188pub mod edit;
189pub mod futures;
190pub mod identity_status_changes;
191pub mod knock_requests;
193mod member;
194mod messages;
195pub mod power_levels;
196pub mod reply;
197
198pub mod calls;
199
200pub mod privacy_settings;
202
203#[cfg(feature = "e2e-encryption")]
204pub(crate) mod shared_room_history;
205
206#[derive(Debug, Clone)]
209pub struct Room {
210 inner: BaseRoom,
211 pub(crate) client: Client,
212}
213
214impl Deref for Room {
215 type Target = BaseRoom;
216
217 fn deref(&self) -> &Self::Target {
218 &self.inner
219 }
220}
221
222const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
223const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
224
225#[derive(Debug, Clone, Copy, PartialEq, Eq)]
227pub struct ThreadSubscription {
228 pub automatic: bool,
231}
232
233#[derive(Debug)]
235pub struct PushContext {
236 push_condition_room_ctx: PushConditionRoomCtx,
238
239 push_rules: Ruleset,
242}
243
244impl PushContext {
245 pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
247 Self { push_condition_room_ctx, push_rules }
248 }
249
250 pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
252 self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
253 }
254
255 #[doc(hidden)]
258 #[instrument(skip_all)]
259 pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
260 let rules = self
261 .push_rules
262 .iter()
263 .filter_map(|r| {
264 if !r.enabled() {
265 return None;
266 }
267
268 let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
269
270 let conditions = match r {
271 AnyPushRuleRef::Override(r) => {
272 format!("{:?}", r.conditions)
273 }
274 AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
275 AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
276 AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
277 AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
278 _ => "<unknown push rule kind>".to_owned(),
279 };
280
281 Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
282 })
283 .collect::<Vec<_>>()
284 .join("\n");
285 trace!("rules:\n\n{rules}\n\n");
286
287 let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
288
289 if let Some(found) = found {
290 trace!("rule {} matched", found.rule_id());
291 found.actions().to_owned()
292 } else {
293 trace!("no match");
294 Vec::new()
295 }
296 }
297}
298
299macro_rules! make_media_type {
300 ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $info: ident, $thumbnail: ident) => {{
301 let (body, formatted, filename) = match $caption {
305 Some(TextMessageEventContent { body, formatted, .. }) => (body, formatted, Some($filename)),
306 None => ($filename, None, None),
307 };
308
309 let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();
310
311 match $content_type.type_() {
312 mime::IMAGE => {
313 let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
314 mimetype: Some($content_type.as_ref().to_owned()),
315 thumbnail_source,
316 thumbnail_info
317 });
318 let content = assign!(ImageMessageEventContent::new(body, $source), {
319 info: Some(Box::new(info)),
320 formatted,
321 filename
322 });
323 <$t>::Image(content)
324 }
325
326 mime::AUDIO => {
327 let mut content = assign!(AudioMessageEventContent::new(body, $source), {
328 formatted,
329 filename
330 });
331
332 if let Some(AttachmentInfo::Audio(audio_info) | AttachmentInfo::Voice(audio_info)) = &$info &&
333 let Some(duration) = audio_info.duration && let Some(waveform_vec) = &audio_info.waveform {
334 let waveform = waveform_vec
335 .iter()
336 .map(|v| ((*v).clamp(0.0, 1.0) * UnstableAmplitude::MAX as f32) as u16)
337 .map(Into::into)
338 .collect();
339 content.audio =
340 Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
341 }
342
343 if matches!($info, Some(AttachmentInfo::Voice(_))) {
344 content.voice = Some(UnstableVoiceContentBlock::new());
345 }
346
347 let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
348 audio_info.mimetype = Some($content_type.as_ref().to_owned());
349 let content = content.info(Box::new(audio_info));
350
351 <$t>::Audio(content)
352 }
353
354 mime::VIDEO => {
355 let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
356 mimetype: Some($content_type.as_ref().to_owned()),
357 thumbnail_source,
358 thumbnail_info
359 });
360 let content = assign!(VideoMessageEventContent::new(body, $source), {
361 info: Some(Box::new(info)),
362 formatted,
363 filename
364 });
365 <$t>::Video(content)
366 }
367
368 _ => {
369 let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
370 mimetype: Some($content_type.as_ref().to_owned()),
371 thumbnail_source,
372 thumbnail_info
373 });
374 let content = assign!(FileMessageEventContent::new(body, $source), {
375 info: Some(Box::new(info)),
376 formatted,
377 filename,
378 });
379 <$t>::File(content)
380 }
381 }
382 }};
383}
384
385impl Room {
386 pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
393 Self { inner: room, client }
394 }
395
396 #[doc(alias = "reject_invitation")]
402 #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
403 async fn leave_impl(&self) -> (Result<()>, &Room) {
404 let state = self.state();
405 if state == RoomState::Left {
406 return (
407 Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
408 "Joined or Invited",
409 state,
410 )))),
411 self,
412 );
413 }
414
415 let should_forget = matches!(self.state(), RoomState::Invited);
418
419 let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
420 let response = self.client.send(request).await;
421
422 if let Err(error) = response {
425 #[allow(clippy::collapsible_match)]
426 let ignore_error = if let Some(error) = error.client_api_error_kind() {
427 match error {
428 ErrorKind::Forbidden { .. } => true,
431 _ => false,
432 }
433 } else {
434 false
435 };
436
437 error!(?error, ignore_error, should_forget, "Failed to leave the room");
438
439 if !ignore_error {
440 return (Err(error.into()), self);
441 }
442 }
443
444 if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
445 return (Err(e.into()), self);
446 }
447
448 if should_forget {
449 trace!("Trying to forget the room");
450
451 if let Err(error) = self.forget().await {
452 error!(?error, "Failed to forget the room");
453 }
454 }
455
456 (Ok(()), self)
457 }
458
459 pub async fn leave(&self) -> Result<()> {
467 let mut rooms: Vec<Room> = vec![self.clone()];
468 let mut current_room = self;
469
470 while let Some(predecessor) = current_room.predecessor_room() {
471 let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
472
473 if let Some(predecessor_room) = maybe_predecessor_room {
474 rooms.push(predecessor_room.clone());
475 current_room = rooms.last().expect("Room just pushed so can't be empty");
476 } else {
477 warn!("Cannot find predecessor room");
478 break;
479 }
480 }
481
482 let batch_size = 5;
483
484 let rooms_futures: Vec<_> = rooms
485 .iter()
486 .filter_map(|room| match room.state() {
487 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
488 Some(room.leave_impl())
489 }
490 RoomState::Banned | RoomState::Left => None,
491 })
492 .collect();
493
494 let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
495
496 let mut maybe_this_room_failed_with: Option<Error> = None;
497
498 while let Some(result) = futures_stream.next().await {
499 if let (Err(e), room) = result {
500 if room.room_id() == self.room_id() {
501 maybe_this_room_failed_with = Some(e);
502 } else {
503 warn!("Failure while attempting to leave predecessor room: {e:?}");
504 }
505 }
506 }
507
508 maybe_this_room_failed_with.map_or(Ok(()), Err)
509 }
510
511 #[doc(alias = "accept_invitation")]
515 pub async fn join(&self) -> Result<()> {
516 let prev_room_state = self.inner.state();
517
518 if prev_room_state == RoomState::Joined {
519 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
520 "Invited or Left",
521 prev_room_state,
522 ))));
523 }
524
525 self.client.join_room_by_id(self.room_id()).await?;
526
527 Ok(())
528 }
529
530 pub fn client(&self) -> Client {
534 self.client.clone()
535 }
536
537 pub fn is_synced(&self) -> bool {
540 self.inner.is_state_fully_synced()
541 }
542
543 pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
573 let Some(url) = self.avatar_url() else { return Ok(None) };
574 let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
575 Ok(Some(self.client.media().get_media_content(&request, true).await?))
576 }
577
578 #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
607 pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
608 let room_id = self.inner.room_id();
609 let request = options.into_request(room_id);
610 let http_response = self.client.send(request).await?;
611
612 let push_ctx = self.push_context().await?;
613 let chunk = join_all(
614 http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
615 )
616 .await;
617
618 Ok(Messages {
619 start: http_response.start,
620 end: http_response.end,
621 chunk,
622 state: http_response.state,
623 })
624 }
625
626 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
636 where
637 Ev: SyncEvent + DeserializeOwned + Send + 'static,
638 H: EventHandler<Ev, Ctx>,
639 {
640 self.client.add_room_event_handler(self.room_id(), handler)
641 }
642
643 pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
648 self.client.subscribe_to_room_updates(self.room_id())
649 }
650
651 pub fn subscribe_to_typing_notifications(
657 &self,
658 ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
659 let (sender, receiver) = broadcast::channel(16);
660 let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
661 let own_user_id = self.own_user_id().to_owned();
662 move |event: SyncTypingEvent| async move {
663 let typing_user_ids = event
665 .content
666 .user_ids
667 .into_iter()
668 .filter(|user_id| *user_id != own_user_id)
669 .collect();
670 let _ = sender.send(typing_user_ids);
672 }
673 });
674 let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
675 (drop_guard, receiver)
676 }
677
678 #[cfg(feature = "e2e-encryption")]
701 pub async fn subscribe_to_identity_status_changes(
702 &self,
703 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
704 IdentityStatusChanges::create_stream(self.clone()).await
705 }
706
707 #[cfg(not(feature = "experimental-encrypted-state-events"))]
712 #[allow(clippy::unused_async)] async fn try_decrypt_event(
714 &self,
715 event: Raw<AnyTimelineEvent>,
716 push_ctx: Option<&PushContext>,
717 ) -> TimelineEvent {
718 #[cfg(feature = "e2e-encryption")]
719 if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
720 SyncMessageLikeEvent::Original(_),
721 ))) = event.deserialize_as::<AnySyncTimelineEvent>()
722 && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
723 {
724 return event;
725 }
726
727 let mut event = TimelineEvent::from_plaintext(event.cast());
728 if let Some(push_ctx) = push_ctx {
729 event.set_push_actions(push_ctx.for_event(event.raw()).await);
730 }
731
732 event
733 }
734
735 #[cfg(feature = "experimental-encrypted-state-events")]
740 #[allow(clippy::unused_async)] async fn try_decrypt_event(
742 &self,
743 event: Raw<AnyTimelineEvent>,
744 push_ctx: Option<&PushContext>,
745 ) -> TimelineEvent {
746 match event.deserialize_as::<AnySyncTimelineEvent>() {
748 Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
749 SyncMessageLikeEvent::Original(_),
750 ))) => {
751 if let Ok(event) = self
752 .decrypt_event(
753 event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
754 push_ctx,
755 )
756 .await
757 {
758 return event;
759 }
760 }
761 Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
762 SyncStateEvent::Original(_),
763 ))) => {
764 if let Ok(event) = self
765 .decrypt_event(
766 event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
767 push_ctx,
768 )
769 .await
770 {
771 return event;
772 }
773 }
774 _ => {}
775 }
776
777 let mut event = TimelineEvent::from_plaintext(event.cast());
778 if let Some(push_ctx) = push_ctx {
779 event.set_push_actions(push_ctx.for_event(event.raw()).await);
780 }
781
782 event
783 }
784
785 pub async fn event(
790 &self,
791 event_id: &EventId,
792 request_config: Option<RequestConfig>,
793 ) -> Result<TimelineEvent> {
794 let request =
795 get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
796
797 let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
798 let push_ctx = self.push_context().await?;
799 let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
800
801 if let Ok((cache, _handles)) = self.event_cache().await {
803 cache.save_events([event.clone()]).await;
804 }
805
806 Ok(event)
807 }
808
809 pub async fn load_or_fetch_event(
816 &self,
817 event_id: &EventId,
818 request_config: Option<RequestConfig>,
819 ) -> Result<TimelineEvent> {
820 match self.event_cache().await {
821 Ok((event_cache, _drop_handles)) => {
822 if let Some(event) = event_cache.find_event(event_id).await? {
823 return Ok(event);
824 }
825 }
827 Err(err) => {
828 debug!("error when getting the event cache: {err}");
829 }
830 }
831 self.event(event_id, request_config).await
832 }
833
834 pub async fn load_or_fetch_event_with_relations(
852 &self,
853 event_id: &EventId,
854 filter: Option<Vec<RelationType>>,
855 request_config: Option<RequestConfig>,
856 ) -> Result<(TimelineEvent, Vec<TimelineEvent>)> {
857 let fetch_relations = async || {
858 let include_relations = if let Some(filter) = &filter
865 && filter.len() == 1
866 {
867 IncludeRelations::RelationsOfType(filter[0].clone())
868 } else {
869 IncludeRelations::AllRelations
870 };
871
872 let mut opts = RelationsOptions {
873 include_relations,
874 recurse: true,
875 limit: Some(uint!(256)),
876 ..Default::default()
877 };
878
879 let mut events = Vec::new();
880 loop {
881 match self.relations(event_id.to_owned(), opts.clone()).await {
882 Ok(relations) => {
883 if let Some(filter) = filter.as_ref() {
884 events.extend(relations.chunk.into_iter().filter_map(|ev| {
886 let (rel_type, _) = extract_relation(ev.raw())?;
887 filter
888 .iter()
889 .any(|ruma_filter| ruma_filter == &rel_type)
890 .then_some(ev)
891 }));
892 } else {
893 events.extend(relations.chunk);
895 }
896
897 if let Some(next_from) = relations.next_batch_token {
898 opts.from = Some(next_from);
899 } else {
900 break events;
901 }
902 }
903
904 Err(err) => {
905 warn!(%event_id, "error when loading relations of pinned event from server: {err}");
906 break events;
907 }
908 }
909 }
910 };
911
912 let event_cache = match self.event_cache().await {
915 Ok((event_cache, drop_handles)) => {
916 if let Some((event, mut relations)) =
917 event_cache.find_event_with_relations(event_id, filter.clone()).await?
918 {
919 if relations.is_empty() {
920 relations = fetch_relations().await;
923 }
924
925 return Ok((event, relations));
926 }
927
928 Some((event_cache, drop_handles))
930 }
931
932 Err(err) => {
933 debug!("error when getting the event cache: {err}");
934 None
936 }
937 };
938
939 let event = self.event(event_id, request_config).await?;
942
943 if let Some((event_cache, _drop_handles)) = event_cache
945 && let Some(relations) =
946 event_cache.find_event_relations(event_id, filter.clone()).await.ok()
947 && !relations.is_empty()
948 {
949 return Ok((event, relations));
950 }
951
952 Ok((event, fetch_relations().await))
955 }
956
957 pub async fn event_with_context(
960 &self,
961 event_id: &EventId,
962 lazy_load_members: bool,
963 context_size: UInt,
964 request_config: Option<RequestConfig>,
965 ) -> Result<EventWithContextResponse> {
966 let mut request =
967 context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
968
969 request.limit = context_size;
970
971 if lazy_load_members {
972 request.filter.lazy_load_options =
973 LazyLoadOptions::Enabled { include_redundant_members: false };
974 }
975
976 let response = self.client.send(request).with_request_config(request_config).await?;
977
978 let push_ctx = self.push_context().await?;
979 let push_ctx = push_ctx.as_ref();
980 let target_event = if let Some(event) = response.event {
981 Some(self.try_decrypt_event(event, push_ctx).await)
982 } else {
983 None
984 };
985
986 let (events_before, events_after) = join!(
990 join_all(
991 response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
992 ),
993 join_all(
994 response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
995 ),
996 );
997
998 if let Ok((cache, _handles)) = self.event_cache().await {
1000 let mut events_to_save: Vec<TimelineEvent> = Vec::new();
1001 if let Some(event) = &target_event {
1002 events_to_save.push(event.clone());
1003 }
1004
1005 for event in &events_before {
1006 events_to_save.push(event.clone());
1007 }
1008
1009 for event in &events_after {
1010 events_to_save.push(event.clone());
1011 }
1012
1013 cache.save_events(events_to_save).await;
1014 }
1015
1016 Ok(EventWithContextResponse {
1017 event: target_event,
1018 events_before,
1019 events_after,
1020 state: response.state,
1021 prev_batch_token: response.start,
1022 next_batch_token: response.end,
1023 })
1024 }
1025
1026 pub(crate) async fn request_members(&self) -> Result<()> {
1027 self.client
1028 .locks()
1029 .members_request_deduplicated_handler
1030 .run(self.room_id().to_owned(), async move {
1031 let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
1032 let response = self
1033 .client
1034 .send(request.clone())
1035 .with_request_config(
1036 RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
1039 )
1040 .await?;
1041
1042 Box::pin(self.client.base_client().receive_all_members(
1044 self.room_id(),
1045 &request,
1046 &response,
1047 ))
1048 .await?;
1049
1050 Ok(())
1051 })
1052 .await
1053 }
1054
1055 pub async fn request_encryption_state(&self) -> Result<()> {
1060 if !self.inner.encryption_state().is_unknown() {
1061 return Ok(());
1062 }
1063
1064 self.client
1065 .locks()
1066 .encryption_state_deduplicated_handler
1067 .run(self.room_id().to_owned(), async move {
1068 let request = get_state_event_for_key::v3::Request::new(
1070 self.room_id().to_owned(),
1071 StateEventType::RoomEncryption,
1072 "".to_owned(),
1073 );
1074 let response = match self.client.send(request).await {
1075 Ok(response) => Some(
1076 response
1077 .into_content()
1078 .deserialize_as_unchecked::<RoomEncryptionEventContent>()?,
1079 ),
1080 Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
1081 Err(err) => return Err(err.into()),
1082 };
1083
1084 let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
1085
1086 let mut room_info = self.clone_info();
1089 room_info.mark_encryption_state_synced();
1090 room_info.set_encryption_event(response.clone());
1091 let mut changes = StateChanges::default();
1092 changes.add_room(room_info.clone());
1093
1094 self.client.state_store().save_changes(&changes).await?;
1095 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
1096
1097 Ok(())
1098 })
1099 .await
1100 }
1101
1102 pub fn encryption_state(&self) -> EncryptionState {
1107 self.inner.encryption_state()
1108 }
1109
1110 pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
1116 self.request_encryption_state().await?;
1117
1118 Ok(self.encryption_state())
1119 }
1120
1121 #[cfg(feature = "e2e-encryption")]
1123 pub async fn crypto_context_info(&self) -> CryptoContextInfo {
1124 let encryption = self.client.encryption();
1125
1126 let this_device_is_verified = match encryption.get_own_device().await {
1127 Ok(Some(device)) => device.is_verified_with_cross_signing(),
1128
1129 _ => true,
1131 };
1132
1133 let backup_exists_on_server =
1134 encryption.backups().exists_on_server().await.unwrap_or(false);
1135
1136 CryptoContextInfo {
1137 device_creation_ts: encryption.device_creation_timestamp().await,
1138 this_device_is_verified,
1139 is_backup_configured: encryption.backups().state() == BackupState::Enabled,
1140 backup_exists_on_server,
1141 }
1142 }
1143
1144 fn are_events_visible(&self) -> bool {
1145 if let RoomState::Invited = self.inner.state() {
1146 return matches!(
1147 self.inner.history_visibility_or_default(),
1148 HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1149 );
1150 }
1151
1152 true
1153 }
1154
1155 pub async fn sync_members(&self) -> Result<()> {
1161 if !self.are_events_visible() {
1162 return Ok(());
1163 }
1164
1165 if !self.are_members_synced() { self.request_members().await } else { Ok(()) }
1166 }
1167
1168 pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1182 self.sync_members().await?;
1183 self.get_member_no_sync(user_id).await
1184 }
1185
1186 pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1200 Ok(self
1201 .inner
1202 .get_member(user_id)
1203 .await?
1204 .map(|member| RoomMember::new(self.client.clone(), member)))
1205 }
1206
1207 pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1216 self.sync_members().await?;
1217 self.members_no_sync(memberships).await
1218 }
1219
1220 pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1229 Ok(self
1230 .inner
1231 .members(memberships)
1232 .await?
1233 .into_iter()
1234 .map(|member| RoomMember::new(self.client.clone(), member))
1235 .collect())
1236 }
1237
1238 pub async fn set_own_member_display_name(
1243 &self,
1244 display_name: Option<String>,
1245 ) -> Result<send_state_event::v3::Response> {
1246 let user_id = self.own_user_id();
1247 let member_event =
1248 self.get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id).await?;
1249
1250 let Some(RawSyncOrStrippedState::Sync(raw_event)) = member_event else {
1251 return Err(Error::InsufficientData);
1252 };
1253
1254 let event = raw_event.deserialize()?;
1255
1256 let mut content = match event {
1257 SyncStateEvent::Original(original_event) => original_event.content,
1258 SyncStateEvent::Redacted(redacted_event) => {
1259 RoomMemberEventContent::new(redacted_event.content.membership)
1260 }
1261 };
1262
1263 content.displayname = display_name;
1264 self.send_state_event_for_key(user_id, content).await
1265 }
1266
1267 pub async fn get_state_events(
1269 &self,
1270 event_type: StateEventType,
1271 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1272 self.client
1273 .state_store()
1274 .get_state_events(self.room_id(), event_type)
1275 .await
1276 .map_err(Into::into)
1277 }
1278
1279 pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1296 where
1297 C: StaticEventContent<IsPrefix = ruma::events::False>
1298 + StaticStateEventContent
1299 + RedactContent,
1300 C::Redacted: RedactedStateEventContent,
1301 {
1302 Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1303 }
1304
1305 pub async fn get_state_events_for_keys(
1308 &self,
1309 event_type: StateEventType,
1310 state_keys: &[&str],
1311 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1312 self.client
1313 .state_store()
1314 .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1315 .await
1316 .map_err(Into::into)
1317 }
1318
1319 pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1339 &self,
1340 state_keys: I,
1341 ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1342 where
1343 C: StaticEventContent<IsPrefix = ruma::events::False>
1344 + StaticStateEventContent
1345 + RedactContent,
1346 C::StateKey: Borrow<K>,
1347 C::Redacted: RedactedStateEventContent,
1348 K: AsRef<str> + Sized + Sync + 'a,
1349 I: IntoIterator<Item = &'a K> + Send,
1350 I::IntoIter: Send,
1351 {
1352 Ok(self
1353 .client
1354 .state_store()
1355 .get_state_events_for_keys_static(self.room_id(), state_keys)
1356 .await?)
1357 }
1358
1359 pub async fn get_state_event(
1361 &self,
1362 event_type: StateEventType,
1363 state_key: &str,
1364 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1365 self.client
1366 .state_store()
1367 .get_state_event(self.room_id(), event_type, state_key)
1368 .await
1369 .map_err(Into::into)
1370 }
1371
1372 pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1391 where
1392 C: StaticEventContent<IsPrefix = ruma::events::False>
1393 + StaticStateEventContent<StateKey = EmptyStateKey>
1394 + RedactContent,
1395 C::Redacted: RedactedStateEventContent,
1396 {
1397 self.get_state_event_static_for_key(&EmptyStateKey).await
1398 }
1399
1400 pub async fn get_state_event_static_for_key<C, K>(
1420 &self,
1421 state_key: &K,
1422 ) -> Result<Option<RawSyncOrStrippedState<C>>>
1423 where
1424 C: StaticEventContent<IsPrefix = ruma::events::False>
1425 + StaticStateEventContent
1426 + RedactContent,
1427 C::StateKey: Borrow<K>,
1428 C::Redacted: RedactedStateEventContent,
1429 K: AsRef<str> + ?Sized + Sync,
1430 {
1431 Ok(self
1432 .client
1433 .state_store()
1434 .get_state_event_static_for_key(self.room_id(), state_key)
1435 .await?)
1436 }
1437
1438 pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1442 Ok(self
1447 .get_state_events_static::<SpaceParentEventContent>()
1448 .await?
1449 .into_iter()
1450 .filter_map(|parent_event| match parent_event.deserialize() {
1452 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1453 Some((e.state_key.to_owned(), e.sender))
1454 }
1455 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1456 Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1457 Err(e) => {
1458 info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
1459 None
1460 }
1461 })
1462 .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1464 let Some(parent_room) = self.client.get_room(&state_key) else {
1465 return Ok(ParentSpace::Unverifiable(state_key));
1468 };
1469 if let Some(child_event) = parent_room
1472 .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1473 .await?
1474 {
1475 match child_event.deserialize() {
1476 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1477 return Ok(ParentSpace::Reciprocal(parent_room));
1480 }
1481 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1482 Ok(SyncOrStrippedState::Stripped(_)) => {}
1483 Err(e) => {
1484 info!(
1485 room_id = ?self.room_id(), parent_room_id = ?state_key,
1486 "Could not deserialize m.space.child: {e}"
1487 );
1488 }
1489 }
1490 }
1495
1496 let Some(member) = parent_room.get_member(&sender).await? else {
1499 return Ok(ParentSpace::Illegitimate(parent_room));
1501 };
1502
1503 if member.can_send_state(StateEventType::SpaceChild) {
1504 Ok(ParentSpace::WithPowerlevel(parent_room))
1506 } else {
1507 Ok(ParentSpace::Illegitimate(parent_room))
1508 }
1509 })
1510 .collect::<FuturesUnordered<_>>())
1511 }
1512
1513 pub async fn account_data(
1515 &self,
1516 data_type: RoomAccountDataEventType,
1517 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1518 self.client
1519 .state_store()
1520 .get_room_account_data_event(self.room_id(), data_type)
1521 .await
1522 .map_err(Into::into)
1523 }
1524
1525 pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1544 where
1545 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1546 {
1547 Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1548 }
1549
1550 #[cfg(feature = "e2e-encryption")]
1555 pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1556 let user_ids = self
1557 .client
1558 .state_store()
1559 .get_user_ids(self.room_id(), RoomMemberships::empty())
1560 .await?;
1561
1562 for user_id in user_ids {
1563 let devices = self.client.encryption().get_user_devices(&user_id).await?;
1564 let any_unverified = devices.devices().any(|d| !d.is_verified());
1565
1566 if any_unverified {
1567 return Ok(false);
1568 }
1569 }
1570
1571 Ok(true)
1572 }
1573
1574 pub async fn set_account_data<T>(
1589 &self,
1590 content: T,
1591 ) -> Result<set_room_account_data::v3::Response>
1592 where
1593 T: RoomAccountDataEventContent,
1594 {
1595 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1596
1597 let request = set_room_account_data::v3::Request::new(
1598 own_user.to_owned(),
1599 self.room_id().to_owned(),
1600 &content,
1601 )?;
1602
1603 Ok(self.client.send(request).await?)
1604 }
1605
1606 pub async fn set_account_data_raw(
1631 &self,
1632 event_type: RoomAccountDataEventType,
1633 content: Raw<AnyRoomAccountDataEventContent>,
1634 ) -> Result<set_room_account_data::v3::Response> {
1635 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1636
1637 let request = set_room_account_data::v3::Request::new_raw(
1638 own_user.to_owned(),
1639 self.room_id().to_owned(),
1640 event_type,
1641 content,
1642 );
1643
1644 Ok(self.client.send(request).await?)
1645 }
1646
1647 pub async fn set_tag(
1678 &self,
1679 tag: TagName,
1680 tag_info: TagInfo,
1681 ) -> Result<create_tag::v3::Response> {
1682 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1683 let request = create_tag::v3::Request::new(
1684 user_id.to_owned(),
1685 self.inner.room_id().to_owned(),
1686 tag.to_string(),
1687 tag_info,
1688 );
1689 Ok(self.client.send(request).await?)
1690 }
1691
1692 pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1699 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1700 let request = delete_tag::v3::Request::new(
1701 user_id.to_owned(),
1702 self.inner.room_id().to_owned(),
1703 tag.to_string(),
1704 );
1705 Ok(self.client.send(request).await?)
1706 }
1707
1708 pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1718 if is_favourite {
1719 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1720
1721 self.set_tag(TagName::Favorite, tag_info).await?;
1722
1723 if self.is_low_priority() {
1724 self.remove_tag(TagName::LowPriority).await?;
1725 }
1726 } else {
1727 self.remove_tag(TagName::Favorite).await?;
1728 }
1729 Ok(())
1730 }
1731
1732 pub async fn set_is_low_priority(
1742 &self,
1743 is_low_priority: bool,
1744 tag_order: Option<f64>,
1745 ) -> Result<()> {
1746 if is_low_priority {
1747 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1748
1749 self.set_tag(TagName::LowPriority, tag_info).await?;
1750
1751 if self.is_favourite() {
1752 self.remove_tag(TagName::Favorite).await?;
1753 }
1754 } else {
1755 self.remove_tag(TagName::LowPriority).await?;
1756 }
1757 Ok(())
1758 }
1759
1760 pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1769 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1770
1771 let mut content = self
1772 .client
1773 .account()
1774 .account_data::<DirectEventContent>()
1775 .await?
1776 .map(|c| c.deserialize())
1777 .transpose()?
1778 .unwrap_or_default();
1779
1780 let this_room_id = self.inner.room_id();
1781
1782 if is_direct {
1783 let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1784 room_members.retain(|member| member.user_id() != self.own_user_id());
1785
1786 for member in room_members {
1787 let entry = content.entry(member.user_id().into()).or_default();
1788 if !entry.iter().any(|room_id| room_id == this_room_id) {
1789 entry.push(this_room_id.to_owned());
1790 }
1791 }
1792 } else {
1793 for (_, list) in content.iter_mut() {
1794 list.retain(|room_id| *room_id != this_room_id);
1795 }
1796
1797 content.retain(|_, list| !list.is_empty());
1799 }
1800
1801 let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1802
1803 self.client.send(request).await?;
1804 Ok(())
1805 }
1806
1807 #[cfg(feature = "e2e-encryption")]
1815 #[cfg(not(feature = "experimental-encrypted-state-events"))]
1816 pub async fn decrypt_event(
1817 &self,
1818 event: &Raw<OriginalSyncRoomEncryptedEvent>,
1819 push_ctx: Option<&PushContext>,
1820 ) -> Result<TimelineEvent> {
1821 let machine = self.client.olm_machine().await;
1822 let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1823
1824 match machine
1825 .try_decrypt_room_event(
1826 event.cast_ref(),
1827 self.inner.room_id(),
1828 self.client.decryption_settings(),
1829 )
1830 .await?
1831 {
1832 RoomEventDecryptionResult::Decrypted(decrypted) => {
1833 let push_actions = if let Some(push_ctx) = push_ctx {
1834 Some(push_ctx.for_event(&decrypted.event).await)
1835 } else {
1836 None
1837 };
1838 Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1839 }
1840 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1841 self.client
1842 .encryption()
1843 .backups()
1844 .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1845 Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
1846 }
1847 }
1848 }
1849
1850 #[cfg(feature = "experimental-encrypted-state-events")]
1858 pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
1859 &self,
1860 event: &Raw<T>,
1861 push_ctx: Option<&PushContext>,
1862 ) -> Result<TimelineEvent> {
1863 let machine = self.client.olm_machine().await;
1864 let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1865
1866 match machine
1867 .try_decrypt_room_event(
1868 event.cast_ref(),
1869 self.inner.room_id(),
1870 self.client.decryption_settings(),
1871 )
1872 .await?
1873 {
1874 RoomEventDecryptionResult::Decrypted(decrypted) => {
1875 let push_actions = if let Some(push_ctx) = push_ctx {
1876 Some(push_ctx.for_event(&decrypted.event).await)
1877 } else {
1878 None
1879 };
1880 Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1881 }
1882 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1883 self.client
1884 .encryption()
1885 .backups()
1886 .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1887 Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
1890 }
1891 }
1892 }
1893
1894 #[cfg(feature = "e2e-encryption")]
1907 pub async fn get_encryption_info(
1908 &self,
1909 session_id: &str,
1910 sender: &UserId,
1911 ) -> Option<Arc<EncryptionInfo>> {
1912 let machine = self.client.olm_machine().await;
1913 let machine = machine.as_ref()?;
1914 machine.get_session_encryption_info(self.room_id(), session_id, sender).await.ok()
1915 }
1916
1917 #[cfg(feature = "e2e-encryption")]
1930 pub async fn discard_room_key(&self) -> Result<()> {
1931 let machine = self.client.olm_machine().await;
1932 if let Some(machine) = machine.as_ref() {
1933 machine.discard_room_key(self.inner.room_id()).await?;
1934 Ok(())
1935 } else {
1936 Err(Error::NoOlmMachine)
1937 }
1938 }
1939
1940 #[instrument(skip_all)]
1948 pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1949 let request = assign!(
1950 ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1951 { reason: reason.map(ToOwned::to_owned) }
1952 );
1953 self.client.send(request).await?;
1954 Ok(())
1955 }
1956
1957 #[instrument(skip_all)]
1965 pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1966 let request = assign!(
1967 unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1968 { reason: reason.map(ToOwned::to_owned) }
1969 );
1970 self.client.send(request).await?;
1971 Ok(())
1972 }
1973
1974 #[instrument(skip_all)]
1983 pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1984 let request = assign!(
1985 kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1986 { reason: reason.map(ToOwned::to_owned) }
1987 );
1988 self.client.send(request).await?;
1989 Ok(())
1990 }
1991
1992 #[instrument(skip_all)]
1998 pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1999 #[cfg(feature = "e2e-encryption")]
2000 if self.client.inner.enable_share_history_on_invite {
2001 shared_room_history::share_room_history(self, user_id.to_owned()).await?;
2002 }
2003
2004 let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
2005 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2006 self.client.send(request).await?;
2007
2008 self.mark_members_missing();
2012
2013 Ok(())
2014 }
2015
2016 #[instrument(skip_all)]
2022 pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
2023 let recipient = InvitationRecipient::ThirdPartyId(invite_id);
2024 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
2025 self.client.send(request).await?;
2026
2027 self.mark_members_missing();
2031
2032 Ok(())
2033 }
2034
2035 pub async fn typing_notice(&self, typing: bool) -> Result<()> {
2070 self.ensure_room_joined()?;
2071
2072 let send = if let Some(typing_time) =
2075 self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
2076 {
2077 if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
2078 typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
2082 } else {
2083 !typing
2085 }
2086 } else {
2087 typing
2090 };
2091
2092 if send {
2093 self.send_typing_notice(typing).await?;
2094 }
2095
2096 Ok(())
2097 }
2098
2099 #[instrument(name = "typing_notice", skip(self))]
2100 async fn send_typing_notice(&self, typing: bool) -> Result<()> {
2101 let typing = if typing {
2102 self.client
2103 .inner
2104 .typing_notice_times
2105 .write()
2106 .unwrap()
2107 .insert(self.room_id().to_owned(), Instant::now());
2108 Typing::Yes(TYPING_NOTICE_TIMEOUT)
2109 } else {
2110 self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
2111 Typing::No
2112 };
2113
2114 let request = create_typing_event::v3::Request::new(
2115 self.own_user_id().to_owned(),
2116 self.room_id().to_owned(),
2117 typing,
2118 );
2119
2120 self.client.send(request).await?;
2121
2122 Ok(())
2123 }
2124
2125 #[instrument(skip_all)]
2142 pub async fn send_single_receipt(
2143 &self,
2144 receipt_type: create_receipt::v3::ReceiptType,
2145 thread: ReceiptThread,
2146 event_id: OwnedEventId,
2147 ) -> Result<()> {
2148 let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
2151
2152 self.client
2153 .inner
2154 .locks
2155 .read_receipt_deduplicated_handler
2156 .run((request_key, event_id.clone()), async {
2157 let is_unthreaded = thread == ReceiptThread::Unthreaded;
2159
2160 let mut request = create_receipt::v3::Request::new(
2161 self.room_id().to_owned(),
2162 receipt_type,
2163 event_id,
2164 );
2165 request.thread = thread;
2166
2167 self.client.send(request).await?;
2168
2169 if is_unthreaded {
2170 self.set_unread_flag(false).await?;
2171 }
2172
2173 Ok(())
2174 })
2175 .await
2176 }
2177
2178 #[instrument(skip_all)]
2188 pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2189 if receipts.is_empty() {
2190 return Ok(());
2191 }
2192
2193 let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2194 let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2195 fully_read,
2196 read_receipt: public_read_receipt,
2197 private_read_receipt,
2198 });
2199
2200 self.client.send(request).await?;
2201
2202 self.set_unread_flag(false).await?;
2203
2204 Ok(())
2205 }
2206
2207 #[allow(unused_variables, unused_mut)]
2211 async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
2212 use ruma::{
2213 EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
2214 };
2215 const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
2216
2217 if !self.latest_encryption_state().await?.is_encrypted() {
2218 let mut content =
2219 RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
2220 #[cfg(feature = "experimental-encrypted-state-events")]
2221 if encrypted_state_events {
2222 content = content.with_encrypted_state();
2223 }
2224 self.send_state_event(content).await?;
2225
2226 let res = timeout(
2233 async {
2234 loop {
2235 self.client.inner.sync_beat.listen().await;
2237 let _state_store_lock =
2238 self.client.base_client().state_store_lock().lock().await;
2239
2240 if !self.inner.encryption_state().is_unknown() {
2241 break;
2242 }
2243 }
2244 },
2245 SYNC_WAIT_TIME,
2246 )
2247 .await;
2248
2249 let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
2250
2251 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2253 if res.is_ok() && self.inner.encryption_state().is_encrypted() {
2254 debug!("room successfully marked as encrypted");
2255 return Ok(());
2256 }
2257
2258 #[cfg(feature = "experimental-encrypted-state-events")]
2260 if res.is_ok() && {
2261 if encrypted_state_events {
2262 self.inner.encryption_state().is_state_encrypted()
2263 } else {
2264 self.inner.encryption_state().is_encrypted()
2265 }
2266 } {
2267 debug!("room successfully marked as encrypted");
2268 return Ok(());
2269 }
2270
2271 debug!("still not marked as encrypted, marking encryption state as missing");
2276
2277 let mut room_info = self.clone_info();
2278 room_info.mark_encryption_state_missing();
2279 let mut changes = StateChanges::default();
2280 changes.add_room(room_info.clone());
2281
2282 self.client.state_store().save_changes(&changes).await?;
2283 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
2284 }
2285
2286 Ok(())
2287 }
2288
2289 #[instrument(skip_all)]
2321 pub async fn enable_encryption(&self) -> Result<()> {
2322 self.enable_encryption_inner(false).await
2323 }
2324
2325 #[instrument(skip_all)]
2358 #[cfg(feature = "experimental-encrypted-state-events")]
2359 pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
2360 self.enable_encryption_inner(true).await
2361 }
2362
2363 #[cfg(feature = "e2e-encryption")]
2372 #[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
2373 async fn preshare_room_key(&self) -> Result<()> {
2374 self.ensure_room_joined()?;
2375
2376 let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
2378 tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));
2379
2380 self.client
2381 .locks()
2382 .group_session_deduplicated_handler
2383 .run(self.room_id().to_owned(), async move {
2384 {
2385 let members = self
2386 .client
2387 .state_store()
2388 .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
2389 .await?;
2390 self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
2391 };
2392
2393 let response = self.share_room_key().await;
2394
2395 if let Err(r) = response {
2399 let machine = self.client.olm_machine().await;
2400 if let Some(machine) = machine.as_ref() {
2401 machine.discard_room_key(self.room_id()).await?;
2402 }
2403 return Err(r);
2404 }
2405
2406 Ok(())
2407 })
2408 .await
2409 }
2410
2411 #[cfg(feature = "e2e-encryption")]
2417 #[instrument(skip_all)]
2418 async fn share_room_key(&self) -> Result<()> {
2419 self.ensure_room_joined()?;
2420
2421 let requests = self.client.base_client().share_room_key(self.room_id()).await?;
2422
2423 for request in requests {
2424 let response = self.client.send_to_device(&request).await?;
2425 self.client.mark_request_as_sent(&request.txn_id, &response).await?;
2426 }
2427
2428 Ok(())
2429 }
2430
2431 #[instrument(skip_all)]
2440 pub async fn sync_up(&self) {
2441 while !self.is_synced() && self.state() == RoomState::Joined {
2442 let wait_for_beat = self.client.inner.sync_beat.listen();
2443 let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2445 }
2446 }
2447
2448 pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2519 SendMessageLikeEvent::new(self, content)
2520 }
2521
2522 #[cfg(feature = "e2e-encryption")]
2525 async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
2526 let olm = self.client.olm_machine().await;
2527 let olm = olm.as_ref().expect("Olm machine wasn't started");
2528
2529 let members =
2530 self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;
2531
2532 let tracked: HashMap<_, _> = olm
2533 .store()
2534 .load_tracked_users()
2535 .await?
2536 .into_iter()
2537 .map(|tracked| (tracked.user_id, tracked.dirty))
2538 .collect();
2539
2540 let members_with_unknown_devices =
2543 members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));
2544
2545 let (req_id, request) =
2546 olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));
2547
2548 if !request.device_keys.is_empty() {
2549 self.client.keys_query(&req_id, request.device_keys).await?;
2550 }
2551
2552 Ok(())
2553 }
2554
2555 #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2599 pub fn send_raw<'a>(
2600 &'a self,
2601 event_type: &'a str,
2602 content: impl IntoRawMessageLikeEventContent,
2603 ) -> SendRawMessageLikeEvent<'a> {
2604 SendRawMessageLikeEvent::new(self, event_type, content)
2607 }
2608
2609 #[instrument(skip_all)]
2657 pub fn send_attachment<'a>(
2658 &'a self,
2659 filename: impl Into<String>,
2660 content_type: &'a Mime,
2661 data: Vec<u8>,
2662 config: AttachmentConfig,
2663 ) -> SendAttachment<'a> {
2664 SendAttachment::new(self, filename.into(), content_type, data, config)
2665 }
2666
2667 #[instrument(skip_all)]
2695 pub(super) async fn prepare_and_send_attachment<'a>(
2696 &'a self,
2697 filename: String,
2698 content_type: &'a Mime,
2699 data: Vec<u8>,
2700 mut config: AttachmentConfig,
2701 send_progress: SharedObservable<TransmissionProgress>,
2702 store_in_cache: bool,
2703 ) -> Result<send_message_event::v3::Response> {
2704 self.ensure_room_joined()?;
2705
2706 let txn_id = config.txn_id.take();
2707 let mentions = config.mentions.take();
2708
2709 let thumbnail = config.thumbnail.take();
2710
2711 let thumbnail_cache_info = if store_in_cache {
2713 thumbnail
2714 .as_ref()
2715 .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2716 } else {
2717 None
2718 };
2719
2720 #[cfg(feature = "e2e-encryption")]
2721 let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2722 self.client
2723 .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2724 .await?
2725 } else {
2726 self.client
2727 .media()
2728 .upload_plain_media_and_thumbnail(
2729 content_type,
2730 data.clone(),
2733 thumbnail,
2734 send_progress,
2735 )
2736 .await?
2737 };
2738
2739 #[cfg(not(feature = "e2e-encryption"))]
2740 let (media_source, thumbnail) = self
2741 .client
2742 .media()
2743 .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2744 .await?;
2745
2746 if store_in_cache {
2747 let media_store_lock_guard = self.client.media_store().lock().await?;
2748
2749 debug!("caching the media");
2753 let request =
2754 MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2755
2756 if let Err(err) = media_store_lock_guard
2757 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2758 .await
2759 {
2760 warn!("unable to cache the media after uploading it: {err}");
2761 }
2762
2763 if let Some(((data, height, width), source)) =
2764 thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2765 {
2766 debug!("caching the thumbnail");
2767
2768 let request = MediaRequestParameters {
2769 source: source.clone(),
2770 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2771 };
2772
2773 if let Err(err) = media_store_lock_guard
2774 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2775 .await
2776 {
2777 warn!("unable to cache the media after uploading it: {err}");
2778 }
2779 }
2780 }
2781
2782 let content = self
2783 .make_media_event(
2784 Room::make_attachment_type(
2785 content_type,
2786 filename,
2787 media_source,
2788 config.caption,
2789 config.info,
2790 thumbnail,
2791 ),
2792 mentions,
2793 config.reply,
2794 )
2795 .await?;
2796
2797 let mut fut = self.send(content);
2798 if let Some(txn_id) = txn_id {
2799 fut = fut.with_transaction_id(txn_id);
2800 }
2801
2802 fut.await.map(|result| result.response)
2803 }
2804
2805 #[allow(clippy::too_many_arguments)]
2808 pub(crate) fn make_attachment_type(
2809 content_type: &Mime,
2810 filename: String,
2811 source: MediaSource,
2812 caption: Option<TextMessageEventContent>,
2813 info: Option<AttachmentInfo>,
2814 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2815 ) -> MessageType {
2816 make_media_type!(MessageType, content_type, filename, source, caption, info, thumbnail)
2817 }
2818
2819 pub(crate) async fn make_media_event(
2822 &self,
2823 msg_type: MessageType,
2824 mentions: Option<Mentions>,
2825 reply: Option<Reply>,
2826 ) -> Result<RoomMessageEventContent> {
2827 let mut content = RoomMessageEventContent::new(msg_type);
2828 if let Some(mentions) = mentions {
2829 content = content.add_mentions(mentions);
2830 }
2831 if let Some(reply) = reply {
2832 content = self.make_reply_event(content.into(), reply).await?;
2835 }
2836 Ok(content)
2837 }
2838
2839 #[cfg(feature = "unstable-msc4274")]
2842 #[allow(clippy::too_many_arguments)]
2843 pub(crate) fn make_gallery_item_type(
2844 content_type: &Mime,
2845 filename: String,
2846 source: MediaSource,
2847 caption: Option<TextMessageEventContent>,
2848 info: Option<AttachmentInfo>,
2849 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2850 ) -> GalleryItemType {
2851 make_media_type!(GalleryItemType, content_type, filename, source, caption, info, thumbnail)
2852 }
2853
2854 pub async fn update_power_levels(
2863 &self,
2864 updates: Vec<(&UserId, Int)>,
2865 ) -> Result<send_state_event::v3::Response> {
2866 let mut power_levels = self.power_levels().await?;
2867
2868 for (user_id, new_level) in updates {
2869 if new_level == power_levels.users_default {
2870 power_levels.users.remove(user_id);
2871 } else {
2872 power_levels.users.insert(user_id.to_owned(), new_level);
2873 }
2874 }
2875
2876 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2877 }
2878
2879 pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2884 let mut power_levels = self.power_levels().await?;
2885 power_levels.apply(changes)?;
2886 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2887 Ok(())
2888 }
2889
2890 pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2894 let creators = self.creators().unwrap_or_default();
2895 let rules = self.clone_info().room_version_rules_or_default();
2896
2897 let default_power_levels =
2898 RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2899 let changes = RoomPowerLevelChanges::from(default_power_levels);
2900 self.apply_power_level_changes(changes).await?;
2901 Ok(self.power_levels().await?)
2902 }
2903
2904 pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2909 let power_level = self.get_user_power_level(user_id).await?;
2910 Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2911 }
2912
2913 pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2918 let event = self.power_levels().await?;
2919 Ok(event.for_user(user_id))
2920 }
2921
2922 pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2925 let power_levels = self.power_levels().await.ok();
2926 let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2927 if let Some(power_levels) = power_levels {
2928 for (id, level) in power_levels.users.into_iter() {
2929 user_power_levels.insert(id, level.into());
2930 }
2931 }
2932 user_power_levels
2933 }
2934
2935 pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2937 self.send_state_event(RoomNameEventContent::new(name)).await
2938 }
2939
2940 pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2942 self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2943 }
2944
2945 pub async fn set_avatar_url(
2951 &self,
2952 url: &MxcUri,
2953 info: Option<avatar::ImageInfo>,
2954 ) -> Result<send_state_event::v3::Response> {
2955 self.ensure_room_joined()?;
2956
2957 let mut room_avatar_event = RoomAvatarEventContent::new();
2958 room_avatar_event.url = Some(url.to_owned());
2959 room_avatar_event.info = info.map(Box::new);
2960
2961 self.send_state_event(room_avatar_event).await
2962 }
2963
2964 pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2966 self.send_state_event(RoomAvatarEventContent::new()).await
2967 }
2968
2969 pub async fn upload_avatar(
2977 &self,
2978 mime: &Mime,
2979 data: Vec<u8>,
2980 info: Option<avatar::ImageInfo>,
2981 ) -> Result<send_state_event::v3::Response> {
2982 self.ensure_room_joined()?;
2983
2984 let upload_response = self.client.media().upload(mime, data, None).await?;
2985 let mut info = info.unwrap_or_default();
2986 info.blurhash = upload_response.blurhash;
2987 info.mimetype = Some(mime.to_string());
2988
2989 self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2990 }
2991
2992 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3036 #[instrument(skip_all)]
3037 pub async fn send_state_event(
3038 &self,
3039 content: impl StateEventContent<StateKey = EmptyStateKey>,
3040 ) -> Result<send_state_event::v3::Response> {
3041 self.send_state_event_for_key(&EmptyStateKey, content).await
3042 }
3043
3044 #[cfg(feature = "experimental-encrypted-state-events")]
3095 #[instrument(skip_all)]
3096 pub fn send_state_event<'a>(
3097 &'a self,
3098 content: impl StateEventContent<StateKey = EmptyStateKey>,
3099 ) -> SendStateEvent<'a> {
3100 self.send_state_event_for_key(&EmptyStateKey, content)
3101 }
3102
3103 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3144 pub async fn send_state_event_for_key<C, K>(
3145 &self,
3146 state_key: &K,
3147 content: C,
3148 ) -> Result<send_state_event::v3::Response>
3149 where
3150 C: StateEventContent,
3151 C::StateKey: Borrow<K>,
3152 K: AsRef<str> + ?Sized,
3153 {
3154 self.ensure_room_joined()?;
3155 let request =
3156 send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3157 let response = self.client.send(request).await?;
3158 Ok(response)
3159 }
3160
3161 #[cfg(feature = "experimental-encrypted-state-events")]
3210 pub fn send_state_event_for_key<'a, C, K>(
3211 &'a self,
3212 state_key: &K,
3213 content: C,
3214 ) -> SendStateEvent<'a>
3215 where
3216 C: StateEventContent,
3217 C::StateKey: Borrow<K>,
3218 K: AsRef<str> + ?Sized,
3219 {
3220 SendStateEvent::new(self, state_key, content)
3221 }
3222
3223 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3258 #[instrument(skip_all)]
3259 pub async fn send_state_event_raw(
3260 &self,
3261 event_type: &str,
3262 state_key: &str,
3263 content: impl IntoRawStateEventContent,
3264 ) -> Result<send_state_event::v3::Response> {
3265 self.ensure_room_joined()?;
3266
3267 let request = send_state_event::v3::Request::new_raw(
3268 self.room_id().to_owned(),
3269 event_type.into(),
3270 state_key.to_owned(),
3271 content.into_raw_state_event_content(),
3272 );
3273
3274 Ok(self.client.send(request).await?)
3275 }
3276
3277 #[cfg(feature = "experimental-encrypted-state-events")]
3319 #[instrument(skip_all)]
3320 pub fn send_state_event_raw<'a>(
3321 &'a self,
3322 event_type: &'a str,
3323 state_key: &'a str,
3324 content: impl IntoRawStateEventContent,
3325 ) -> SendRawStateEvent<'a> {
3326 SendRawStateEvent::new(self, event_type, state_key, content)
3327 }
3328
3329 #[instrument(skip_all)]
3364 pub async fn redact(
3365 &self,
3366 event_id: &EventId,
3367 reason: Option<&str>,
3368 txn_id: Option<OwnedTransactionId>,
3369 ) -> HttpResult<redact_event::v3::Response> {
3370 let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3371 let request = assign!(
3372 redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3373 { reason: reason.map(ToOwned::to_owned) }
3374 );
3375
3376 self.client.send(request).await
3377 }
3378
3379 pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
3388 let acl_ev = self
3389 .get_state_event_static::<RoomServerAclEventContent>()
3390 .await?
3391 .and_then(|ev| ev.deserialize().ok());
3392 let acl = acl_ev.as_ref().and_then(|ev| match ev {
3393 SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
3394 SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
3395 });
3396
3397 let members: Vec<_> = self
3401 .members_no_sync(RoomMemberships::JOIN)
3402 .await?
3403 .into_iter()
3404 .filter(|member| {
3405 let server = member.user_id().server_name();
3406 acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
3407 })
3408 .collect();
3409
3410 let max = members
3413 .iter()
3414 .max_by_key(|member| member.power_level())
3415 .filter(|max| max.power_level() >= int!(50))
3416 .map(|member| member.user_id().server_name());
3417
3418 let servers = members
3420 .iter()
3421 .map(|member| member.user_id().server_name())
3422 .filter(|server| max.filter(|max| max == server).is_none())
3423 .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
3424 *servers.entry(server).or_default() += 1;
3425 servers
3426 });
3427 let mut servers: Vec<_> = servers.into_iter().collect();
3428 servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
3429
3430 Ok(max
3431 .into_iter()
3432 .chain(servers.into_iter().map(|(name, _)| name))
3433 .take(3)
3434 .map(ToOwned::to_owned)
3435 .collect())
3436 }
3437
3438 pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3445 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3446 return Ok(alias.matrix_to_uri());
3447 }
3448
3449 let via = self.route().await?;
3450 Ok(self.room_id().matrix_to_uri_via(via))
3451 }
3452
3453 pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3464 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3465 return Ok(alias.matrix_uri(join));
3466 }
3467
3468 let via = self.route().await?;
3469 Ok(self.room_id().matrix_uri_via(via, join))
3470 }
3471
3472 pub async fn matrix_to_event_permalink(
3486 &self,
3487 event_id: impl Into<OwnedEventId>,
3488 ) -> Result<MatrixToUri> {
3489 let via = self.route().await?;
3492 Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3493 }
3494
3495 pub async fn matrix_event_permalink(
3509 &self,
3510 event_id: impl Into<OwnedEventId>,
3511 ) -> Result<MatrixUri> {
3512 let via = self.route().await?;
3515 Ok(self.room_id().matrix_event_uri_via(event_id, via))
3516 }
3517
3518 pub async fn load_user_receipt(
3531 &self,
3532 receipt_type: ReceiptType,
3533 thread: ReceiptThread,
3534 user_id: &UserId,
3535 ) -> Result<Option<(OwnedEventId, Receipt)>> {
3536 self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3537 }
3538
3539 pub async fn load_event_receipts(
3552 &self,
3553 receipt_type: ReceiptType,
3554 thread: ReceiptThread,
3555 event_id: &EventId,
3556 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3557 self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3558 }
3559
3560 pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3565 self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
3566 }
3567
3568 pub(crate) async fn push_condition_room_ctx_internal(
3575 &self,
3576 with_threads_subscriptions: bool,
3577 ) -> Result<Option<PushConditionRoomCtx>> {
3578 let room_id = self.room_id();
3579 let user_id = self.own_user_id();
3580 let room_info = self.clone_info();
3581 let member_count = room_info.active_members_count();
3582
3583 let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3584 member.name().to_owned()
3585 } else {
3586 return Ok(None);
3587 };
3588
3589 let power_levels = match self.power_levels().await {
3590 Ok(power_levels) => Some(power_levels.into()),
3591 Err(error) => {
3592 if matches!(room_info.state(), RoomState::Joined) {
3593 error!("Could not compute power levels for push conditions: {error}");
3596 }
3597 None
3598 }
3599 };
3600
3601 let mut ctx = assign!(PushConditionRoomCtx::new(
3602 room_id.to_owned(),
3603 UInt::new(member_count).unwrap_or(UInt::MAX),
3604 user_id.to_owned(),
3605 user_display_name,
3606 ),
3607 {
3608 power_levels,
3609 });
3610
3611 if with_threads_subscriptions {
3612 let this = self.clone();
3613 ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3614 let room = this.clone();
3615 Box::pin(async move {
3616 if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3617 maybe_sub.is_some()
3618 } else {
3619 false
3620 }
3621 })
3622 });
3623 }
3624
3625 Ok(Some(ctx))
3626 }
3627
3628 pub async fn push_context(&self) -> Result<Option<PushContext>> {
3631 self.push_context_internal(self.client.enabled_thread_subscriptions()).await
3632 }
3633
3634 #[instrument(skip(self))]
3638 pub(crate) async fn push_context_internal(
3639 &self,
3640 with_threads_subscriptions: bool,
3641 ) -> Result<Option<PushContext>> {
3642 let Some(push_condition_room_ctx) =
3643 self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3644 else {
3645 debug!("Could not aggregate push context");
3646 return Ok(None);
3647 };
3648 let push_rules = self.client().account().push_rules().await?;
3649 Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3650 }
3651
3652 pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3657 if let Some(ctx) = self.push_context().await? {
3658 Ok(Some(ctx.for_event(event).await))
3659 } else {
3660 Ok(None)
3661 }
3662 }
3663
3664 pub async fn invite_details(&self) -> Result<Invite> {
3667 let state = self.state();
3668
3669 if state != RoomState::Invited {
3670 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3671 }
3672
3673 let invitee = self
3674 .get_member_no_sync(self.own_user_id())
3675 .await?
3676 .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3677 let event = invitee.event();
3678
3679 let inviter_id = event.sender().to_owned();
3680 let inviter = self.get_member_no_sync(&inviter_id).await?;
3681
3682 Ok(Invite { invitee, inviter_id, inviter })
3683 }
3684
3685 pub async fn member_with_sender_info(
3693 &self,
3694 user_id: &UserId,
3695 ) -> Result<RoomMemberWithSenderInfo> {
3696 let Some(member) = self.get_member_no_sync(user_id).await? else {
3697 return Err(Error::InsufficientData);
3698 };
3699
3700 let sender_member =
3701 if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3702 Some(member)
3704 } else if self.are_members_synced() {
3705 None
3707 } else if self.sync_members().await.is_ok() {
3708 self.get_member_no_sync(member.event().sender()).await?
3710 } else {
3711 None
3712 };
3713
3714 Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3715 }
3716
3717 pub async fn forget(&self) -> Result<()> {
3723 let state = self.state();
3724 match state {
3725 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3726 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3727 "Left / Banned",
3728 state,
3729 ))));
3730 }
3731 RoomState::Left | RoomState::Banned => {}
3732 }
3733
3734 let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3735 let _response = self.client.send(request).await?;
3736
3737 if self.inner.direct_targets_length() != 0
3739 && let Err(e) = self.set_is_direct(false).await
3740 {
3741 warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3744 }
3745
3746 self.client.base_client().forget_room(self.inner.room_id()).await?;
3747
3748 Ok(())
3749 }
3750
3751 fn ensure_room_joined(&self) -> Result<()> {
3752 let state = self.state();
3753 if state == RoomState::Joined {
3754 Ok(())
3755 } else {
3756 Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3757 }
3758 }
3759
3760 pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3762 if !matches!(self.state(), RoomState::Joined) {
3763 return None;
3764 }
3765
3766 let notification_settings = self.client().notification_settings().await;
3767
3768 let notification_mode =
3770 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3771
3772 if notification_mode.is_some() {
3773 notification_mode
3774 } else if let Ok(is_encrypted) =
3775 self.latest_encryption_state().await.map(|state| state.is_encrypted())
3776 {
3777 let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3782 let default_mode = notification_settings
3783 .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3784 .await;
3785 Some(default_mode)
3786 } else {
3787 None
3788 }
3789 }
3790
3791 pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3802 if !matches!(self.state(), RoomState::Joined) {
3803 return None;
3804 }
3805
3806 let notification_settings = self.client().notification_settings().await;
3807
3808 let mode =
3810 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3811
3812 if let Some(mode) = mode {
3813 self.update_cached_user_defined_notification_mode(mode);
3814 }
3815
3816 mode
3817 }
3818
3819 pub async fn report_content(
3832 &self,
3833 event_id: OwnedEventId,
3834 score: Option<ReportedContentScore>,
3835 reason: Option<String>,
3836 ) -> Result<report_content::v3::Response> {
3837 let state = self.state();
3838 if state != RoomState::Joined {
3839 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3840 }
3841
3842 let request = report_content::v3::Request::new(
3843 self.inner.room_id().to_owned(),
3844 event_id,
3845 score.map(Into::into),
3846 reason,
3847 );
3848 Ok(self.client.send(request).await?)
3849 }
3850
3851 pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3862 let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3863
3864 Ok(self.client.send(request).await?)
3865 }
3866
3867 pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3873 if self.is_marked_unread() == unread {
3874 return Ok(());
3876 }
3877
3878 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3879
3880 let content = MarkedUnreadEventContent::new(unread);
3881
3882 let request = set_room_account_data::v3::Request::new(
3883 user_id.to_owned(),
3884 self.inner.room_id().to_owned(),
3885 &content,
3886 )?;
3887
3888 self.client.send(request).await?;
3889 Ok(())
3890 }
3891
3892 pub async fn event_cache(
3895 &self,
3896 ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3897 self.client.event_cache().for_room(self.room_id()).await
3898 }
3899
3900 pub(crate) async fn get_user_beacon_info(
3907 &self,
3908 user_id: &UserId,
3909 ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3910 let raw_event = self
3911 .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3912 .await?
3913 .ok_or(BeaconError::NotFound)?;
3914
3915 match raw_event.deserialize()? {
3916 SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3917 SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3918 SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3919 }
3920 }
3921
3922 pub async fn start_live_location_share(
3935 &self,
3936 duration_millis: u64,
3937 description: Option<String>,
3938 ) -> Result<send_state_event::v3::Response> {
3939 self.ensure_room_joined()?;
3940
3941 self.send_state_event_for_key(
3942 self.own_user_id(),
3943 BeaconInfoEventContent::new(
3944 description,
3945 Duration::from_millis(duration_millis),
3946 true,
3947 None,
3948 ),
3949 )
3950 .await
3951 }
3952
3953 pub async fn stop_live_location_share(
3960 &self,
3961 ) -> Result<send_state_event::v3::Response, BeaconError> {
3962 self.ensure_room_joined()?;
3963
3964 let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3965 beacon_info_event.content.stop();
3966 Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3967 }
3968
3969 pub async fn send_location_beacon(
3981 &self,
3982 geo_uri: String,
3983 ) -> Result<send_message_event::v3::Response, BeaconError> {
3984 self.ensure_room_joined()?;
3985
3986 let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3987
3988 if beacon_info_event.content.is_live() {
3989 let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3990 Ok(self.send(content).await?.response)
3991 } else {
3992 Err(BeaconError::NotLive)
3993 }
3994 }
3995
3996 pub async fn save_composer_draft(
3999 &self,
4000 draft: ComposerDraft,
4001 thread_root: Option<&EventId>,
4002 ) -> Result<()> {
4003 self.client
4004 .state_store()
4005 .set_kv_data(
4006 StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
4007 StateStoreDataValue::ComposerDraft(draft),
4008 )
4009 .await?;
4010 Ok(())
4011 }
4012
4013 pub async fn load_composer_draft(
4016 &self,
4017 thread_root: Option<&EventId>,
4018 ) -> Result<Option<ComposerDraft>> {
4019 let data = self
4020 .client
4021 .state_store()
4022 .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4023 .await?;
4024 Ok(data.and_then(|d| d.into_composer_draft()))
4025 }
4026
4027 pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
4030 self.client
4031 .state_store()
4032 .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
4033 .await?;
4034 Ok(())
4035 }
4036
4037 pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
4040 let response = self
4041 .client
4042 .send(get_state_event_for_key::v3::Request::new(
4043 self.room_id().to_owned(),
4044 StateEventType::RoomPinnedEvents,
4045 "".to_owned(),
4046 ))
4047 .await;
4048
4049 match response {
4050 Ok(response) => Ok(Some(
4051 response
4052 .into_content()
4053 .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
4054 .pinned,
4055 )),
4056 Err(http_error) => match http_error.as_client_api_error() {
4057 Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
4058 _ => Err(http_error.into()),
4059 },
4060 }
4061 }
4062
4063 pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
4071 ObservableLiveLocation::new(&self.client, self.room_id())
4072 }
4073
4074 pub async fn subscribe_to_knock_requests(
4088 &self,
4089 ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
4090 let this = Arc::new(self.clone());
4091
4092 let room_member_events_observer =
4093 self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
4094
4095 let current_seen_ids = self.get_seen_knock_request_ids().await?;
4096 let mut seen_request_ids_stream = self
4097 .seen_knock_request_ids_map
4098 .subscribe()
4099 .await
4100 .map(|values| values.unwrap_or_default());
4101
4102 let mut room_info_stream = self.subscribe_info();
4103
4104 let clear_seen_ids_handle = spawn({
4107 let this = self.clone();
4108 async move {
4109 let mut member_updates_stream = this.room_member_updates_sender.subscribe();
4110 while member_updates_stream.recv().await.is_ok() {
4111 if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
4113 warn!("Failed to remove seen knock requests: {err}")
4114 }
4115 }
4116 }
4117 });
4118
4119 let combined_stream = stream! {
4120 match this.get_current_join_requests(¤t_seen_ids).await {
4122 Ok(initial_requests) => yield initial_requests,
4123 Err(err) => warn!("Failed to get initial requests to join: {err}")
4124 }
4125
4126 let mut requests_stream = room_member_events_observer.subscribe();
4127 let mut seen_ids = current_seen_ids.clone();
4128
4129 loop {
4130 tokio::select! {
4133 Some((event, _)) = requests_stream.next() => {
4134 if let Some(event) = event.as_original() {
4135 let emit = if event.prev_content().is_some() {
4137 matches!(event.membership_change(),
4138 MembershipChange::Banned |
4139 MembershipChange::Knocked |
4140 MembershipChange::KnockAccepted |
4141 MembershipChange::KnockDenied |
4142 MembershipChange::KnockRetracted
4143 )
4144 } else {
4145 true
4148 };
4149
4150 if emit {
4151 match this.get_current_join_requests(&seen_ids).await {
4152 Ok(requests) => yield requests,
4153 Err(err) => {
4154 warn!("Failed to get updated knock requests on new member event: {err}")
4155 }
4156 }
4157 }
4158 }
4159 }
4160
4161 Some(new_seen_ids) = seen_request_ids_stream.next() => {
4162 seen_ids = new_seen_ids;
4164
4165 match this.get_current_join_requests(&seen_ids).await {
4168 Ok(requests) => yield requests,
4169 Err(err) => {
4170 warn!("Failed to get updated knock requests on seen ids changed: {err}")
4171 }
4172 }
4173 }
4174
4175 Some(room_info) = room_info_stream.next() => {
4176 if !room_info.are_members_synced() {
4179 match this.get_current_join_requests(&seen_ids).await {
4180 Ok(requests) => yield requests,
4181 Err(err) => {
4182 warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4183 }
4184 }
4185 }
4186 }
4187 else => break,
4189 }
4190 }
4191 };
4192
4193 Ok((combined_stream, clear_seen_ids_handle))
4194 }
4195
4196 async fn get_current_join_requests(
4197 &self,
4198 seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4199 ) -> Result<Vec<KnockRequest>> {
4200 Ok(self
4201 .members(RoomMemberships::KNOCK)
4202 .await?
4203 .into_iter()
4204 .filter_map(|member| {
4205 let event_id = member.event().event_id()?;
4206 Some(KnockRequest::new(
4207 self,
4208 event_id,
4209 member.event().timestamp(),
4210 KnockRequestMemberInfo::from_member(&member),
4211 seen_request_ids.contains_key(event_id),
4212 ))
4213 })
4214 .collect())
4215 }
4216
4217 pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4219 RoomPrivacySettings::new(&self.inner, &self.client)
4220 }
4221
4222 pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4230 let request = opts.into_request(self.room_id());
4231
4232 let response = self.client.send(request).await?;
4233
4234 let push_ctx = self.push_context().await?;
4235 let chunk = join_all(
4236 response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4237 )
4238 .await;
4239
4240 Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4241 }
4242
4243 pub async fn relations(
4257 &self,
4258 event_id: OwnedEventId,
4259 opts: RelationsOptions,
4260 ) -> Result<Relations> {
4261 let relations = opts.send(self, event_id).await;
4262
4263 if let Ok(Relations { chunk, .. }) = &relations
4265 && let Ok((cache, _handles)) = self.event_cache().await
4266 {
4267 cache.save_events(chunk.clone()).await;
4268 }
4269
4270 relations
4271 }
4272
4273 #[cfg(feature = "experimental-search")]
4276 pub async fn search(
4277 &self,
4278 query: &str,
4279 max_number_of_results: usize,
4280 pagination_offset: Option<usize>,
4281 ) -> Result<Vec<OwnedEventId>, IndexError> {
4282 let mut search_index_guard = self.client.search_index().lock().await;
4283 search_index_guard.search(query, max_number_of_results, pagination_offset, self.room_id())
4284 }
4285
4286 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4308 pub async fn subscribe_thread(
4309 &self,
4310 thread_root: OwnedEventId,
4311 automatic: Option<OwnedEventId>,
4312 ) -> Result<()> {
4313 let is_automatic = automatic.is_some();
4314
4315 match self
4316 .client
4317 .send(subscribe_thread::unstable::Request::new(
4318 self.room_id().to_owned(),
4319 thread_root.clone(),
4320 automatic,
4321 ))
4322 .await
4323 {
4324 Ok(_response) => {
4325 trace!("Server acknowledged the thread subscription; saving in db");
4326
4327 self.client
4329 .state_store()
4330 .upsert_thread_subscriptions(vec![(
4331 self.room_id(),
4332 &thread_root,
4333 StoredThreadSubscription {
4334 status: ThreadSubscriptionStatus::Subscribed {
4335 automatic: is_automatic,
4336 },
4337 bump_stamp: None,
4338 },
4339 )])
4340 .await?;
4341
4342 Ok(())
4343 }
4344
4345 Err(err) => {
4346 if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4347 trace!("Thread subscription skipped: {err}");
4352 Ok(())
4353 } else {
4354 Err(err.into())
4356 }
4357 }
4358 }
4359 }
4360
4361 pub async fn subscribe_thread_if_needed(
4367 &self,
4368 thread_root: &EventId,
4369 automatic: Option<OwnedEventId>,
4370 ) -> Result<()> {
4371 if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4372 if !prev_sub.automatic || automatic.is_some() {
4375 return Ok(());
4378 }
4379 }
4380 self.subscribe_thread(thread_root.to_owned(), automatic).await
4381 }
4382
4383 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4395 pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4396 self.client
4397 .send(unsubscribe_thread::unstable::Request::new(
4398 self.room_id().to_owned(),
4399 thread_root.clone(),
4400 ))
4401 .await?;
4402
4403 trace!("Server acknowledged the thread subscription removal; removed it from db too");
4404
4405 self.client
4407 .state_store()
4408 .upsert_thread_subscriptions(vec![(
4409 self.room_id(),
4410 &thread_root,
4411 StoredThreadSubscription {
4412 status: ThreadSubscriptionStatus::Unsubscribed,
4413 bump_stamp: None,
4414 },
4415 )])
4416 .await?;
4417
4418 Ok(())
4419 }
4420
4421 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4438 pub async fn fetch_thread_subscription(
4439 &self,
4440 thread_root: OwnedEventId,
4441 ) -> Result<Option<ThreadSubscription>> {
4442 let result = self
4443 .client
4444 .send(get_thread_subscription::unstable::Request::new(
4445 self.room_id().to_owned(),
4446 thread_root.clone(),
4447 ))
4448 .await;
4449
4450 let subscription = match result {
4451 Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4452 Err(http_error) => match http_error.as_client_api_error() {
4453 Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4454 _ => return Err(http_error.into()),
4455 },
4456 };
4457
4458 if let Some(sub) = &subscription {
4460 self.client
4461 .state_store()
4462 .upsert_thread_subscriptions(vec![(
4463 self.room_id(),
4464 &thread_root,
4465 StoredThreadSubscription {
4466 status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4467 bump_stamp: None,
4468 },
4469 )])
4470 .await?;
4471 } else {
4472 self.client
4474 .state_store()
4475 .remove_thread_subscription(self.room_id(), &thread_root)
4476 .await?;
4477 }
4478
4479 Ok(subscription)
4480 }
4481
4482 pub async fn load_or_fetch_thread_subscription(
4489 &self,
4490 thread_root: &EventId,
4491 ) -> Result<Option<ThreadSubscription>> {
4492 if self.client.thread_subscription_catchup().is_outdated() {
4494 return self.fetch_thread_subscription(thread_root.to_owned()).await;
4495 }
4496
4497 Ok(self
4499 .client
4500 .state_store()
4501 .load_thread_subscription(self.room_id(), thread_root)
4502 .await
4503 .map(|maybe_sub| {
4504 maybe_sub.and_then(|stored| match stored.status {
4505 ThreadSubscriptionStatus::Unsubscribed => None,
4506 ThreadSubscriptionStatus::Subscribed { automatic } => {
4507 Some(ThreadSubscription { automatic })
4508 }
4509 })
4510 })?)
4511 }
4512
4513 pub async fn pin_event(&self, event_id: &EventId) -> Result<bool> {
4523 let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4524 event_ids
4525 } else {
4526 self.load_pinned_events().await?.unwrap_or_default()
4527 };
4528 let event_id = event_id.to_owned();
4529 if pinned_event_ids.contains(&event_id) {
4530 Ok(false)
4531 } else {
4532 pinned_event_ids.push(event_id);
4533 let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4534 self.send_state_event(content).await?;
4535 Ok(true)
4536 }
4537 }
4538
4539 pub async fn unpin_event(&self, event_id: &EventId) -> Result<bool> {
4549 let mut pinned_event_ids = if let Some(event_ids) = self.pinned_event_ids() {
4550 event_ids
4551 } else {
4552 self.load_pinned_events().await?.unwrap_or_default()
4553 };
4554 let event_id = event_id.to_owned();
4555 if let Some(idx) = pinned_event_ids.iter().position(|e| *e == *event_id) {
4556 pinned_event_ids.remove(idx);
4557 let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
4558 self.send_state_event(content).await?;
4559 Ok(true)
4560 } else {
4561 Ok(false)
4562 }
4563 }
4564}
4565
4566#[cfg(feature = "e2e-encryption")]
4567impl RoomIdentityProvider for Room {
4568 fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
4569 Box::pin(async { self.get_member(user_id).await.unwrap_or(None).is_some() })
4570 }
4571
4572 fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
4573 Box::pin(async {
4574 let members = self
4575 .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
4576 .await
4577 .unwrap_or_else(|_| Default::default());
4578
4579 let mut ret: Vec<UserIdentity> = Vec::new();
4580 for member in members {
4581 if let Some(i) = self.user_identity(member.user_id()).await {
4582 ret.push(i);
4583 }
4584 }
4585 ret
4586 })
4587 }
4588
4589 fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
4590 Box::pin(async {
4591 self.client
4592 .encryption()
4593 .get_user_identity(user_id)
4594 .await
4595 .unwrap_or(None)
4596 .map(|u| u.underlying_identity())
4597 })
4598 }
4599}
4600
4601#[derive(Clone, Debug)]
4604pub(crate) struct WeakRoom {
4605 client: WeakClient,
4606 room_id: OwnedRoomId,
4607}
4608
4609impl WeakRoom {
4610 pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4612 Self { client, room_id }
4613 }
4614
4615 pub fn get(&self) -> Option<Room> {
4617 self.client.get().and_then(|client| client.get_room(&self.room_id))
4618 }
4619
4620 pub fn room_id(&self) -> &RoomId {
4622 &self.room_id
4623 }
4624}
4625
4626#[derive(Debug, Clone)]
4628pub struct Invite {
4629 pub invitee: RoomMember,
4631
4632 pub inviter_id: OwnedUserId,
4636
4637 pub inviter: Option<RoomMember>,
4641}
4642
4643#[derive(Error, Debug)]
4644enum InvitationError {
4645 #[error("No membership event found")]
4646 EventMissing,
4647}
4648
4649#[derive(Debug, Clone, Default)]
4651#[non_exhaustive]
4652pub struct Receipts {
4653 pub fully_read: Option<OwnedEventId>,
4655 pub public_read_receipt: Option<OwnedEventId>,
4657 pub private_read_receipt: Option<OwnedEventId>,
4659}
4660
4661impl Receipts {
4662 pub fn new() -> Self {
4664 Self::default()
4665 }
4666
4667 pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4676 self.fully_read = event_id.into();
4677 self
4678 }
4679
4680 pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4686 self.public_read_receipt = event_id.into();
4687 self
4688 }
4689
4690 pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4694 self.private_read_receipt = event_id.into();
4695 self
4696 }
4697
4698 pub fn is_empty(&self) -> bool {
4700 self.fully_read.is_none()
4701 && self.public_read_receipt.is_none()
4702 && self.private_read_receipt.is_none()
4703 }
4704}
4705
4706#[derive(Debug)]
4709pub enum ParentSpace {
4710 Reciprocal(Room),
4713 WithPowerlevel(Room),
4718 Illegitimate(Room),
4721 Unverifiable(OwnedRoomId),
4724}
4725
4726#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
4730pub struct ReportedContentScore(i8);
4731
4732impl ReportedContentScore {
4733 pub const MIN: Self = Self(-100);
4737
4738 pub const MAX: Self = Self(0);
4742
4743 pub fn new(value: i8) -> Option<Self> {
4752 value.try_into().ok()
4753 }
4754
4755 pub fn new_saturating(value: i8) -> Self {
4761 if value > Self::MAX {
4762 Self::MAX
4763 } else if value < Self::MIN {
4764 Self::MIN
4765 } else {
4766 Self(value)
4767 }
4768 }
4769
4770 pub fn value(&self) -> i8 {
4772 self.0
4773 }
4774}
4775
4776impl PartialEq<i8> for ReportedContentScore {
4777 fn eq(&self, other: &i8) -> bool {
4778 self.0.eq(other)
4779 }
4780}
4781
4782impl PartialEq<ReportedContentScore> for i8 {
4783 fn eq(&self, other: &ReportedContentScore) -> bool {
4784 self.eq(&other.0)
4785 }
4786}
4787
4788impl PartialOrd<i8> for ReportedContentScore {
4789 fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
4790 self.0.partial_cmp(other)
4791 }
4792}
4793
4794impl PartialOrd<ReportedContentScore> for i8 {
4795 fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
4796 self.partial_cmp(&other.0)
4797 }
4798}
4799
4800impl From<ReportedContentScore> for Int {
4801 fn from(value: ReportedContentScore) -> Self {
4802 value.0.into()
4803 }
4804}
4805
4806impl TryFrom<i8> for ReportedContentScore {
4807 type Error = TryFromReportedContentScoreError;
4808
4809 fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
4810 if value > Self::MAX || value < Self::MIN {
4811 Err(TryFromReportedContentScoreError(()))
4812 } else {
4813 Ok(Self(value))
4814 }
4815 }
4816}
4817
4818impl TryFrom<i16> for ReportedContentScore {
4819 type Error = TryFromReportedContentScoreError;
4820
4821 fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
4822 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4823 value.try_into()
4824 }
4825}
4826
4827impl TryFrom<i32> for ReportedContentScore {
4828 type Error = TryFromReportedContentScoreError;
4829
4830 fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
4831 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4832 value.try_into()
4833 }
4834}
4835
4836impl TryFrom<i64> for ReportedContentScore {
4837 type Error = TryFromReportedContentScoreError;
4838
4839 fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
4840 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4841 value.try_into()
4842 }
4843}
4844
4845impl TryFrom<Int> for ReportedContentScore {
4846 type Error = TryFromReportedContentScoreError;
4847
4848 fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
4849 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4850 value.try_into()
4851 }
4852}
4853
4854trait EventSource {
4855 fn get_event(
4856 &self,
4857 event_id: &EventId,
4858 ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
4859}
4860
4861impl EventSource for &Room {
4862 async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4863 self.load_or_fetch_event(event_id, None).await
4864 }
4865}
4866
4867#[derive(Debug, Clone, Error)]
4870#[error("out of range conversion attempted")]
4871pub struct TryFromReportedContentScoreError(());
4872
4873#[derive(Debug)]
4876pub struct RoomMemberWithSenderInfo {
4877 pub room_member: RoomMember,
4879 pub sender_info: Option<RoomMember>,
4882}
4883
4884#[cfg(all(test, not(target_family = "wasm")))]
4885mod tests {
4886 use std::collections::BTreeMap;
4887
4888 use matrix_sdk_base::{ComposerDraft, DraftAttachment, store::ComposerDraftType};
4889 use matrix_sdk_test::{
4890 JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
4891 };
4892 use ruma::{
4893 RoomVersionId, event_id,
4894 events::{relation::RelationType, room::member::MembershipState},
4895 int, owned_event_id, room_id, user_id,
4896 };
4897 use wiremock::{
4898 Mock, MockServer, ResponseTemplate,
4899 matchers::{header, method, path_regex},
4900 };
4901
4902 use super::ReportedContentScore;
4903 use crate::{
4904 Client,
4905 config::RequestConfig,
4906 room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4907 test_utils::{
4908 client::mock_matrix_session,
4909 logged_in_client,
4910 mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4911 },
4912 };
4913
4914 #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
4915 #[async_test]
4916 async fn test_cache_invalidation_while_encrypt() {
4917 use matrix_sdk_base::store::RoomLoadSettings;
4918 use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};
4919
4920 let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
4921 let session = mock_matrix_session();
4922
4923 let client = Client::builder()
4924 .homeserver_url("http://localhost:1234")
4925 .request_config(RequestConfig::new().disable_retry())
4926 .sqlite_store(&sqlite_path, None)
4927 .build()
4928 .await
4929 .unwrap();
4930 client
4931 .matrix_auth()
4932 .restore_session(session.clone(), RoomLoadSettings::default())
4933 .await
4934 .unwrap();
4935
4936 client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
4937
4938 let server = MockServer::start().await;
4940 {
4941 Mock::given(method("GET"))
4942 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
4943 .and(header("authorization", "Bearer 1234"))
4944 .respond_with(
4945 ResponseTemplate::new(200)
4946 .set_body_json(EventFactory::new().room_encryption().into_content()),
4947 )
4948 .mount(&server)
4949 .await;
4950 let f = EventFactory::new().sender(user_id!("@example:localhost"));
4951 let response = SyncResponseBuilder::default()
4952 .add_joined_room(
4953 JoinedRoomBuilder::default()
4954 .add_state_event(
4955 f.member(user_id!("@example:localhost")).display_name("example"),
4956 )
4957 .add_state_event(f.default_power_levels())
4958 .add_state_event(f.room_encryption()),
4959 )
4960 .build_sync_response();
4961 client.base_client().receive_sync_response(response).await.unwrap();
4962 }
4963
4964 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4965
4966 room.preshare_room_key().await.unwrap();
4968
4969 {
4972 let client = Client::builder()
4973 .homeserver_url("http://localhost:1234")
4974 .request_config(RequestConfig::new().disable_retry())
4975 .sqlite_store(&sqlite_path, None)
4976 .build()
4977 .await
4978 .unwrap();
4979 client
4980 .matrix_auth()
4981 .restore_session(session.clone(), RoomLoadSettings::default())
4982 .await
4983 .unwrap();
4984 client
4985 .encryption()
4986 .enable_cross_process_store_lock("client2".to_owned())
4987 .await
4988 .unwrap();
4989
4990 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4991 assert!(guard.is_some());
4992 }
4993
4994 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4996 assert!(guard.is_some());
4997
4998 let olm = client.olm_machine().await;
5000 let olm = olm.as_ref().expect("Olm machine wasn't started");
5001
5002 let _encrypted_content = olm
5005 .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
5006 .await
5007 .unwrap();
5008 }
5009
5010 #[test]
5011 fn reported_content_score() {
5012 let score = ReportedContentScore::new(0).unwrap();
5014 assert_eq!(score.value(), 0);
5015 let score = ReportedContentScore::new(-50).unwrap();
5016 assert_eq!(score.value(), -50);
5017 let score = ReportedContentScore::new(-100).unwrap();
5018 assert_eq!(score.value(), -100);
5019 assert_eq!(ReportedContentScore::new(10), None);
5020 assert_eq!(ReportedContentScore::new(-110), None);
5021
5022 let score = ReportedContentScore::new_saturating(0);
5023 assert_eq!(score.value(), 0);
5024 let score = ReportedContentScore::new_saturating(-50);
5025 assert_eq!(score.value(), -50);
5026 let score = ReportedContentScore::new_saturating(-100);
5027 assert_eq!(score.value(), -100);
5028 let score = ReportedContentScore::new_saturating(10);
5029 assert_eq!(score, ReportedContentScore::MAX);
5030 let score = ReportedContentScore::new_saturating(-110);
5031 assert_eq!(score, ReportedContentScore::MIN);
5032
5033 let score = ReportedContentScore::try_from(0i16).unwrap();
5035 assert_eq!(score.value(), 0);
5036 let score = ReportedContentScore::try_from(-100i16).unwrap();
5037 assert_eq!(score.value(), -100);
5038 ReportedContentScore::try_from(10i16).unwrap_err();
5039 ReportedContentScore::try_from(-110i16).unwrap_err();
5040
5041 let score = ReportedContentScore::try_from(0i32).unwrap();
5043 assert_eq!(score.value(), 0);
5044 let score = ReportedContentScore::try_from(-100i32).unwrap();
5045 assert_eq!(score.value(), -100);
5046 ReportedContentScore::try_from(10i32).unwrap_err();
5047 ReportedContentScore::try_from(-110i32).unwrap_err();
5048
5049 let score = ReportedContentScore::try_from(0i64).unwrap();
5051 assert_eq!(score.value(), 0);
5052 let score = ReportedContentScore::try_from(-100i64).unwrap();
5053 assert_eq!(score.value(), -100);
5054 ReportedContentScore::try_from(10i64).unwrap_err();
5055 ReportedContentScore::try_from(-110i64).unwrap_err();
5056
5057 let score = ReportedContentScore::try_from(int!(0)).unwrap();
5059 assert_eq!(score.value(), 0);
5060 let score = ReportedContentScore::try_from(int!(-100)).unwrap();
5061 assert_eq!(score.value(), -100);
5062 ReportedContentScore::try_from(int!(10)).unwrap_err();
5063 ReportedContentScore::try_from(int!(-110)).unwrap_err();
5064 }
5065
5066 #[async_test]
5067 async fn test_composer_draft() {
5068 use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
5069
5070 let client = logged_in_client(None).await;
5071
5072 let response = SyncResponseBuilder::default()
5073 .add_joined_room(JoinedRoomBuilder::default())
5074 .build_sync_response();
5075 client.base_client().receive_sync_response(response).await.unwrap();
5076 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
5077
5078 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
5079
5080 let draft = ComposerDraft {
5083 plain_text: "Hello, world!".to_owned(),
5084 html_text: Some("<strong>Hello</strong>, world!".to_owned()),
5085 draft_type: ComposerDraftType::NewMessage,
5086 attachments: vec![DraftAttachment {
5087 filename: "cat.txt".to_owned(),
5088 content: matrix_sdk_base::DraftAttachmentContent::File {
5089 data: b"meow".to_vec(),
5090 mimetype: Some("text/plain".to_owned()),
5091 size: Some(5),
5092 },
5093 }],
5094 };
5095
5096 room.save_composer_draft(draft.clone(), None).await.unwrap();
5097
5098 let thread_root = owned_event_id!("$thread_root:b.c");
5099 let thread_draft = ComposerDraft {
5100 plain_text: "Hello, thread!".to_owned(),
5101 html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
5102 draft_type: ComposerDraftType::NewMessage,
5103 attachments: vec![DraftAttachment {
5104 filename: "dog.txt".to_owned(),
5105 content: matrix_sdk_base::DraftAttachmentContent::File {
5106 data: b"wuv".to_vec(),
5107 mimetype: Some("text/plain".to_owned()),
5108 size: Some(4),
5109 },
5110 }],
5111 };
5112
5113 room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
5114
5115 assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
5117
5118 assert_eq!(
5120 room.load_composer_draft(Some(&thread_root)).await.unwrap(),
5121 Some(thread_draft.clone())
5122 );
5123
5124 room.clear_composer_draft(None).await.unwrap();
5126 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
5127
5128 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
5130
5131 room.clear_composer_draft(Some(&thread_root)).await.unwrap();
5133 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
5134 }
5135
5136 #[async_test]
5137 async fn test_mark_join_requests_as_seen() {
5138 let server = MatrixMockServer::new().await;
5139 let client = server.client_builder().build().await;
5140 let event_id = event_id!("$a:b.c");
5141 let room_id = room_id!("!a:b.c");
5142 let user_id = user_id!("@alice:b.c");
5143
5144 let f = EventFactory::new().room(room_id);
5145 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5146 f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into(),
5147 ]);
5148 let room = server.sync_room(&client, joined_room_builder).await;
5149
5150 let seen_ids =
5152 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
5153 assert!(seen_ids.is_empty());
5154
5155 room.mark_knock_requests_as_seen(&[user_id.to_owned()])
5157 .await
5158 .expect("Couldn't mark join request as seen");
5159
5160 let seen_ids =
5162 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
5163 assert_eq!(seen_ids.len(), 1);
5164 assert_eq!(
5165 seen_ids.into_iter().next().expect("No next value"),
5166 (event_id.to_owned(), user_id.to_owned())
5167 )
5168 }
5169
5170 #[async_test]
5171 async fn test_own_room_membership_with_no_own_member_event() {
5172 let server = MatrixMockServer::new().await;
5173 let client = server.client_builder().build().await;
5174 let room_id = room_id!("!a:b.c");
5175
5176 let room = server.sync_joined_room(&client, room_id).await;
5177
5178 let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
5181 assert!(error.is_some());
5182 }
5183
5184 #[async_test]
5185 async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
5186 let server = MatrixMockServer::new().await;
5187 let client = server.client_builder().build().await;
5188 let room_id = room_id!("!a:b.c");
5189 let user_id = user_id!("@example:localhost");
5190
5191 let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
5192 let joined_room_builder =
5193 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5194 let room = server.sync_room(&client, joined_room_builder).await;
5195
5196 let ret = room
5198 .member_with_sender_info(client.user_id().unwrap())
5199 .await
5200 .expect("Room member info should be available");
5201
5202 assert_eq!(ret.room_member.event().user_id(), user_id);
5204
5205 assert!(ret.sender_info.is_none());
5207 }
5208
5209 #[async_test]
5210 async fn test_own_room_membership_with_own_member_event_and_own_sender() {
5211 let server = MatrixMockServer::new().await;
5212 let client = server.client_builder().build().await;
5213 let room_id = room_id!("!a:b.c");
5214 let user_id = user_id!("@example:localhost");
5215
5216 let f = EventFactory::new().room(room_id).sender(user_id);
5217 let joined_room_builder =
5218 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5219 let room = server.sync_room(&client, joined_room_builder).await;
5220
5221 let ret = room
5223 .member_with_sender_info(client.user_id().unwrap())
5224 .await
5225 .expect("Room member info should be available");
5226
5227 assert_eq!(ret.room_member.event().user_id(), user_id);
5229
5230 assert!(ret.sender_info.is_some());
5232 assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
5233 }
5234
5235 #[async_test]
5236 async fn test_own_room_membership_with_own_member_event_and_known_sender() {
5237 let server = MatrixMockServer::new().await;
5238 let client = server.client_builder().build().await;
5239 let room_id = room_id!("!a:b.c");
5240 let user_id = user_id!("@example:localhost");
5241 let sender_id = user_id!("@alice:b.c");
5242
5243 let f = EventFactory::new().room(room_id).sender(sender_id);
5244 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5245 f.member(user_id).into(),
5246 f.member(sender_id).into(),
5248 ]);
5249 let room = server.sync_room(&client, joined_room_builder).await;
5250
5251 let ret = room
5253 .member_with_sender_info(client.user_id().unwrap())
5254 .await
5255 .expect("Room member info should be available");
5256
5257 assert_eq!(ret.room_member.event().user_id(), user_id);
5259
5260 assert!(ret.sender_info.is_some());
5262 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5263 }
5264
5265 #[async_test]
5266 async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5267 let server = MatrixMockServer::new().await;
5268 let client = server.client_builder().build().await;
5269 let room_id = room_id!("!a:b.c");
5270 let user_id = user_id!("@example:localhost");
5271 let sender_id = user_id!("@alice:b.c");
5272
5273 let f = EventFactory::new().room(room_id).sender(sender_id);
5274 let joined_room_builder =
5275 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5276 let room = server.sync_room(&client, joined_room_builder).await;
5277
5278 server
5280 .mock_get_members()
5281 .ok(vec![f.member(sender_id).into_raw()])
5282 .mock_once()
5283 .mount()
5284 .await;
5285
5286 let ret = room
5288 .member_with_sender_info(client.user_id().unwrap())
5289 .await
5290 .expect("Room member info should be available");
5291
5292 assert_eq!(ret.room_member.event().user_id(), user_id);
5294
5295 assert!(ret.sender_info.is_some());
5297 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5298 }
5299
5300 #[async_test]
5301 async fn test_list_threads() {
5302 let server = MatrixMockServer::new().await;
5303 let client = server.client_builder().build().await;
5304
5305 let room_id = room_id!("!a:b.c");
5306 let sender_id = user_id!("@alice:b.c");
5307 let f = EventFactory::new().room(room_id).sender(sender_id);
5308
5309 let eid1 = event_id!("$1");
5310 let eid2 = event_id!("$2");
5311 let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5312 let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5313
5314 server
5315 .mock_room_threads()
5316 .ok(batch1.clone(), Some("prev_batch".to_owned()))
5317 .mock_once()
5318 .mount()
5319 .await;
5320 server
5321 .mock_room_threads()
5322 .match_from("prev_batch")
5323 .ok(batch2, None)
5324 .mock_once()
5325 .mount()
5326 .await;
5327
5328 let room = server.sync_joined_room(&client, room_id).await;
5329 let result =
5330 room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5331 assert_eq!(result.chunk.len(), 1);
5332 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5333 assert!(result.prev_batch_token.is_some());
5334
5335 let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5336 let result = room.list_threads(opts).await.expect("Failed to list threads");
5337 assert_eq!(result.chunk.len(), 1);
5338 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5339 assert!(result.prev_batch_token.is_none());
5340 }
5341
5342 #[async_test]
5343 async fn test_relations() {
5344 let server = MatrixMockServer::new().await;
5345 let client = server.client_builder().build().await;
5346
5347 let room_id = room_id!("!a:b.c");
5348 let sender_id = user_id!("@alice:b.c");
5349 let f = EventFactory::new().room(room_id).sender(sender_id);
5350
5351 let target_event_id = owned_event_id!("$target");
5352 let eid1 = event_id!("$1");
5353 let eid2 = event_id!("$2");
5354 let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5355 let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5356
5357 server
5358 .mock_room_relations()
5359 .match_target_event(target_event_id.clone())
5360 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5361 .mock_once()
5362 .mount()
5363 .await;
5364
5365 server
5366 .mock_room_relations()
5367 .match_target_event(target_event_id.clone())
5368 .match_from("next_batch")
5369 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5370 .mock_once()
5371 .mount()
5372 .await;
5373
5374 let room = server.sync_joined_room(&client, room_id).await;
5375
5376 let mut opts = RelationsOptions {
5378 include_relations: IncludeRelations::AllRelations,
5379 ..Default::default()
5380 };
5381 let result = room
5382 .relations(target_event_id.clone(), opts.clone())
5383 .await
5384 .expect("Failed to list relations the first time");
5385 assert_eq!(result.chunk.len(), 1);
5386 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5387 assert!(result.prev_batch_token.is_none());
5388 assert!(result.next_batch_token.is_some());
5389 assert!(result.recursion_depth.is_none());
5390
5391 opts.from = result.next_batch_token;
5392 let result = room
5393 .relations(target_event_id, opts)
5394 .await
5395 .expect("Failed to list relations the second time");
5396 assert_eq!(result.chunk.len(), 1);
5397 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5398 assert!(result.prev_batch_token.is_none());
5399 assert!(result.next_batch_token.is_none());
5400 assert!(result.recursion_depth.is_none());
5401 }
5402
5403 #[async_test]
5404 async fn test_relations_with_reltype() {
5405 let server = MatrixMockServer::new().await;
5406 let client = server.client_builder().build().await;
5407
5408 let room_id = room_id!("!a:b.c");
5409 let sender_id = user_id!("@alice:b.c");
5410 let f = EventFactory::new().room(room_id).sender(sender_id);
5411
5412 let target_event_id = owned_event_id!("$target");
5413 let eid1 = event_id!("$1");
5414 let eid2 = event_id!("$2");
5415 let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5416 let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5417
5418 server
5419 .mock_room_relations()
5420 .match_target_event(target_event_id.clone())
5421 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5422 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5423 .mock_once()
5424 .mount()
5425 .await;
5426
5427 server
5428 .mock_room_relations()
5429 .match_target_event(target_event_id.clone())
5430 .match_from("next_batch")
5431 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5432 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5433 .mock_once()
5434 .mount()
5435 .await;
5436
5437 let room = server.sync_joined_room(&client, room_id).await;
5438
5439 let mut opts = RelationsOptions {
5441 include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5442 ..Default::default()
5443 };
5444 let result = room
5445 .relations(target_event_id.clone(), opts.clone())
5446 .await
5447 .expect("Failed to list relations the first time");
5448 assert_eq!(result.chunk.len(), 1);
5449 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5450 assert!(result.prev_batch_token.is_none());
5451 assert!(result.next_batch_token.is_some());
5452 assert!(result.recursion_depth.is_none());
5453
5454 opts.from = result.next_batch_token;
5455 let result = room
5456 .relations(target_event_id, opts)
5457 .await
5458 .expect("Failed to list relations the second time");
5459 assert_eq!(result.chunk.len(), 1);
5460 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5461 assert!(result.prev_batch_token.is_none());
5462 assert!(result.next_batch_token.is_none());
5463 assert!(result.recursion_depth.is_none());
5464 }
5465
5466 #[async_test]
5467 async fn test_power_levels_computation() {
5468 let server = MatrixMockServer::new().await;
5469 let client = server.client_builder().build().await;
5470
5471 let room_id = room_id!("!a:b.c");
5472 let sender_id = client.user_id().expect("No session id");
5473 let f = EventFactory::new().room(room_id).sender(sender_id);
5474 let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5475
5476 let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into();
5478 let power_levels_event = f.power_levels(&mut user_map).state_key("").into();
5479 let room_member_event = f.member(sender_id).into();
5480
5481 let room = server
5483 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event]))
5484 .await;
5485 let ctx = room
5486 .push_condition_room_ctx()
5487 .await
5488 .expect("Failed to get push condition context")
5489 .expect("Could not get push condition context");
5490
5491 assert!(ctx.power_levels.is_none());
5493
5494 let room = server
5496 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event]))
5497 .await;
5498 let ctx = room
5499 .push_condition_room_ctx()
5500 .await
5501 .expect("Failed to get push condition context")
5502 .expect("Could not get push condition context");
5503
5504 assert!(ctx.power_levels.is_none());
5506
5507 let room = server
5509 .sync_room(
5510 &client,
5511 JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5512 )
5513 .await;
5514 let ctx = room
5515 .push_condition_room_ctx()
5516 .await
5517 .expect("Failed to get push condition context")
5518 .expect("Could not get push condition context");
5519
5520 assert!(ctx.power_levels.is_some());
5522 }
5523}