1use std::{
18 borrow::Borrow,
19 collections::{BTreeMap, HashMap},
20 ops::Deref,
21 sync::Arc,
22 time::Duration,
23};
24
25use async_stream::stream;
26use eyeball::SharedObservable;
27use futures_core::Stream;
28use futures_util::{
29 future::{try_join, try_join_all},
30 stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "e2e-encryption")]
36use matrix_sdk_base::crypto::{DecryptionSettings, RoomEventDecryptionResult};
37#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
38use matrix_sdk_base::crypto::{IdentityStatusChange, RoomIdentityProvider, UserIdentity};
39use matrix_sdk_base::{
40 deserialized_responses::{
41 RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
42 },
43 event_cache::store::media::IgnoreMediaRetentionPolicy,
44 media::MediaThumbnailSettings,
45 store::StateStoreExt,
46 ComposerDraft, RoomInfoNotableUpdateReasons, RoomMemberships, StateChanges, StateStoreDataKey,
47 StateStoreDataValue,
48};
49#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
50use matrix_sdk_common::BoxFuture;
51use matrix_sdk_common::{
52 deserialized_responses::TimelineEvent,
53 executor::{spawn, JoinHandle},
54 timeout::timeout,
55};
56use mime::Mime;
57#[cfg(feature = "e2e-encryption")]
58use ruma::events::{
59 room::encrypted::OriginalSyncRoomEncryptedEvent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
60 SyncMessageLikeEvent,
61};
62use ruma::{
63 api::client::{
64 config::{set_global_account_data, set_room_account_data},
65 context,
66 error::ErrorKind,
67 filter::LazyLoadOptions,
68 membership::{
69 ban_user, forget_room, get_member_events,
70 invite_user::{self, v3::InvitationRecipient},
71 kick_user, leave_room, unban_user, Invite3pid,
72 },
73 message::send_message_event,
74 read_marker::set_read_marker,
75 receipt::create_receipt,
76 redact::redact_event,
77 room::{get_room_event, report_content, report_room},
78 state::{get_state_events_for_key, send_state_event},
79 tag::{create_tag, delete_tag},
80 typing::create_typing_event::{self, v3::Typing},
81 },
82 assign,
83 events::{
84 beacon::BeaconEventContent,
85 beacon_info::BeaconInfoEventContent,
86 call::notify::{ApplicationType, CallNotifyEventContent, NotifyType},
87 direct::DirectEventContent,
88 marked_unread::{MarkedUnreadEventContent, UnstableMarkedUnreadEventContent},
89 receipt::{Receipt, ReceiptThread, ReceiptType},
90 room::{
91 avatar::{self, RoomAvatarEventContent},
92 encryption::RoomEncryptionEventContent,
93 history_visibility::HistoryVisibility,
94 member::{MembershipChange, SyncRoomMemberEvent},
95 message::{
96 AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
97 FormattedBody, ImageMessageEventContent, MessageType, RoomMessageEventContent,
98 UnstableAudioDetailsContentBlock, UnstableVoiceContentBlock, VideoInfo,
99 VideoMessageEventContent,
100 },
101 name::RoomNameEventContent,
102 pinned_events::RoomPinnedEventsEventContent,
103 power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent},
104 server_acl::RoomServerAclEventContent,
105 topic::RoomTopicEventContent,
106 ImageInfo, MediaSource, ThumbnailInfo,
107 },
108 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
109 tag::{TagInfo, TagName},
110 typing::SyncTypingEvent,
111 AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
112 Mentions, MessageLikeEventContent, MessageLikeEventType, OriginalSyncStateEvent,
113 RedactContent, RedactedStateEventContent, RoomAccountDataEvent,
114 RoomAccountDataEventContent, RoomAccountDataEventType, StateEventContent, StateEventType,
115 StaticEventContent, StaticStateEventContent, SyncStateEvent,
116 },
117 push::{Action, PushConditionRoomCtx},
118 serde::Raw,
119 time::Instant,
120 EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
121 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
122};
123use serde::de::DeserializeOwned;
124use thiserror::Error;
125use tokio::sync::broadcast;
126use tokio_stream::StreamExt;
127use tracing::{debug, info, instrument, warn};
128
129use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
130pub use self::{
131 member::{RoomMember, RoomMemberRole},
132 messages::{EventWithContextResponse, Messages, MessagesOptions},
133};
134#[cfg(doc)]
135use crate::event_cache::EventCache;
136use crate::{
137 attachment::{AttachmentConfig, AttachmentInfo},
138 client::WeakClient,
139 config::RequestConfig,
140 error::{BeaconError, WrongRoomState},
141 event_cache::{self, EventCacheDropHandles, RoomEventCache},
142 event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
143 live_location_share::ObservableLiveLocation,
144 media::{MediaFormat, MediaRequestParameters},
145 notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
146 room::{
147 knock_requests::{KnockRequest, KnockRequestMemberInfo},
148 power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
149 privacy_settings::RoomPrivacySettings,
150 },
151 sync::RoomUpdate,
152 utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
153 BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
154};
155#[cfg(feature = "e2e-encryption")]
156use crate::{crypto::types::events::CryptoContextInfo, encryption::backups::BackupState};
157
158pub mod edit;
159pub mod futures;
160pub mod identity_status_changes;
161pub mod knock_requests;
163mod member;
164mod messages;
165pub mod power_levels;
166
167pub mod privacy_settings;
169
/// A handle to a single room, pairing the room's base state with the
/// [`Client`] used to perform requests on its behalf.
///
/// `Room` dereferences to [`BaseRoom`], so the base room's accessors can be
/// called directly on a `Room` value.
#[derive(Debug, Clone)]
pub struct Room {
    /// The underlying base room that this handle wraps.
    inner: BaseRoom,
    /// The client through which requests for this room are sent.
    pub(crate) client: Client,
}
177
/// Lets a [`Room`] be used wherever a `&BaseRoom` is expected by exposing the
/// wrapped `inner` room.
impl Deref for Room {
    type Target = BaseRoom;

    /// Returns a reference to the wrapped [`BaseRoom`].
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
185
/// Duration attached to a positive typing notice (`Typing::Yes`), and the
/// upper bound after which `typing_notice` no longer sends a "stopped typing"
/// request for a stale notice.
const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
/// Age of the last recorded typing notice beyond which `typing_notice` will
/// send a fresh positive notice again.
const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
188
189impl Room {
/// Creates a `Room` handle from a [`Client`] and the [`BaseRoom`] it should
/// wrap.
pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
    Self { inner: room, client }
}
199
200 #[doc(alias = "reject_invitation")]
204 pub async fn leave(&self) -> Result<()> {
205 let state = self.state();
206 if state == RoomState::Left {
207 return Err(Error::WrongRoomState(WrongRoomState::new("Joined or Invited", state)));
208 }
209
210 let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
211 self.client.send(request).await?;
212 self.client.base_client().room_left(self.room_id()).await?;
213 Ok(())
214 }
215
216 #[doc(alias = "accept_invitation")]
220 pub async fn join(&self) -> Result<()> {
221 let state = self.state();
222 if state == RoomState::Joined {
223 return Err(Error::WrongRoomState(WrongRoomState::new("Invited or Left", state)));
224 }
225
226 let prev_room_state = self.inner.state();
227
228 let mark_as_direct = prev_room_state == RoomState::Invited
229 && self.inner.is_direct().await.unwrap_or_else(|e| {
230 warn!(room_id = ?self.room_id(), "is_direct() failed: {e}");
231 false
232 });
233
234 self.client.join_room_by_id(self.room_id()).await?;
235
236 if mark_as_direct {
237 self.set_is_direct(true).await?;
238 }
239
240 Ok(())
241 }
242
/// Returns a clone of the [`Client`] associated with this room.
pub fn client(&self) -> Client {
    self.client.clone()
}
249
/// Reports whether the wrapped base room considers its state fully synced.
pub fn is_synced(&self) -> bool {
    self.inner.is_state_fully_synced()
}
255
256 pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
286 let Some(url) = self.avatar_url() else { return Ok(None) };
287 let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
288 Ok(Some(self.client.media().get_media_content(&request, true).await?))
289 }
290
291 #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
320 pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
321 let room_id = self.inner.room_id();
322 let request = options.into_request(room_id);
323 let http_response = self.client.send(request).await?;
324
325 #[allow(unused_mut)]
326 let mut response = Messages {
327 start: http_response.start,
328 end: http_response.end,
329 #[cfg(not(feature = "e2e-encryption"))]
330 chunk: http_response
331 .chunk
332 .into_iter()
333 .map(|raw| TimelineEvent::new(raw.cast()))
334 .collect(),
335 #[cfg(feature = "e2e-encryption")]
336 chunk: Vec::with_capacity(http_response.chunk.len()),
337 state: http_response.state,
338 };
339
340 #[cfg(feature = "e2e-encryption")]
341 for event in http_response.chunk {
342 let decrypted_event = if let Ok(AnySyncTimelineEvent::MessageLike(
343 AnySyncMessageLikeEvent::RoomEncrypted(SyncMessageLikeEvent::Original(_)),
344 )) = event.deserialize_as::<AnySyncTimelineEvent>()
345 {
346 if let Ok(event) = self.decrypt_event(event.cast_ref()).await {
347 event
348 } else {
349 TimelineEvent::new(event.cast())
350 }
351 } else {
352 TimelineEvent::new(event.cast())
353 };
354 response.chunk.push(decrypted_event);
355 }
356
357 if let Some(push_context) = self.push_context().await? {
358 let push_rules = self.client().account().push_rules().await?;
359
360 for event in &mut response.chunk {
361 event.push_actions =
362 Some(push_rules.get_actions(event.raw(), &push_context).to_owned());
363 }
364 }
365
366 Ok(response)
367 }
368
/// Registers `handler` as an event handler scoped to this room.
///
/// This forwards to `Client::add_room_event_handler` with this room's ID and
/// returns the handle produced by that call.
pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
where
    Ev: SyncEvent + DeserializeOwned + Send + 'static,
    H: EventHandler<Ev, Ctx>,
{
    self.client.add_room_event_handler(self.room_id(), handler)
}
385
/// Subscribes to [`RoomUpdate`]s for this room.
///
/// This forwards to `Client::subscribe_to_room_updates` with this room's ID.
pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
    self.client.subscribe_to_room_updates(self.room_id())
}
393
394 pub fn subscribe_to_typing_notifications(
400 &self,
401 ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
402 let (sender, receiver) = broadcast::channel(16);
403 let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
404 let own_user_id = self.own_user_id().to_owned();
405 move |event: SyncTypingEvent| async move {
406 let typing_user_ids = event
408 .content
409 .user_ids
410 .into_iter()
411 .filter(|user_id| *user_id != own_user_id)
412 .collect();
413 let _ = sender.send(typing_user_ids);
415 }
416 });
417 let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
418 (drop_guard, receiver)
419 }
420
/// Creates a stream of identity status changes for this room.
///
/// This delegates to `IdentityStatusChanges::create_stream` with a clone of
/// this room. Only available with the `e2e-encryption` feature on non-wasm32
/// targets.
#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
pub async fn subscribe_to_identity_status_changes(
    &self,
) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
    IdentityStatusChanges::create_stream(self.clone()).await
}
449
450 async fn try_decrypt_event(&self, event: Raw<AnyTimelineEvent>) -> Result<TimelineEvent> {
456 #[cfg(feature = "e2e-encryption")]
457 if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
458 SyncMessageLikeEvent::Original(_),
459 ))) = event.deserialize_as::<AnySyncTimelineEvent>()
460 {
461 if let Ok(event) = self.decrypt_event(event.cast_ref()).await {
462 return Ok(event);
463 }
464 }
465
466 let mut event = TimelineEvent::new(event.cast());
467 event.push_actions = self.event_push_actions(event.raw()).await?;
468
469 Ok(event)
470 }
471
472 pub async fn event(
477 &self,
478 event_id: &EventId,
479 request_config: Option<RequestConfig>,
480 ) -> Result<TimelineEvent> {
481 let request =
482 get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
483
484 let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
485 let event = self.try_decrypt_event(raw_event).await?;
486
487 if let Ok((cache, _handles)) = self.event_cache().await {
489 cache.save_event(event.clone()).await;
490 }
491
492 Ok(event)
493 }
494
495 pub async fn event_with_context(
498 &self,
499 event_id: &EventId,
500 lazy_load_members: bool,
501 context_size: UInt,
502 request_config: Option<RequestConfig>,
503 ) -> Result<EventWithContextResponse> {
504 let mut request =
505 context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
506
507 request.limit = context_size;
508
509 if lazy_load_members {
510 request.filter.lazy_load_options =
511 LazyLoadOptions::Enabled { include_redundant_members: false };
512 }
513
514 let response = self.client.send(request).with_request_config(request_config).await?;
515
516 let target_event = if let Some(event) = response.event {
517 Some(self.try_decrypt_event(event).await?)
518 } else {
519 None
520 };
521
522 let (events_before, events_after) = try_join(
526 try_join_all(response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev))),
527 try_join_all(response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev))),
528 )
529 .await?;
530
531 if let Ok((cache, _handles)) = self.event_cache().await {
533 let mut events_to_save: Vec<TimelineEvent> = Vec::new();
534 if let Some(event) = &target_event {
535 events_to_save.push(event.clone());
536 }
537
538 for event in &events_before {
539 events_to_save.push(event.clone());
540 }
541
542 for event in &events_after {
543 events_to_save.push(event.clone());
544 }
545
546 cache.save_events(events_to_save).await;
547 }
548
549 Ok(EventWithContextResponse {
550 event: target_event,
551 events_before,
552 events_after,
553 state: response.state,
554 prev_batch_token: response.start,
555 next_batch_token: response.end,
556 })
557 }
558
/// Requests this room's member events from the server and hands the
/// response to the base client.
///
/// The work runs through the client's `members_request_deduplicated_handler`,
/// keyed by this room's ID (presumably so that concurrent calls for the same
/// room share one request; confirm against the handler's implementation).
///
/// # Errors
///
/// Propagates errors from the request and from
/// `BaseClient::receive_all_members`.
pub(crate) async fn request_members(&self) -> Result<()> {
    self.client
        .locks()
        .members_request_deduplicated_handler
        .run(self.room_id().to_owned(), async move {
            let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
            let response = self
                .client
                .send(request.clone())
                .with_request_config(
                    // Use a 60 second timeout and a retry limit of 3 for this request.
                    RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
                )
                .await?;

            // The future is boxed and pinned before being awaited.
            Box::pin(self.client.base_client().receive_all_members(
                self.room_id(),
                &request,
                &response,
            ))
            .await?;

            Ok(())
        })
        .await
}
587
/// Requests the room's encryption state event from the server and records
/// the result in the room info and the store.
///
/// The work runs through the client's `encryption_state_deduplicated_handler`,
/// keyed by this room's ID.
///
/// # Errors
///
/// Propagates request errors other than "not found", deserialization errors
/// of the event content, and errors from saving to the store.
async fn request_encryption_state(&self) -> Result<()> {
    self.client
        .locks()
        .encryption_state_deduplicated_handler
        .run(self.room_id().to_owned(), async move {
            // Ask for the `RoomEncryption` state event with the empty state key.
            let request = get_state_events_for_key::v3::Request::new(
                self.room_id().to_owned(),
                StateEventType::RoomEncryption,
                "".to_owned(),
            );
            let response = match self.client.send(request).await {
                Ok(response) => {
                    Some(response.content.deserialize_as::<RoomEncryptionEventContent>()?)
                }
                // A "not found" API error is mapped to "no encryption event".
                Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
                Err(err) => return Err(err.into()),
            };

            // Hold the sync lock while the room info is modified and persisted.
            let _sync_lock = self.client.base_client().sync_lock().lock().await;

            let mut room_info = self.clone_info();
            room_info.mark_encryption_state_synced();
            room_info.set_encryption_event(response.clone());
            let mut changes = StateChanges::default();
            changes.add_room(room_info.clone());

            self.client.store().save_changes(&changes).await?;
            self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());

            Ok(())
        })
        .await
}
624
625 pub async fn is_encrypted(&self) -> Result<bool> {
630 if !self.is_encryption_state_synced() {
631 self.request_encryption_state().await?;
632 }
633
634 Ok(self.inner.is_encrypted())
635 }
636
/// Gathers information about the cryptographic context of the current
/// session.
///
/// The returned [`CryptoContextInfo`] contains the device creation timestamp,
/// whether this device is verified with cross-signing, whether the backup
/// state is [`BackupState::Enabled`], and whether a backup exists on the
/// server. A failed backup existence check is reported as `false`.
#[cfg(feature = "e2e-encryption")]
pub async fn crypto_context_info(&self) -> CryptoContextInfo {
    let encryption = self.client.encryption();

    let this_device_is_verified = if let Ok(Some(device)) = encryption.get_own_device().await {
        device.is_verified_with_cross_signing()
    } else {
        // If the own device is missing or cannot be loaded, report it as verified.
        true
    };

    let backup_exists_on_server = encryption.backups().exists_on_server().await.unwrap_or(false);
    let device_creation_ts = encryption.device_creation_timestamp().await;
    let is_backup_configured = encryption.backups().state() == BackupState::Enabled;

    CryptoContextInfo {
        device_creation_ts,
        this_device_is_verified,
        is_backup_configured,
        backup_exists_on_server,
    }
}
659
660 fn are_events_visible(&self) -> bool {
661 if let RoomState::Invited = self.inner.state() {
662 return matches!(
663 self.inner.history_visibility_or_default(),
664 HistoryVisibility::WorldReadable | HistoryVisibility::Invited
665 );
666 }
667
668 true
669 }
670
671 pub async fn sync_members(&self) -> Result<()> {
677 if !self.are_events_visible() {
678 return Ok(());
679 }
680
681 if !self.are_members_synced() {
682 self.request_members().await
683 } else {
684 Ok(())
685 }
686 }
687
/// Looks up the member with the given `user_id`, syncing the member list
/// first via [`Room::sync_members`].
///
/// # Errors
///
/// Propagates errors from syncing the members and from the lookup.
pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
    self.sync_members().await?;
    self.get_member_no_sync(user_id).await
}
705
706 pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
720 Ok(self
721 .inner
722 .get_member(user_id)
723 .await?
724 .map(|member| RoomMember::new(self.client.clone(), member)))
725 }
726
/// Returns the members of this room matching `memberships`, syncing the
/// member list first via [`Room::sync_members`].
///
/// # Errors
///
/// Propagates errors from syncing the members and from loading them.
pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
    self.sync_members().await?;
    self.members_no_sync(memberships).await
}
739
740 pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
749 Ok(self
750 .inner
751 .members(memberships)
752 .await?
753 .into_iter()
754 .map(|member| RoomMember::new(self.client.clone(), member))
755 .collect())
756 }
757
758 pub async fn get_state_events(
760 &self,
761 event_type: StateEventType,
762 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
763 self.client.store().get_state_events(self.room_id(), event_type).await.map_err(Into::into)
764 }
765
766 pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
783 where
784 C: StaticEventContent + StaticStateEventContent + RedactContent,
785 C::Redacted: RedactedStateEventContent,
786 {
787 Ok(self.client.store().get_state_events_static(self.room_id()).await?)
788 }
789
790 pub async fn get_state_events_for_keys(
793 &self,
794 event_type: StateEventType,
795 state_keys: &[&str],
796 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
797 self.client
798 .store()
799 .get_state_events_for_keys(self.room_id(), event_type, state_keys)
800 .await
801 .map_err(Into::into)
802 }
803
804 pub async fn get_state_events_for_keys_static<'a, C, K, I>(
824 &self,
825 state_keys: I,
826 ) -> Result<Vec<RawSyncOrStrippedState<C>>>
827 where
828 C: StaticEventContent + StaticStateEventContent + RedactContent,
829 C::StateKey: Borrow<K>,
830 C::Redacted: RedactedStateEventContent,
831 K: AsRef<str> + Sized + Sync + 'a,
832 I: IntoIterator<Item = &'a K> + Send,
833 I::IntoIter: Send,
834 {
835 Ok(self.client.store().get_state_events_for_keys_static(self.room_id(), state_keys).await?)
836 }
837
838 pub async fn get_state_event(
840 &self,
841 event_type: StateEventType,
842 state_key: &str,
843 ) -> Result<Option<RawAnySyncOrStrippedState>> {
844 self.client
845 .store()
846 .get_state_event(self.room_id(), event_type, state_key)
847 .await
848 .map_err(Into::into)
849 }
850
/// Loads the state event with content type `C` and the empty state key for
/// this room from the store, if there is one.
///
/// This is [`Room::get_state_event_static_for_key`] called with
/// [`EmptyStateKey`].
pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
where
    C: StaticEventContent + StaticStateEventContent<StateKey = EmptyStateKey> + RedactContent,
    C::Redacted: RedactedStateEventContent,
{
    self.get_state_event_static_for_key(&EmptyStateKey).await
}
876
877 pub async fn get_state_event_static_for_key<C, K>(
897 &self,
898 state_key: &K,
899 ) -> Result<Option<RawSyncOrStrippedState<C>>>
900 where
901 C: StaticEventContent + StaticStateEventContent + RedactContent,
902 C::StateKey: Borrow<K>,
903 C::Redacted: RedactedStateEventContent,
904 K: AsRef<str> + ?Sized + Sync,
905 {
906 Ok(self.client.store().get_state_event_static_for_key(self.room_id(), state_key).await?)
907 }
908
909 pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
913 Ok(self
918 .get_state_events_static::<SpaceParentEventContent>()
919 .await?
920 .into_iter()
921 .flat_map(|parent_event| match parent_event.deserialize() {
923 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
924 Some((e.state_key.to_owned(), e.sender))
925 }
926 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
927 Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
928 Err(e) => {
929 info!(room_id = ?self.room_id(), "Could not deserialize m.room.parent: {e}");
930 None
931 }
932 })
933 .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
935 let Some(parent_room) = self.client.get_room(&state_key) else {
936 return Ok(ParentSpace::Unverifiable(state_key));
939 };
940 if let Some(child_event) = parent_room
943 .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
944 .await?
945 {
946 match child_event.deserialize() {
947 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
948 return Ok(ParentSpace::Reciprocal(parent_room));
951 }
952 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
953 Ok(SyncOrStrippedState::Stripped(_)) => {}
954 Err(e) => {
955 info!(
956 room_id = ?self.room_id(), parent_room_id = ?state_key,
957 "Could not deserialize m.room.child: {e}"
958 );
959 }
960 }
961 }
966
967 let Some(member) = parent_room.get_member(&sender).await? else {
970 return Ok(ParentSpace::Illegitimate(parent_room));
972 };
973
974 if member.can_send_state(StateEventType::SpaceChild) {
975 Ok(ParentSpace::WithPowerlevel(parent_room))
977 } else {
978 Ok(ParentSpace::Illegitimate(parent_room))
979 }
980 })
981 .collect::<FuturesUnordered<_>>())
982 }
983
984 pub async fn account_data(
986 &self,
987 data_type: RoomAccountDataEventType,
988 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
989 self.client
990 .store()
991 .get_room_account_data_event(self.room_id(), data_type)
992 .await
993 .map_err(Into::into)
994 }
995
996 pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1015 where
1016 C: StaticEventContent + RoomAccountDataEventContent,
1017 {
1018 Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast))
1019 }
1020
/// Reports whether every device of every user stored for this room is
/// verified.
///
/// The user IDs are loaded from the store with an empty membership filter.
/// The check stops at the first user that has an unverified device.
///
/// # Errors
///
/// Propagates errors from loading the user IDs and from loading a user's
/// devices.
#[cfg(feature = "e2e-encryption")]
pub async fn contains_only_verified_devices(&self) -> Result<bool> {
    let user_ids =
        self.client.store().get_user_ids(self.room_id(), RoomMemberships::empty()).await?;

    for user_id in user_ids {
        let user_devices = self.client.encryption().get_user_devices(&user_id).await?;
        let all_verified = user_devices.devices().all(|device| device.is_verified());

        if !all_verified {
            return Ok(false);
        }
    }

    Ok(true)
}
1041
1042 pub async fn set_account_data<T>(
1057 &self,
1058 content: T,
1059 ) -> Result<set_room_account_data::v3::Response>
1060 where
1061 T: RoomAccountDataEventContent,
1062 {
1063 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1064
1065 let request = set_room_account_data::v3::Request::new(
1066 own_user.to_owned(),
1067 self.room_id().to_owned(),
1068 &content,
1069 )?;
1070
1071 Ok(self.client.send(request).await?)
1072 }
1073
1074 pub async fn set_account_data_raw(
1099 &self,
1100 event_type: RoomAccountDataEventType,
1101 content: Raw<AnyRoomAccountDataEventContent>,
1102 ) -> Result<set_room_account_data::v3::Response> {
1103 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1104
1105 let request = set_room_account_data::v3::Request::new_raw(
1106 own_user.to_owned(),
1107 self.room_id().to_owned(),
1108 event_type,
1109 content,
1110 );
1111
1112 Ok(self.client.send(request).await?)
1113 }
1114
1115 pub async fn set_tag(
1146 &self,
1147 tag: TagName,
1148 tag_info: TagInfo,
1149 ) -> Result<create_tag::v3::Response> {
1150 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1151 let request = create_tag::v3::Request::new(
1152 user_id.to_owned(),
1153 self.inner.room_id().to_owned(),
1154 tag.to_string(),
1155 tag_info,
1156 );
1157 Ok(self.client.send(request).await?)
1158 }
1159
1160 pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1167 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1168 let request = delete_tag::v3::Request::new(
1169 user_id.to_owned(),
1170 self.inner.room_id().to_owned(),
1171 tag.to_string(),
1172 );
1173 Ok(self.client.send(request).await?)
1174 }
1175
1176 pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1186 if is_favourite {
1187 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1188
1189 self.set_tag(TagName::Favorite, tag_info).await?;
1190
1191 if self.is_low_priority() {
1192 self.remove_tag(TagName::LowPriority).await?;
1193 }
1194 } else {
1195 self.remove_tag(TagName::Favorite).await?;
1196 }
1197 Ok(())
1198 }
1199
1200 pub async fn set_is_low_priority(
1210 &self,
1211 is_low_priority: bool,
1212 tag_order: Option<f64>,
1213 ) -> Result<()> {
1214 if is_low_priority {
1215 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1216
1217 self.set_tag(TagName::LowPriority, tag_info).await?;
1218
1219 if self.is_favourite() {
1220 self.remove_tag(TagName::Favorite).await?;
1221 }
1222 } else {
1223 self.remove_tag(TagName::LowPriority).await?;
1224 }
1225 Ok(())
1226 }
1227
1228 pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1237 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1238
1239 let mut content = self
1240 .client
1241 .account()
1242 .account_data::<DirectEventContent>()
1243 .await?
1244 .map(|c| c.deserialize())
1245 .transpose()?
1246 .unwrap_or_default();
1247
1248 let this_room_id = self.inner.room_id();
1249
1250 if is_direct {
1251 let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1252 room_members.retain(|member| member.user_id() != self.own_user_id());
1253
1254 for member in room_members {
1255 let entry = content.entry(member.user_id().into()).or_default();
1256 if !entry.iter().any(|room_id| room_id == this_room_id) {
1257 entry.push(this_room_id.to_owned());
1258 }
1259 }
1260 } else {
1261 for (_, list) in content.iter_mut() {
1262 list.retain(|room_id| *room_id != this_room_id);
1263 }
1264
1265 content.retain(|_, list| !list.is_empty());
1267 }
1268
1269 let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1270
1271 self.client.send(request).await?;
1272 Ok(())
1273 }
1274
/// Tries to decrypt the given encrypted room event.
///
/// On success the decrypted event is returned. When the event cannot be
/// decrypted, `maybe_download_room_key` is called on the backups API for
/// this room and event, and an unable-to-decrypt timeline event is returned
/// instead. In both cases the push actions of the resulting event are
/// computed.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, and
/// propagates errors from the decryption attempt and from computing the push
/// actions.
#[cfg(feature = "e2e-encryption")]
pub async fn decrypt_event(
    &self,
    event: &Raw<OriginalSyncRoomEncryptedEvent>,
) -> Result<TimelineEvent> {
    let olm_machine = self.client.olm_machine().await;
    let Some(machine) = olm_machine.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    let sender_device_trust_requirement = self.client.base_client().decryption_trust_requirement;
    let decryption_settings = DecryptionSettings { sender_device_trust_requirement };

    let outcome = machine
        .try_decrypt_room_event(event.cast_ref(), self.inner.room_id(), &decryption_settings)
        .await?;

    let mut timeline_event: TimelineEvent = match outcome {
        RoomEventDecryptionResult::Decrypted(decrypted) => decrypted.into(),
        RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
            let backups = self.client.encryption().backups();
            backups.maybe_download_room_key(self.room_id().to_owned(), event.clone());
            TimelineEvent::new_utd_event(event.clone().cast(), utd_info)
        }
    };

    timeline_event.push_actions = self.event_push_actions(timeline_event.raw()).await?;

    Ok(timeline_event)
}
1310
/// Asks the Olm machine to discard the room key for this room.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, and
/// propagates errors from discarding the key.
#[cfg(feature = "e2e-encryption")]
pub async fn discard_room_key(&self) -> Result<()> {
    let olm_machine = self.client.olm_machine().await;
    let Some(machine) = olm_machine.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    machine.discard_room_key(self.inner.room_id()).await?;
    Ok(())
}
1333
1334 #[instrument(skip_all)]
1342 pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1343 let request = assign!(
1344 ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1345 { reason: reason.map(ToOwned::to_owned) }
1346 );
1347 self.client.send(request).await?;
1348 Ok(())
1349 }
1350
1351 #[instrument(skip_all)]
1359 pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1360 let request = assign!(
1361 unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1362 { reason: reason.map(ToOwned::to_owned) }
1363 );
1364 self.client.send(request).await?;
1365 Ok(())
1366 }
1367
1368 #[instrument(skip_all)]
1377 pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1378 let request = assign!(
1379 kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1380 { reason: reason.map(ToOwned::to_owned) }
1381 );
1382 self.client.send(request).await?;
1383 Ok(())
1384 }
1385
1386 #[instrument(skip_all)]
1392 pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1393 let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
1394 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1395 self.client.send(request).await?;
1396
1397 self.mark_members_missing();
1401
1402 Ok(())
1403 }
1404
1405 #[instrument(skip_all)]
1411 pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
1412 let recipient = InvitationRecipient::ThirdPartyId(invite_id);
1413 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1414 self.client.send(request).await?;
1415
1416 self.mark_members_missing();
1420
1421 Ok(())
1422 }
1423
1424 pub async fn typing_notice(&self, typing: bool) -> Result<()> {
1459 self.ensure_room_joined()?;
1460
1461 let send = if let Some(typing_time) =
1464 self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
1465 {
1466 if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
1467 typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
1471 } else {
1472 !typing
1474 }
1475 } else {
1476 typing
1479 };
1480
1481 if send {
1482 self.send_typing_notice(typing).await?;
1483 }
1484
1485 Ok(())
1486 }
1487
1488 #[instrument(name = "typing_notice", skip(self))]
1489 async fn send_typing_notice(&self, typing: bool) -> Result<()> {
1490 let typing = if typing {
1491 self.client
1492 .inner
1493 .typing_notice_times
1494 .write()
1495 .unwrap()
1496 .insert(self.room_id().to_owned(), Instant::now());
1497 Typing::Yes(TYPING_NOTICE_TIMEOUT)
1498 } else {
1499 self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
1500 Typing::No
1501 };
1502
1503 let request = create_typing_event::v3::Request::new(
1504 self.own_user_id().to_owned(),
1505 self.room_id().to_owned(),
1506 typing,
1507 );
1508
1509 self.client.send(request).await?;
1510
1511 Ok(())
1512 }
1513
1514 #[instrument(skip_all)]
1528 pub async fn send_single_receipt(
1529 &self,
1530 receipt_type: create_receipt::v3::ReceiptType,
1531 thread: ReceiptThread,
1532 event_id: OwnedEventId,
1533 ) -> Result<()> {
1534 let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
1537
1538 self.client
1539 .inner
1540 .locks
1541 .read_receipt_deduplicated_handler
1542 .run((request_key, event_id.clone()), async {
1543 let mut request = create_receipt::v3::Request::new(
1544 self.room_id().to_owned(),
1545 receipt_type,
1546 event_id,
1547 );
1548 request.thread = thread;
1549
1550 self.client.send(request).await?;
1551 Ok(())
1552 })
1553 .await
1554 }
1555
1556 #[instrument(skip_all)]
1564 pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
1565 if receipts.is_empty() {
1566 return Ok(());
1567 }
1568
1569 let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
1570 let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
1571 fully_read,
1572 read_receipt: public_read_receipt,
1573 private_read_receipt,
1574 });
1575
1576 self.client.send(request).await?;
1577 Ok(())
1578 }
1579
1580 #[instrument(skip_all)]
1612 pub async fn enable_encryption(&self) -> Result<()> {
1613 use ruma::{
1614 events::room::encryption::RoomEncryptionEventContent, EventEncryptionAlgorithm,
1615 };
1616 const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
1617
1618 if !self.is_encrypted().await? {
1619 let content =
1620 RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
1621 self.send_state_event(content).await?;
1622
1623 _ = timeout(self.client.inner.sync_beat.listen(), SYNC_WAIT_TIME).await;
1627
1628 let _sync_lock = self.client.base_client().sync_lock().lock().await;
1633 if !self.inner.is_encrypted() {
1634 debug!("still not marked as encrypted, marking encryption state as missing");
1635
1636 let mut room_info = self.clone_info();
1637 room_info.mark_encryption_state_missing();
1638 let mut changes = StateChanges::default();
1639 changes.add_room(room_info.clone());
1640
1641 self.client.store().save_changes(&changes).await?;
1642 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
1643 } else {
1644 debug!("room successfully marked as encrypted");
1645 }
1646 }
1647
1648 Ok(())
1649 }
1650
/// Claims one-time keys for the active room members and shares the room key
/// with them.
///
/// The work runs through the client's `group_session_deduplicated_handler`,
/// keyed by the room ID. If sharing fails, the room key is discarded via
/// the Olm machine (when one is available) and the sharing error is
/// returned.
///
/// # Errors
///
/// Returns an error if the room is not joined, if the store lock cannot be
/// taken, or if any of the steps above fails.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
async fn preshare_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    // `map` consumes the value returned by `spin_lock_store`, so it does not
    // outlive this statement; only its generation is recorded on the span.
    let store_guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
    let generation = store_guard.map(|guard| guard.generation());
    tracing::Span::current().record("store_generation", generation);

    self.client
        .locks()
        .group_session_deduplicated_handler
        .run(self.room_id().to_owned(), async move {
            {
                let active_members = self
                    .client
                    .store()
                    .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
                    .await?;
                self.client
                    .claim_one_time_keys(active_members.iter().map(Deref::deref))
                    .await?;
            };

            let Err(share_error) = self.share_room_key().await else {
                return Ok(());
            };

            // Sharing failed: discard the room key before reporting the
            // original error.
            let machine = self.client.olm_machine().await;
            if let Some(machine) = machine.as_ref() {
                machine.discard_room_key(self.room_id()).await?;
            }

            Err(share_error)
        })
        .await
}
1698
/// Shares the room key for this room.
///
/// Asks the base client for the to-device requests needed to share the key,
/// sends them one after another and marks each as sent using its response.
///
/// # Errors
///
/// Returns an error if the room is not joined, or on the first request that
/// cannot be created, sent or marked as sent.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all)]
async fn share_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    let to_device_requests = self.client.base_client().share_room_key(self.room_id()).await?;

    for to_device_request in to_device_requests {
        let response = self.client.send_to_device(&to_device_request).await?;
        self.client.mark_request_as_sent(&to_device_request.txn_id, &response).await?;
    }

    Ok(())
}
1718
1719 #[instrument(skip_all)]
1728 pub async fn sync_up(&self) {
1729 while !self.is_synced() && self.state() == RoomState::Joined {
1730 let wait_for_beat = self.client.inner.sync_beat.listen();
1731 let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
1733 }
1734 }
1735
1736 pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
1806 SendMessageLikeEvent::new(self, content)
1807 }
1808
/// Queries device keys for active room members that are either not tracked
/// by the Olm machine's store or are tracked and flagged as dirty.
///
/// The keys query is only sent when the generated request contains at least
/// one entry in `device_keys`.
///
/// # Errors
///
/// Returns an error if loading the members or tracked users fails, or if
/// the keys query fails.
///
/// # Panics
///
/// Panics if the Olm machine is not available.
#[cfg(feature = "e2e-encryption")]
async fn query_keys_for_untracked_users(&self) -> Result<()> {
    let machine_guard = self.client.olm_machine().await;
    let machine = machine_guard.as_ref().expect("Olm machine wasn't started");

    let room_members =
        self.client.store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;

    // Maps each tracked user to its `dirty` flag.
    let dirty_by_user: HashMap<_, _> = machine
        .store()
        .load_tracked_users()
        .await?
        .into_iter()
        .map(|user| (user.user_id, user.dirty))
        .collect();

    // An untracked user (no entry) is treated the same as a dirty one.
    let needs_query = room_members
        .iter()
        .filter(|user_id| dirty_by_user.get(*user_id).copied().unwrap_or(true));

    let (request_id, keys_request) =
        machine.query_keys_for_users(needs_query.map(|owned| owned.borrow()));

    if !keys_request.device_keys.is_empty() {
        self.client.keys_query(&request_id, keys_request.device_keys).await?;
    }

    Ok(())
}
1840
1841 #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
1885 pub fn send_raw<'a>(
1886 &'a self,
1887 event_type: &'a str,
1888 content: impl IntoRawMessageLikeEventContent,
1889 ) -> SendRawMessageLikeEvent<'a> {
1890 SendRawMessageLikeEvent::new(self, event_type, content)
1893 }
1894
1895 #[instrument(skip_all)]
1943 pub fn send_attachment<'a>(
1944 &'a self,
1945 filename: impl Into<String>,
1946 content_type: &'a Mime,
1947 data: Vec<u8>,
1948 config: AttachmentConfig,
1949 ) -> SendAttachment<'a> {
1950 SendAttachment::new(self, filename.into(), content_type, data, config)
1951 }
1952
1953 #[instrument(skip_all)]
1981 pub(super) async fn prepare_and_send_attachment<'a>(
1982 &'a self,
1983 filename: String,
1984 content_type: &'a Mime,
1985 data: Vec<u8>,
1986 mut config: AttachmentConfig,
1987 send_progress: SharedObservable<TransmissionProgress>,
1988 store_in_cache: bool,
1989 ) -> Result<send_message_event::v3::Response> {
1990 self.ensure_room_joined()?;
1991
1992 let txn_id = config.txn_id.take();
1993 let mentions = config.mentions.take();
1994
1995 let thumbnail = config.thumbnail.take();
1996
1997 let thumbnail_cache_info = if store_in_cache {
1999 thumbnail
2000 .as_ref()
2001 .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2002 } else {
2003 None
2004 };
2005
2006 #[cfg(feature = "e2e-encryption")]
2007 let (media_source, thumbnail) = if self.is_encrypted().await? {
2008 self.client
2009 .upload_encrypted_media_and_thumbnail(content_type, &data, thumbnail, send_progress)
2010 .await?
2011 } else {
2012 self.client
2013 .media()
2014 .upload_plain_media_and_thumbnail(
2015 content_type,
2016 data.clone(),
2019 thumbnail,
2020 send_progress,
2021 )
2022 .await?
2023 };
2024
2025 #[cfg(not(feature = "e2e-encryption"))]
2026 let (media_source, thumbnail) = self
2027 .client
2028 .media()
2029 .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2030 .await?;
2031
2032 if store_in_cache {
2033 let cache_store_lock_guard = self.client.event_cache_store().lock().await?;
2034
2035 debug!("caching the media");
2039 let request =
2040 MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2041
2042 if let Err(err) = cache_store_lock_guard
2043 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2044 .await
2045 {
2046 warn!("unable to cache the media after uploading it: {err}");
2047 }
2048
2049 if let Some(((data, height, width), source)) =
2050 thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2051 {
2052 debug!("caching the thumbnail");
2053
2054 let request = MediaRequestParameters {
2055 source: source.clone(),
2056 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2057 };
2058
2059 if let Err(err) = cache_store_lock_guard
2060 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2061 .await
2062 {
2063 warn!("unable to cache the media after uploading it: {err}");
2064 }
2065 }
2066 }
2067
2068 let content = Self::make_attachment_event(
2069 self.make_attachment_type(
2070 content_type,
2071 filename,
2072 media_source,
2073 config.caption,
2074 config.formatted_caption,
2075 config.info,
2076 thumbnail,
2077 ),
2078 mentions,
2079 );
2080
2081 let mut fut = self.send(content);
2082 if let Some(txn_id) = txn_id {
2083 fut = fut.with_transaction_id(txn_id);
2084 }
2085 fut.await
2086 }
2087
2088 #[allow(clippy::too_many_arguments)]
2091 pub(crate) fn make_attachment_type(
2092 &self,
2093 content_type: &Mime,
2094 filename: String,
2095 source: MediaSource,
2096 caption: Option<String>,
2097 formatted_caption: Option<FormattedBody>,
2098 info: Option<AttachmentInfo>,
2099 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2100 ) -> MessageType {
2101 let (body, filename) = match caption {
2105 Some(caption) => (caption, Some(filename)),
2106 None => (filename, None),
2107 };
2108
2109 let (thumbnail_source, thumbnail_info) = thumbnail.unzip();
2110
2111 match content_type.type_() {
2112 mime::IMAGE => {
2113 let info = assign!(info.map(ImageInfo::from).unwrap_or_default(), {
2114 mimetype: Some(content_type.as_ref().to_owned()),
2115 thumbnail_source,
2116 thumbnail_info
2117 });
2118 let content = assign!(ImageMessageEventContent::new(body, source), {
2119 info: Some(Box::new(info)),
2120 formatted: formatted_caption,
2121 filename
2122 });
2123 MessageType::Image(content)
2124 }
2125
2126 mime::AUDIO => {
2127 let mut content = assign!(AudioMessageEventContent::new(body, source), {
2128 formatted: formatted_caption,
2129 filename
2130 });
2131
2132 if let Some(AttachmentInfo::Voice { audio_info, waveform: Some(waveform_vec) }) =
2133 &info
2134 {
2135 if let Some(duration) = audio_info.duration {
2136 let waveform = waveform_vec.iter().map(|v| (*v).into()).collect();
2137 content.audio =
2138 Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
2139 }
2140 content.voice = Some(UnstableVoiceContentBlock::new());
2141 }
2142
2143 let mut audio_info = info.map(AudioInfo::from).unwrap_or_default();
2144 audio_info.mimetype = Some(content_type.as_ref().to_owned());
2145 let content = content.info(Box::new(audio_info));
2146
2147 MessageType::Audio(content)
2148 }
2149
2150 mime::VIDEO => {
2151 let info = assign!(info.map(VideoInfo::from).unwrap_or_default(), {
2152 mimetype: Some(content_type.as_ref().to_owned()),
2153 thumbnail_source,
2154 thumbnail_info
2155 });
2156 let content = assign!(VideoMessageEventContent::new(body, source), {
2157 info: Some(Box::new(info)),
2158 formatted: formatted_caption,
2159 filename
2160 });
2161 MessageType::Video(content)
2162 }
2163
2164 _ => {
2165 let info = assign!(info.map(FileInfo::from).unwrap_or_default(), {
2166 mimetype: Some(content_type.as_ref().to_owned()),
2167 thumbnail_source,
2168 thumbnail_info
2169 });
2170 let content = assign!(FileMessageEventContent::new(body, source), {
2171 info: Some(Box::new(info)),
2172 formatted: formatted_caption,
2173 filename,
2174 });
2175 MessageType::File(content)
2176 }
2177 }
2178 }
2179
2180 pub(crate) fn make_attachment_event(
2183 msg_type: MessageType,
2184 mentions: Option<Mentions>,
2185 ) -> RoomMessageEventContent {
2186 let mut content = RoomMessageEventContent::new(msg_type);
2187 if let Some(mentions) = mentions {
2188 content = content.add_mentions(mentions);
2189 }
2190 content
2191 }
2192
2193 pub async fn update_power_levels(
2202 &self,
2203 updates: Vec<(&UserId, Int)>,
2204 ) -> Result<send_state_event::v3::Response> {
2205 let mut power_levels = self.power_levels().await?;
2206
2207 for (user_id, new_level) in updates {
2208 if new_level == power_levels.users_default {
2209 power_levels.users.remove(user_id);
2210 } else {
2211 power_levels.users.insert(user_id.to_owned(), new_level);
2212 }
2213 }
2214
2215 self.send_state_event(RoomPowerLevelsEventContent::from(power_levels)).await
2216 }
2217
2218 pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2223 let mut power_levels = self.power_levels().await?;
2224 power_levels.apply(changes)?;
2225 self.send_state_event(RoomPowerLevelsEventContent::from(power_levels)).await?;
2226 Ok(())
2227 }
2228
2229 pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2233 let default_power_levels = RoomPowerLevels::from(RoomPowerLevelsEventContent::new());
2234 let changes = RoomPowerLevelChanges::from(default_power_levels);
2235 self.apply_power_level_changes(changes).await?;
2236 Ok(self.power_levels().await?)
2237 }
2238
2239 pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2244 let power_level = self.get_user_power_level(user_id).await?;
2245 Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2246 }
2247
2248 pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<i64> {
2253 let event = self.power_levels().await?;
2254 Ok(event.for_user(user_id).into())
2255 }
2256
2257 pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2260 let power_levels = self.power_levels().await.ok();
2261 let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2262 if let Some(power_levels) = power_levels {
2263 for (id, level) in power_levels.users.into_iter() {
2264 user_power_levels.insert(id, level.into());
2265 }
2266 }
2267 user_power_levels
2268 }
2269
2270 pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2272 self.send_state_event(RoomNameEventContent::new(name)).await
2273 }
2274
2275 pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2277 self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2278 }
2279
2280 pub async fn set_avatar_url(
2286 &self,
2287 url: &MxcUri,
2288 info: Option<avatar::ImageInfo>,
2289 ) -> Result<send_state_event::v3::Response> {
2290 self.ensure_room_joined()?;
2291
2292 let mut room_avatar_event = RoomAvatarEventContent::new();
2293 room_avatar_event.url = Some(url.to_owned());
2294 room_avatar_event.info = info.map(Box::new);
2295
2296 self.send_state_event(room_avatar_event).await
2297 }
2298
2299 pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2301 self.send_state_event(RoomAvatarEventContent::new()).await
2302 }
2303
2304 pub async fn upload_avatar(
2312 &self,
2313 mime: &Mime,
2314 data: Vec<u8>,
2315 info: Option<avatar::ImageInfo>,
2316 ) -> Result<send_state_event::v3::Response> {
2317 self.ensure_room_joined()?;
2318
2319 let upload_response = self.client.media().upload(mime, data, None).await?;
2320 let mut info = info.unwrap_or_default();
2321 info.blurhash = upload_response.blurhash;
2322 info.mimetype = Some(mime.to_string());
2323
2324 self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2325 }
2326
2327 #[instrument(skip_all)]
2371 pub async fn send_state_event(
2372 &self,
2373 content: impl StateEventContent<StateKey = EmptyStateKey>,
2374 ) -> Result<send_state_event::v3::Response> {
2375 self.send_state_event_for_key(&EmptyStateKey, content).await
2376 }
2377
2378 pub async fn send_state_event_for_key<C, K>(
2419 &self,
2420 state_key: &K,
2421 content: C,
2422 ) -> Result<send_state_event::v3::Response>
2423 where
2424 C: StateEventContent,
2425 C::StateKey: Borrow<K>,
2426 K: AsRef<str> + ?Sized,
2427 {
2428 self.ensure_room_joined()?;
2429 let request =
2430 send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
2431 let response = self.client.send(request).await?;
2432 Ok(response)
2433 }
2434
2435 #[instrument(skip_all)]
2470 pub async fn send_state_event_raw(
2471 &self,
2472 event_type: &str,
2473 state_key: &str,
2474 content: impl IntoRawStateEventContent,
2475 ) -> Result<send_state_event::v3::Response> {
2476 self.ensure_room_joined()?;
2477
2478 let request = send_state_event::v3::Request::new_raw(
2479 self.room_id().to_owned(),
2480 event_type.into(),
2481 state_key.to_owned(),
2482 content.into_raw_state_event_content(),
2483 );
2484
2485 Ok(self.client.send(request).await?)
2486 }
2487
2488 #[instrument(skip_all)]
2523 pub async fn redact(
2524 &self,
2525 event_id: &EventId,
2526 reason: Option<&str>,
2527 txn_id: Option<OwnedTransactionId>,
2528 ) -> HttpResult<redact_event::v3::Response> {
2529 let txn_id = txn_id.unwrap_or_else(TransactionId::new);
2530 let request = assign!(
2531 redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
2532 { reason: reason.map(ToOwned::to_owned) }
2533 );
2534
2535 self.client.send(request).await
2536 }
2537
2538 pub async fn can_user_redact_own(&self, user_id: &UserId) -> Result<bool> {
2543 Ok(self.power_levels().await?.user_can_redact_own_event(user_id))
2544 }
2545
2546 pub async fn can_user_redact_other(&self, user_id: &UserId) -> Result<bool> {
2551 Ok(self.power_levels().await?.user_can_redact_event_of_other(user_id))
2552 }
2553
2554 pub async fn can_user_ban(&self, user_id: &UserId) -> Result<bool> {
2559 Ok(self.power_levels().await?.user_can_ban(user_id))
2560 }
2561
2562 pub async fn can_user_invite(&self, user_id: &UserId) -> Result<bool> {
2567 Ok(self.power_levels().await?.user_can_invite(user_id))
2568 }
2569
2570 pub async fn can_user_kick(&self, user_id: &UserId) -> Result<bool> {
2575 Ok(self.power_levels().await?.user_can_kick(user_id))
2576 }
2577
2578 pub async fn can_user_send_state(
2583 &self,
2584 user_id: &UserId,
2585 state_event: StateEventType,
2586 ) -> Result<bool> {
2587 Ok(self.power_levels().await?.user_can_send_state(user_id, state_event))
2588 }
2589
2590 pub async fn can_user_send_message(
2595 &self,
2596 user_id: &UserId,
2597 message: MessageLikeEventType,
2598 ) -> Result<bool> {
2599 Ok(self.power_levels().await?.user_can_send_message(user_id, message))
2600 }
2601
2602 pub async fn can_user_pin_unpin(&self, user_id: &UserId) -> Result<bool> {
2607 Ok(self
2608 .power_levels()
2609 .await?
2610 .user_can_send_state(user_id, StateEventType::RoomPinnedEvents))
2611 }
2612
2613 pub async fn can_user_trigger_room_notification(&self, user_id: &UserId) -> Result<bool> {
2618 Ok(self.power_levels().await?.user_can_trigger_room_notification(user_id))
2619 }
2620
2621 pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
2630 let acl_ev = self
2631 .get_state_event_static::<RoomServerAclEventContent>()
2632 .await?
2633 .and_then(|ev| ev.deserialize().ok());
2634 let acl = acl_ev.as_ref().and_then(|ev| match ev {
2635 SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
2636 SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
2637 });
2638
2639 let members: Vec<_> = self
2643 .members_no_sync(RoomMemberships::JOIN)
2644 .await?
2645 .into_iter()
2646 .filter(|member| {
2647 let server = member.user_id().server_name();
2648 acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
2649 })
2650 .collect();
2651
2652 let max = members
2655 .iter()
2656 .max_by_key(|member| member.power_level())
2657 .filter(|max| max.power_level() >= 50)
2658 .map(|member| member.user_id().server_name());
2659
2660 let servers = members
2662 .iter()
2663 .map(|member| member.user_id().server_name())
2664 .filter(|server| max.filter(|max| max == server).is_none())
2665 .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
2666 *servers.entry(server).or_default() += 1;
2667 servers
2668 });
2669 let mut servers: Vec<_> = servers.into_iter().collect();
2670 servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
2671
2672 Ok(max
2673 .into_iter()
2674 .chain(servers.into_iter().map(|(name, _)| name))
2675 .take(3)
2676 .map(ToOwned::to_owned)
2677 .collect())
2678 }
2679
2680 pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
2687 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
2688 return Ok(alias.matrix_to_uri());
2689 }
2690
2691 let via = self.route().await?;
2692 Ok(self.room_id().matrix_to_uri_via(via))
2693 }
2694
2695 pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
2706 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
2707 return Ok(alias.matrix_uri(join));
2708 }
2709
2710 let via = self.route().await?;
2711 Ok(self.room_id().matrix_uri_via(via, join))
2712 }
2713
2714 pub async fn matrix_to_event_permalink(
2728 &self,
2729 event_id: impl Into<OwnedEventId>,
2730 ) -> Result<MatrixToUri> {
2731 let via = self.route().await?;
2734 Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
2735 }
2736
2737 pub async fn matrix_event_permalink(
2751 &self,
2752 event_id: impl Into<OwnedEventId>,
2753 ) -> Result<MatrixUri> {
2754 let via = self.route().await?;
2757 Ok(self.room_id().matrix_event_uri_via(event_id, via))
2758 }
2759
2760 pub async fn load_user_receipt(
2773 &self,
2774 receipt_type: ReceiptType,
2775 thread: ReceiptThread,
2776 user_id: &UserId,
2777 ) -> Result<Option<(OwnedEventId, Receipt)>> {
2778 self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
2779 }
2780
2781 pub async fn load_event_receipts(
2794 &self,
2795 receipt_type: ReceiptType,
2796 thread: ReceiptThread,
2797 event_id: &EventId,
2798 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
2799 self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
2800 }
2801
2802 pub async fn push_context(&self) -> Result<Option<PushConditionRoomCtx>> {
2807 let room_id = self.room_id();
2808 let user_id = self.own_user_id();
2809 let room_info = self.clone_info();
2810 let member_count = room_info.active_members_count();
2811
2812 let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
2813 member.name().to_owned()
2814 } else {
2815 return Ok(None);
2816 };
2817
2818 let power_levels = self
2819 .get_state_event_static::<RoomPowerLevelsEventContent>()
2820 .await?
2821 .and_then(|e| e.deserialize().ok())
2822 .map(|e| e.power_levels().into());
2823
2824 Ok(Some(PushConditionRoomCtx {
2825 user_id: user_id.to_owned(),
2826 room_id: room_id.to_owned(),
2827 member_count: UInt::new(member_count).unwrap_or(UInt::MAX),
2828 user_display_name,
2829 power_levels,
2830 }))
2831 }
2832
2833 pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
2838 let Some(push_context) = self.push_context().await? else {
2839 debug!("Could not aggregate push context");
2840 return Ok(None);
2841 };
2842
2843 let push_rules = self.client().account().push_rules().await?;
2844
2845 Ok(Some(push_rules.get_actions(event, &push_context).to_owned()))
2846 }
2847
2848 pub async fn invite_details(&self) -> Result<Invite> {
2851 let state = self.state();
2852 if state != RoomState::Invited {
2853 return Err(Error::WrongRoomState(WrongRoomState::new("Invited", state)));
2854 }
2855
2856 let invitee = self
2857 .get_member_no_sync(self.own_user_id())
2858 .await?
2859 .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
2860 let event = invitee.event();
2861 let inviter_id = event.sender();
2862 let inviter = self.get_member_no_sync(inviter_id).await?;
2863 Ok(Invite { invitee, inviter })
2864 }
2865
2866 pub async fn own_membership_details(&self) -> Result<(RoomMember, Option<RoomMember>)> {
2874 let Some(own_member) = self.get_member_no_sync(self.own_user_id()).await? else {
2875 return Err(Error::InsufficientData);
2876 };
2877
2878 let sender_member =
2879 if let Some(member) = self.get_member_no_sync(own_member.event().sender()).await? {
2880 Some(member)
2882 } else if self.are_members_synced() {
2883 None
2885 } else if self.sync_members().await.is_ok() {
2886 self.get_member_no_sync(own_member.event().sender()).await?
2888 } else {
2889 None
2890 };
2891
2892 Ok((own_member, sender_member))
2893 }
2894
2895 pub async fn forget(&self) -> Result<()> {
2901 let state = self.state();
2902 match state {
2903 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
2904 return Err(Error::WrongRoomState(WrongRoomState::new("Left / Banned", state)));
2905 }
2906 RoomState::Left | RoomState::Banned => {}
2907 }
2908
2909 let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
2910 let _response = self.client.send(request).await?;
2911
2912 if self.inner.direct_targets_length() != 0 {
2914 if let Err(e) = self.set_is_direct(false).await {
2915 warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
2918 }
2919 }
2920
2921 self.client.base_client().forget_room(self.inner.room_id()).await?;
2922
2923 Ok(())
2924 }
2925
2926 fn ensure_room_joined(&self) -> Result<()> {
2927 let state = self.state();
2928 if state == RoomState::Joined {
2929 Ok(())
2930 } else {
2931 Err(Error::WrongRoomState(WrongRoomState::new("Joined", state)))
2932 }
2933 }
2934
2935 pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
2937 if !matches!(self.state(), RoomState::Joined) {
2938 return None;
2939 }
2940
2941 let notification_settings = self.client().notification_settings().await;
2942
2943 let notification_mode =
2945 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
2946
2947 if notification_mode.is_some() {
2948 notification_mode
2949 } else if let Ok(is_encrypted) = self.is_encrypted().await {
2950 let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
2955 let default_mode = notification_settings
2956 .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
2957 .await;
2958 Some(default_mode)
2959 } else {
2960 None
2961 }
2962 }
2963
2964 pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
2975 if !matches!(self.state(), RoomState::Joined) {
2976 return None;
2977 }
2978
2979 let notification_settings = self.client().notification_settings().await;
2980
2981 let mode =
2983 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
2984
2985 if let Some(mode) = mode {
2986 self.update_cached_user_defined_notification_mode(mode);
2987 }
2988
2989 mode
2990 }
2991
2992 pub async fn report_content(
3005 &self,
3006 event_id: OwnedEventId,
3007 score: Option<ReportedContentScore>,
3008 reason: Option<String>,
3009 ) -> Result<report_content::v3::Response> {
3010 let state = self.state();
3011 if state != RoomState::Joined {
3012 return Err(Error::WrongRoomState(WrongRoomState::new("Joined", state)));
3013 }
3014
3015 let request = report_content::v3::Request::new(
3016 self.inner.room_id().to_owned(),
3017 event_id,
3018 score.map(Into::into),
3019 reason,
3020 );
3021 Ok(self.client.send(request).await?)
3022 }
3023
3024 pub async fn report_room(&self, reason: Option<String>) -> Result<report_room::v3::Response> {
3035 let mut request = report_room::v3::Request::new(self.inner.room_id().to_owned());
3036 request.reason = reason;
3037
3038 Ok(self.client.send(request).await?)
3039 }
3040
3041 pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3044 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3045
3046 let content = UnstableMarkedUnreadEventContent::from(MarkedUnreadEventContent::new(unread));
3047
3048 let request = set_room_account_data::v3::Request::new(
3049 user_id.to_owned(),
3050 self.inner.room_id().to_owned(),
3051 &content,
3052 )?;
3053
3054 self.client.send(request).await?;
3055 Ok(())
3056 }
3057
3058 pub async fn event_cache(
3061 &self,
3062 ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3063 self.client.event_cache().for_room(self.room_id()).await
3064 }
3065
3066 pub async fn send_call_notification_if_needed(&self) -> Result<()> {
3076 if self.has_active_room_call() {
3077 return Ok(());
3078 }
3079
3080 if !self.can_user_trigger_room_notification(self.own_user_id()).await? {
3081 return Ok(());
3082 }
3083
3084 self.send_call_notification(
3085 self.room_id().to_string().to_owned(),
3086 ApplicationType::Call,
3087 if self.is_direct().await.unwrap_or(false) {
3088 NotifyType::Ring
3089 } else {
3090 NotifyType::Notify
3091 },
3092 Mentions::with_room_mention(),
3093 )
3094 .await?;
3095
3096 Ok(())
3097 }
3098
3099 pub(crate) async fn get_user_beacon_info(
3106 &self,
3107 user_id: &UserId,
3108 ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3109 let raw_event = self
3110 .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3111 .await?
3112 .ok_or(BeaconError::NotFound)?;
3113
3114 match raw_event.deserialize()? {
3115 SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3116 SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3117 SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3118 }
3119 }
3120
3121 pub async fn start_live_location_share(
3134 &self,
3135 duration_millis: u64,
3136 description: Option<String>,
3137 ) -> Result<send_state_event::v3::Response> {
3138 self.ensure_room_joined()?;
3139
3140 self.send_state_event_for_key(
3141 self.own_user_id(),
3142 BeaconInfoEventContent::new(
3143 description,
3144 Duration::from_millis(duration_millis),
3145 true,
3146 None,
3147 ),
3148 )
3149 .await
3150 }
3151
3152 pub async fn stop_live_location_share(
3159 &self,
3160 ) -> Result<send_state_event::v3::Response, BeaconError> {
3161 self.ensure_room_joined()?;
3162
3163 let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3164 beacon_info_event.content.stop();
3165 Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3166 }
3167
3168 pub async fn send_location_beacon(
3180 &self,
3181 geo_uri: String,
3182 ) -> Result<send_message_event::v3::Response, BeaconError> {
3183 self.ensure_room_joined()?;
3184
3185 let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3186
3187 if beacon_info_event.content.is_live() {
3188 let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3189 Ok(self.send(content).await?)
3190 } else {
3191 Err(BeaconError::NotLive)
3192 }
3193 }
3194
3195 pub async fn send_call_notification(
3207 &self,
3208 call_id: String,
3209 application: ApplicationType,
3210 notify_type: NotifyType,
3211 mentions: Mentions,
3212 ) -> Result<()> {
3213 let call_notify_event_content =
3214 CallNotifyEventContent::new(call_id, application, notify_type, mentions);
3215 self.send(call_notify_event_content).await?;
3216 Ok(())
3217 }
3218
3219 pub async fn save_composer_draft(&self, draft: ComposerDraft) -> Result<()> {
3222 self.client
3223 .store()
3224 .set_kv_data(
3225 StateStoreDataKey::ComposerDraft(self.room_id()),
3226 StateStoreDataValue::ComposerDraft(draft),
3227 )
3228 .await?;
3229 Ok(())
3230 }
3231
3232 pub async fn load_composer_draft(&self) -> Result<Option<ComposerDraft>> {
3234 let data = self
3235 .client
3236 .store()
3237 .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id()))
3238 .await?;
3239 Ok(data.and_then(|d| d.into_composer_draft()))
3240 }
3241
3242 pub async fn clear_composer_draft(&self) -> Result<()> {
3244 self.client
3245 .store()
3246 .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id()))
3247 .await?;
3248 Ok(())
3249 }
3250
3251 pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
3254 let response = self
3255 .client
3256 .send(get_state_events_for_key::v3::Request::new(
3257 self.room_id().to_owned(),
3258 StateEventType::RoomPinnedEvents,
3259 "".to_owned(),
3260 ))
3261 .await;
3262
3263 match response {
3264 Ok(response) => {
3265 Ok(Some(response.content.deserialize_as::<RoomPinnedEventsEventContent>()?.pinned))
3266 }
3267 Err(http_error) => match http_error.as_client_api_error() {
3268 Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
3269 _ => Err(http_error.into()),
3270 },
3271 }
3272 }
3273
3274 pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3282 ObservableLiveLocation::new(&self.client, self.room_id())
3283 }
3284
3285 pub async fn subscribe_to_knock_requests(
3299 &self,
3300 ) -> Result<(impl Stream<Item = Vec<KnockRequest>>, JoinHandle<()>)> {
3301 let this = Arc::new(self.clone());
3302
3303 let room_member_events_observer =
3304 self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
3305
3306 let current_seen_ids = self.get_seen_knock_request_ids().await?;
3307 let mut seen_request_ids_stream = self
3308 .seen_knock_request_ids_map
3309 .subscribe()
3310 .await
3311 .map(|values| values.unwrap_or_default());
3312
3313 let mut room_info_stream = self.subscribe_info();
3314
3315 let clear_seen_ids_handle = spawn({
3318 let this = self.clone();
3319 async move {
3320 let mut member_updates_stream = this.room_member_updates_sender.subscribe();
3321 while member_updates_stream.recv().await.is_ok() {
3322 if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
3324 warn!("Failed to remove seen knock requests: {err}")
3325 }
3326 }
3327 }
3328 });
3329
3330 let combined_stream = stream! {
3331 match this.get_current_join_requests(¤t_seen_ids).await {
3333 Ok(initial_requests) => yield initial_requests,
3334 Err(err) => warn!("Failed to get initial requests to join: {err}")
3335 }
3336
3337 let mut requests_stream = room_member_events_observer.subscribe();
3338 let mut seen_ids = current_seen_ids.clone();
3339
3340 loop {
3341 tokio::select! {
3344 Some((event, _)) = requests_stream.next() => {
3345 if let Some(event) = event.as_original() {
3346 let emit = if event.prev_content().is_some() {
3348 matches!(event.membership_change(),
3349 MembershipChange::Banned |
3350 MembershipChange::Knocked |
3351 MembershipChange::KnockAccepted |
3352 MembershipChange::KnockDenied |
3353 MembershipChange::KnockRetracted
3354 )
3355 } else {
3356 true
3359 };
3360
3361 if emit {
3362 match this.get_current_join_requests(&seen_ids).await {
3363 Ok(requests) => yield requests,
3364 Err(err) => {
3365 warn!("Failed to get updated knock requests on new member event: {err}")
3366 }
3367 }
3368 }
3369 }
3370 }
3371
3372 Some(new_seen_ids) = seen_request_ids_stream.next() => {
3373 seen_ids = new_seen_ids;
3375
3376 match this.get_current_join_requests(&seen_ids).await {
3379 Ok(requests) => yield requests,
3380 Err(err) => {
3381 warn!("Failed to get updated knock requests on seen ids changed: {err}")
3382 }
3383 }
3384 }
3385
3386 Some(room_info) = room_info_stream.next() => {
3387 if !room_info.are_members_synced() {
3390 match this.get_current_join_requests(&seen_ids).await {
3391 Ok(requests) => yield requests,
3392 Err(err) => {
3393 warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
3394 }
3395 }
3396 }
3397 }
3398 else => break,
3400 }
3401 }
3402 };
3403
3404 Ok((combined_stream, clear_seen_ids_handle))
3405 }
3406
3407 async fn get_current_join_requests(
3408 &self,
3409 seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
3410 ) -> Result<Vec<KnockRequest>> {
3411 Ok(self
3412 .members(RoomMemberships::KNOCK)
3413 .await?
3414 .into_iter()
3415 .filter_map(|member| {
3416 let event_id = member.event().event_id()?;
3417 Some(KnockRequest::new(
3418 self,
3419 event_id,
3420 member.event().timestamp(),
3421 KnockRequestMemberInfo::from_member(&member),
3422 seen_request_ids.contains_key(event_id),
3423 ))
3424 })
3425 .collect())
3426 }
3427
3428 pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
3430 RoomPrivacySettings::new(&self.inner, &self.client)
3431 }
3432}
3433
#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
impl RoomIdentityProvider for Room {
    /// Resolves to `true` only when looking up `user_id` in this room
    /// succeeds and finds a member; lookup errors count as "not a member".
    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
        Box::pin(async { matches!(self.get_member(user_id).await, Ok(Some(_))) })
    }

    /// Collects the identities of the joined and invited members of this
    /// room.
    ///
    /// A failure to load the members is treated as an empty member list, and
    /// members for which no identity is found are skipped.
    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
        Box::pin(async {
            let members = self
                .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
                .await
                .unwrap_or_default();

            let mut identities: Vec<UserIdentity> = Vec::new();
            for member in members {
                // `extend` on an `Option` pushes the identity only if present.
                identities.extend(self.user_identity(member.user_id()).await);
            }
            identities
        })
    }

    /// Looks up the identity of `user_id` through the client's encryption
    /// API; both a lookup error and a missing identity resolve to `None`.
    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
        Box::pin(async {
            let identity =
                self.client.encryption().get_user_identity(user_id).await.ok().flatten();

            identity.map(|u| u.underlying_identity())
        })
    }
}
3468
#[derive(Clone)]
/// Pairs a [`WeakClient`] with a room id so that the matching [`Room`] can be
/// looked up on demand (see `WeakRoom::get`) instead of being held directly.
pub(crate) struct WeakRoom {
    /// Weak handle to the client used to look the room up.
    client: WeakClient,
    /// Id of the room this handle points to.
    room_id: OwnedRoomId,
}
3476
3477impl WeakRoom {
3478 pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
3480 Self { client, room_id }
3481 }
3482
3483 pub fn get(&self) -> Option<Room> {
3485 self.client.get().and_then(|client| client.get_room(&self.room_id))
3486 }
3487
3488 pub fn room_id(&self) -> &RoomId {
3490 &self.room_id
3491 }
3492}
3493
#[derive(Debug, Clone)]
/// Details of an invitation: the invited member and, when available, the
/// member that sent the invite.
pub struct Invite {
    /// The room member that was invited.
    pub invitee: RoomMember,
    /// The room member that sent the invite.
    // NOTE(review): presumably `None` when the inviter's member data can't be
    // resolved — confirm against the code that builds `Invite`.
    pub inviter: Option<RoomMember>,
}
3502
#[derive(Error, Debug)]
/// Private error type for invitation lookups.
// NOTE(review): where this error is raised is outside this part of the file.
enum InvitationError {
    /// No membership event was found.
    #[error("No membership event found")]
    EventMissing,
}
3508
#[derive(Debug, Clone, Default)]
/// A set of optional event ids, one per kind of receipt/marker: the fully
/// read marker, the public read receipt and the private read receipt.
///
/// The default value has every field unset; the builder-style methods on this
/// type fill them in one at a time.
#[non_exhaustive]
pub struct Receipts {
    /// Event id for the fully read marker, if any.
    pub fully_read: Option<OwnedEventId>,
    /// Event id for the public read receipt, if any.
    pub public_read_receipt: Option<OwnedEventId>,
    /// Event id for the private read receipt, if any.
    pub private_read_receipt: Option<OwnedEventId>,
}
3520
3521impl Receipts {
3522 pub fn new() -> Self {
3524 Self::default()
3525 }
3526
3527 pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
3536 self.fully_read = event_id.into();
3537 self
3538 }
3539
3540 pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
3546 self.public_read_receipt = event_id.into();
3547 self
3548 }
3549
3550 pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
3554 self.private_read_receipt = event_id.into();
3555 self
3556 }
3557
3558 pub fn is_empty(&self) -> bool {
3560 self.fully_read.is_none()
3561 && self.public_read_receipt.is_none()
3562 && self.private_read_receipt.is_none()
3563 }
3564}
3565
#[derive(Debug)]
/// Classification of a parent space of a room.
///
/// Three variants carry the parent [`Room`]; `Unverifiable` carries only a
/// room id.
// NOTE(review): the exact rules separating the variants are implemented
// outside this part of the file; the per-variant notes below are inferred
// from the variant names and payloads — confirm against the code that
// constructs them.
pub enum ParentSpace {
    /// Presumably: the parent relation is declared by both rooms.
    Reciprocal(Room),
    /// Presumably: the relation is accepted based on a power level check.
    WithPowerlevel(Room),
    /// Presumably: the parent room is known but the relation isn't legitimate.
    Illegitimate(Room),
    /// Presumably: the relation can't be checked; only the claimed parent's
    /// room id is available.
    Unverifiable(OwnedRoomId),
}
3585
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
/// A score attached to reported content, stored as an `i8`.
///
/// The constructors and `TryFrom` conversions of this type only produce
/// values between `ReportedContentScore::MIN` and `ReportedContentScore::MAX`,
/// inclusive.
pub struct ReportedContentScore(i8);
3591
3592impl ReportedContentScore {
3593 pub const MIN: Self = Self(-100);
3597
3598 pub const MAX: Self = Self(0);
3602
3603 pub fn new(value: i8) -> Option<Self> {
3612 value.try_into().ok()
3613 }
3614
3615 pub fn new_saturating(value: i8) -> Self {
3621 if value > Self::MAX {
3622 Self::MAX
3623 } else if value < Self::MIN {
3624 Self::MIN
3625 } else {
3626 Self(value)
3627 }
3628 }
3629
3630 pub fn value(&self) -> i8 {
3632 self.0
3633 }
3634}
3635
3636impl PartialEq<i8> for ReportedContentScore {
3637 fn eq(&self, other: &i8) -> bool {
3638 self.0.eq(other)
3639 }
3640}
3641
3642impl PartialEq<ReportedContentScore> for i8 {
3643 fn eq(&self, other: &ReportedContentScore) -> bool {
3644 self.eq(&other.0)
3645 }
3646}
3647
3648impl PartialOrd<i8> for ReportedContentScore {
3649 fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
3650 self.0.partial_cmp(other)
3651 }
3652}
3653
3654impl PartialOrd<ReportedContentScore> for i8 {
3655 fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
3656 self.partial_cmp(&other.0)
3657 }
3658}
3659
3660impl From<ReportedContentScore> for Int {
3661 fn from(value: ReportedContentScore) -> Self {
3662 value.0.into()
3663 }
3664}
3665
3666impl TryFrom<i8> for ReportedContentScore {
3667 type Error = TryFromReportedContentScoreError;
3668
3669 fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
3670 if value > Self::MAX || value < Self::MIN {
3671 Err(TryFromReportedContentScoreError(()))
3672 } else {
3673 Ok(Self(value))
3674 }
3675 }
3676}
3677
3678impl TryFrom<i16> for ReportedContentScore {
3679 type Error = TryFromReportedContentScoreError;
3680
3681 fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
3682 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3683 value.try_into()
3684 }
3685}
3686
3687impl TryFrom<i32> for ReportedContentScore {
3688 type Error = TryFromReportedContentScoreError;
3689
3690 fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
3691 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3692 value.try_into()
3693 }
3694}
3695
3696impl TryFrom<i64> for ReportedContentScore {
3697 type Error = TryFromReportedContentScoreError;
3698
3699 fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
3700 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3701 value.try_into()
3702 }
3703}
3704
3705impl TryFrom<Int> for ReportedContentScore {
3706 type Error = TryFromReportedContentScoreError;
3707
3708 fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
3709 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3710 value.try_into()
3711 }
3712}
3713
#[derive(Debug, Clone, Error)]
/// Error returned by the `TryFrom` conversions into [`ReportedContentScore`]
/// when the input is out of range.
///
/// The private unit field keeps the type from being constructed outside of
/// this module.
#[error("out of range conversion attempted")]
pub struct TryFromReportedContentScoreError(());
3719
3720#[cfg(all(test, not(target_arch = "wasm32")))]
3721mod tests {
3722 use assert_matches2::assert_matches;
3723 use matrix_sdk_base::{store::ComposerDraftType, ComposerDraft};
3724 use matrix_sdk_test::{
3725 async_test, event_factory::EventFactory, test_json, JoinedRoomBuilder, StateTestEvent,
3726 SyncResponseBuilder,
3727 };
3728 use ruma::{event_id, events::room::member::MembershipState, int, room_id, user_id};
3729 use wiremock::{
3730 matchers::{header, method, path_regex},
3731 Mock, MockServer, ResponseTemplate,
3732 };
3733
3734 use super::ReportedContentScore;
3735 use crate::{
3736 config::RequestConfig,
3737 test_utils::{client::mock_matrix_session, logged_in_client, mocks::MatrixMockServer},
3738 Client,
3739 };
3740
/// Encrypts a room event after a second client, backed by the same SQLite
/// file, has taken the cross-process crypto-store lock, and checks that the
/// encryption still succeeds on the first client.
#[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
#[async_test]
async fn test_cache_invalidation_while_encrypt() {
    use matrix_sdk_test::{message_like_event_content, DEFAULT_TEST_ROOM_ID};

    // Both clients below are built on this same database file.
    let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
    let session = mock_matrix_session();

    // First client ("client1" as cross-process lock holder name).
    let client = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&sqlite_path, None)
        .build()
        .await
        .unwrap();
    client.matrix_auth().restore_session(session.clone()).await.unwrap();

    client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

    // Feed a sync response containing member, power levels and encryption
    // state events so that the default test room exists and is encrypted.
    // NOTE(review): the client above targets http://localhost:1234 rather
    // than this mock server's URI — confirm whether the mounted mock is ever
    // hit.
    let server = MockServer::start().await;
    {
        Mock::given(method("GET"))
            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
            .and(header("authorization", "Bearer 1234"))
            .respond_with(
                ResponseTemplate::new(200)
                    .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
            )
            .mount(&server)
            .await;
        let response = SyncResponseBuilder::default()
            .add_joined_room(
                JoinedRoomBuilder::default()
                    .add_state_event(StateTestEvent::Member)
                    .add_state_event(StateTestEvent::PowerLevels)
                    .add_state_event(StateTestEvent::Encryption),
            )
            .build_sync_response();
        client.base_client().receive_sync_response(response).await.unwrap();
    }

    let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");

    // Step 1: preshare the room key with the first client.
    room.preshare_room_key().await.unwrap();

    // Step 2: a second client ("client2") on the same store takes the
    // cross-process lock; the guard is released at the end of this scope.
    {
        let client = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&sqlite_path, None)
            .build()
            .await
            .unwrap();
        client.matrix_auth().restore_session(session.clone()).await.unwrap();
        client
            .encryption()
            .enable_cross_process_store_lock("client2".to_owned())
            .await
            .unwrap();

        let guard = client.encryption().spin_lock_store(None).await.unwrap();
        assert!(guard.is_some());
    }

    // Step 3: the first client takes the lock back.
    let guard = client.encryption().spin_lock_store(None).await.unwrap();
    assert!(guard.is_some());

    // Step 4: encrypt an event with the first client's Olm machine.
    let olm = client.olm_machine().await;
    let olm = olm.as_ref().expect("Olm machine wasn't started");

    // Only success matters here; the encrypted content itself is not
    // inspected.
    let _encrypted_content = olm
        .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
        .await
        .unwrap();
}
3824
/// Exercises every way of building a `ReportedContentScore`: `new`,
/// `new_saturating` and the `TryFrom` conversions, with in-range and
/// out-of-range input.
#[test]
fn reported_content_score() {
    // `new`: in-range values are kept, out-of-range values are refused.
    for valid in [0i8, -50, -100] {
        let score = ReportedContentScore::new(valid).unwrap();
        assert_eq!(score.value(), valid);
    }
    assert_eq!(ReportedContentScore::new(10), None);
    assert_eq!(ReportedContentScore::new(-110), None);

    // `new_saturating`: in-range values are kept, others snap to a bound.
    for valid in [0i8, -50, -100] {
        let score = ReportedContentScore::new_saturating(valid);
        assert_eq!(score.value(), valid);
    }
    assert_eq!(ReportedContentScore::new_saturating(10), ReportedContentScore::MAX);
    assert_eq!(ReportedContentScore::new_saturating(-110), ReportedContentScore::MIN);

    // `TryFrom<i16>`.
    for (input, expected) in [(0i16, 0i8), (-100i16, -100i8)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for invalid in [10i16, -110i16] {
        ReportedContentScore::try_from(invalid).unwrap_err();
    }

    // `TryFrom<i32>`.
    for (input, expected) in [(0i32, 0i8), (-100i32, -100i8)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for invalid in [10i32, -110i32] {
        ReportedContentScore::try_from(invalid).unwrap_err();
    }

    // `TryFrom<i64>`.
    for (input, expected) in [(0i64, 0i8), (-100i64, -100i8)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for invalid in [10i64, -110i64] {
        ReportedContentScore::try_from(invalid).unwrap_err();
    }

    // `TryFrom<Int>`.
    for (input, expected) in [(int!(0), 0i8), (int!(-100), -100i8)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for invalid in [int!(10), int!(-110)] {
        ReportedContentScore::try_from(invalid).unwrap_err();
    }
}
3880
3881 #[async_test]
3882 async fn test_composer_draft() {
3883 use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
3884
3885 let client = logged_in_client(None).await;
3886
3887 let response = SyncResponseBuilder::default()
3888 .add_joined_room(JoinedRoomBuilder::default())
3889 .build_sync_response();
3890 client.base_client().receive_sync_response(response).await.unwrap();
3891 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
3892
3893 assert_eq!(room.load_composer_draft().await.unwrap(), None);
3894
3895 let draft = ComposerDraft {
3896 plain_text: "Hello, world!".to_owned(),
3897 html_text: Some("<strong>Hello</strong>, world!".to_owned()),
3898 draft_type: ComposerDraftType::NewMessage,
3899 };
3900 room.save_composer_draft(draft.clone()).await.unwrap();
3901 assert_eq!(room.load_composer_draft().await.unwrap(), Some(draft));
3902
3903 room.clear_composer_draft().await.unwrap();
3904 assert_eq!(room.load_composer_draft().await.unwrap(), None);
3905 }
3906
3907 #[async_test]
3908 async fn test_mark_join_requests_as_seen() {
3909 let server = MatrixMockServer::new().await;
3910 let client = server.client_builder().build().await;
3911 let event_id = event_id!("$a:b.c");
3912 let room_id = room_id!("!a:b.c");
3913 let user_id = user_id!("@alice:b.c");
3914
3915 let f = EventFactory::new().room(room_id);
3916 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f
3917 .member(user_id)
3918 .membership(MembershipState::Knock)
3919 .event_id(event_id)
3920 .into_raw_timeline()
3921 .cast()]);
3922 let room = server.sync_room(&client, joined_room_builder).await;
3923
3924 let seen_ids =
3926 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
3927 assert!(seen_ids.is_empty());
3928
3929 room.mark_knock_requests_as_seen(&[user_id.to_owned()])
3931 .await
3932 .expect("Couldn't mark join request as seen");
3933
3934 let seen_ids =
3936 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
3937 assert_eq!(seen_ids.len(), 1);
3938 assert_eq!(
3939 seen_ids.into_iter().next().expect("No next value"),
3940 (event_id.to_owned(), user_id.to_owned())
3941 )
3942 }
3943
3944 #[async_test]
3945 async fn test_own_room_membership_with_no_own_member_event() {
3946 let server = MatrixMockServer::new().await;
3947 let client = server.client_builder().build().await;
3948 let room_id = room_id!("!a:b.c");
3949
3950 let room = server.sync_joined_room(&client, room_id).await;
3951
3952 let error = room.own_membership_details().await.err();
3955 assert!(error.is_some());
3956 }
3957
3958 #[async_test]
3959 async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
3960 let server = MatrixMockServer::new().await;
3961 let client = server.client_builder().build().await;
3962 let room_id = room_id!("!a:b.c");
3963 let user_id = user_id!("@example:localhost");
3964
3965 let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
3966 let joined_room_builder = JoinedRoomBuilder::new(room_id)
3967 .add_state_bulk(vec![f.member(user_id).into_raw_sync().cast()]);
3968 let room = server.sync_room(&client, joined_room_builder).await;
3969
3970 let ret = room.own_membership_details().await;
3972 assert_matches!(ret, Ok((member, sender)));
3973
3974 assert_eq!(member.event().user_id(), user_id);
3976
3977 assert!(sender.is_none());
3979 }
3980
3981 #[async_test]
3982 async fn test_own_room_membership_with_own_member_event_and_own_sender() {
3983 let server = MatrixMockServer::new().await;
3984 let client = server.client_builder().build().await;
3985 let room_id = room_id!("!a:b.c");
3986 let user_id = user_id!("@example:localhost");
3987
3988 let f = EventFactory::new().room(room_id).sender(user_id);
3989 let joined_room_builder = JoinedRoomBuilder::new(room_id)
3990 .add_state_bulk(vec![f.member(user_id).into_raw_sync().cast()]);
3991 let room = server.sync_room(&client, joined_room_builder).await;
3992
3993 let ret = room.own_membership_details().await;
3995 assert_matches!(ret, Ok((member, sender)));
3996
3997 assert_eq!(member.event().user_id(), user_id);
3999
4000 assert!(sender.is_some());
4002 assert_eq!(sender.unwrap().event().user_id(), user_id);
4003 }
4004
4005 #[async_test]
4006 async fn test_own_room_membership_with_own_member_event_and_known_sender() {
4007 let server = MatrixMockServer::new().await;
4008 let client = server.client_builder().build().await;
4009 let room_id = room_id!("!a:b.c");
4010 let user_id = user_id!("@example:localhost");
4011 let sender_id = user_id!("@alice:b.c");
4012
4013 let f = EventFactory::new().room(room_id).sender(sender_id);
4014 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4015 f.member(user_id).into_raw_sync().cast(),
4016 f.member(sender_id).into_raw_sync().cast(),
4018 ]);
4019 let room = server.sync_room(&client, joined_room_builder).await;
4020
4021 let ret = room.own_membership_details().await;
4023 assert_matches!(ret, Ok((member, sender)));
4024
4025 assert_eq!(member.event().user_id(), user_id);
4027
4028 assert!(sender.is_some());
4030 assert_eq!(sender.unwrap().event().user_id(), sender_id);
4031 }
4032
4033 #[async_test]
4034 async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
4035 let server = MatrixMockServer::new().await;
4036 let client = server.client_builder().build().await;
4037 let room_id = room_id!("!a:b.c");
4038 let user_id = user_id!("@example:localhost");
4039 let sender_id = user_id!("@alice:b.c");
4040
4041 let f = EventFactory::new().room(room_id).sender(sender_id);
4042 let joined_room_builder = JoinedRoomBuilder::new(room_id)
4043 .add_state_bulk(vec![f.member(user_id).into_raw_sync().cast()]);
4044 let room = server.sync_room(&client, joined_room_builder).await;
4045
4046 server
4048 .mock_get_members()
4049 .ok(vec![f.member(sender_id).into_raw_timeline().cast()])
4050 .mock_once()
4051 .mount()
4052 .await;
4053
4054 let ret = room.own_membership_details().await;
4056 assert_matches!(ret, Ok((member, sender)));
4057
4058 assert_eq!(member.event().user_id(), user_id);
4060
4061 assert!(sender.is_some());
4063 assert_eq!(sender.unwrap().event().user_id(), sender_id);
4064 }
4065}