1use std::{
18 borrow::Borrow,
19 collections::{BTreeMap, HashMap},
20 future::Future,
21 ops::Deref,
22 sync::Arc,
23 time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30 future::{try_join, try_join_all},
31 stream::FuturesUnordered,
32};
33use http::StatusCode;
34#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
35pub use identity_status_changes::IdentityStatusChanges;
36#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
37use matrix_sdk_base::crypto::{IdentityStatusChange, RoomIdentityProvider, UserIdentity};
38#[cfg(feature = "e2e-encryption")]
39use matrix_sdk_base::{
40 crypto::{DecryptionSettings, RoomEventDecryptionResult},
41 deserialized_responses::EncryptionInfo,
42};
43use matrix_sdk_base::{
44 deserialized_responses::{
45 RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
46 },
47 event_cache::store::media::IgnoreMediaRetentionPolicy,
48 media::MediaThumbnailSettings,
49 store::StateStoreExt,
50 ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
51 StateChanges, StateStoreDataKey, StateStoreDataValue,
52};
53#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
54use matrix_sdk_common::BoxFuture;
55use matrix_sdk_common::{
56 deserialized_responses::TimelineEvent,
57 executor::{spawn, JoinHandle},
58 timeout::timeout,
59};
60use mime::Mime;
61use reply::Reply;
62#[cfg(feature = "e2e-encryption")]
63use ruma::events::{
64 room::encrypted::OriginalSyncRoomEncryptedEvent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
65 SyncMessageLikeEvent,
66};
67use ruma::{
68 api::client::{
69 config::{set_global_account_data, set_room_account_data},
70 context,
71 error::ErrorKind,
72 filter::LazyLoadOptions,
73 membership::{
74 ban_user, forget_room, get_member_events,
75 invite_user::{self, v3::InvitationRecipient},
76 kick_user, leave_room, unban_user, Invite3pid,
77 },
78 message::send_message_event,
79 read_marker::set_read_marker,
80 receipt::create_receipt,
81 redact::redact_event,
82 room::{get_room_event, report_content, report_room},
83 state::{get_state_events_for_key, send_state_event},
84 tag::{create_tag, delete_tag},
85 typing::create_typing_event::{self, v3::Typing},
86 },
87 assign,
88 events::{
89 beacon::BeaconEventContent,
90 beacon_info::BeaconInfoEventContent,
91 call::notify::{ApplicationType, CallNotifyEventContent, NotifyType},
92 direct::DirectEventContent,
93 marked_unread::{MarkedUnreadEventContent, UnstableMarkedUnreadEventContent},
94 receipt::{Receipt, ReceiptThread, ReceiptType},
95 room::{
96 avatar::{self, RoomAvatarEventContent},
97 encryption::RoomEncryptionEventContent,
98 history_visibility::HistoryVisibility,
99 member::{MembershipChange, SyncRoomMemberEvent},
100 message::{
101 AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
102 FormattedBody, ImageMessageEventContent, MessageType, RoomMessageEventContent,
103 UnstableAudioDetailsContentBlock, UnstableVoiceContentBlock, VideoInfo,
104 VideoMessageEventContent,
105 },
106 name::RoomNameEventContent,
107 pinned_events::RoomPinnedEventsEventContent,
108 power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent},
109 server_acl::RoomServerAclEventContent,
110 topic::RoomTopicEventContent,
111 ImageInfo, MediaSource, ThumbnailInfo,
112 },
113 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
114 tag::{TagInfo, TagName},
115 typing::SyncTypingEvent,
116 AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
117 Mentions, MessageLikeEventContent, MessageLikeEventType, OriginalSyncStateEvent,
118 RedactContent, RedactedStateEventContent, RoomAccountDataEvent,
119 RoomAccountDataEventContent, RoomAccountDataEventType, StateEventContent, StateEventType,
120 StaticEventContent, StaticStateEventContent, SyncStateEvent,
121 },
122 push::{Action, PushConditionRoomCtx},
123 serde::Raw,
124 time::Instant,
125 EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
126 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
127};
128use serde::de::DeserializeOwned;
129use thiserror::Error;
130use tokio::sync::broadcast;
131use tokio_stream::StreamExt;
132use tracing::{debug, info, instrument, warn};
133
134use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
135pub use self::{
136 member::{RoomMember, RoomMemberRole},
137 messages::{EventWithContextResponse, Messages, MessagesOptions},
138};
139#[cfg(doc)]
140use crate::event_cache::EventCache;
141use crate::{
142 attachment::{AttachmentConfig, AttachmentInfo},
143 client::WeakClient,
144 config::RequestConfig,
145 error::{BeaconError, WrongRoomState},
146 event_cache::{self, EventCacheDropHandles, RoomEventCache},
147 event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
148 live_location_share::ObservableLiveLocation,
149 media::{MediaFormat, MediaRequestParameters},
150 notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
151 room::{
152 knock_requests::{KnockRequest, KnockRequestMemberInfo},
153 power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
154 privacy_settings::RoomPrivacySettings,
155 },
156 sync::RoomUpdate,
157 utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
158 BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
159};
160#[cfg(feature = "e2e-encryption")]
161use crate::{crypto::types::events::CryptoContextInfo, encryption::backups::BackupState};
162
163pub mod edit;
164pub mod futures;
165pub mod identity_status_changes;
166pub mod knock_requests;
168mod member;
169mod messages;
170pub mod power_levels;
171pub mod reply;
172
173pub mod privacy_settings;
175
176#[derive(Debug, Clone)]
179pub struct Room {
180 inner: BaseRoom,
181 pub(crate) client: Client,
182}
183
184impl Deref for Room {
185 type Target = BaseRoom;
186
187 fn deref(&self) -> &Self::Target {
188 &self.inner
189 }
190}
191
192const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
193const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
194
195impl Room {
196 pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
203 Self { inner: room, client }
204 }
205
206 #[doc(alias = "reject_invitation")]
210 pub async fn leave(&self) -> Result<()> {
211 let state = self.state();
212 if state == RoomState::Left {
213 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
214 "Joined or Invited",
215 state,
216 ))));
217 }
218
219 let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
220 self.client.send(request).await?;
221 self.client.base_client().room_left(self.room_id()).await?;
222 Ok(())
223 }
224
225 #[doc(alias = "accept_invitation")]
229 pub async fn join(&self) -> Result<()> {
230 let state = self.state();
231 if state == RoomState::Joined {
232 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
233 "Invited or Left",
234 state,
235 ))));
236 }
237
238 let prev_room_state = self.inner.state();
239
240 let mark_as_direct = prev_room_state == RoomState::Invited
241 && self.inner.is_direct().await.unwrap_or_else(|e| {
242 warn!(room_id = ?self.room_id(), "is_direct() failed: {e}");
243 false
244 });
245
246 self.client.join_room_by_id(self.room_id()).await?;
247
248 if mark_as_direct {
249 self.set_is_direct(true).await?;
250 }
251
252 Ok(())
253 }
254
255 pub fn client(&self) -> Client {
259 self.client.clone()
260 }
261
262 pub fn is_synced(&self) -> bool {
265 self.inner.is_state_fully_synced()
266 }
267
268 pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
298 let Some(url) = self.avatar_url() else { return Ok(None) };
299 let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
300 Ok(Some(self.client.media().get_media_content(&request, true).await?))
301 }
302
303 #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
332 pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
333 let room_id = self.inner.room_id();
334 let request = options.into_request(room_id);
335 let http_response = self.client.send(request).await?;
336
337 #[allow(unused_mut)]
338 let mut response = Messages {
339 start: http_response.start,
340 end: http_response.end,
341 #[cfg(not(feature = "e2e-encryption"))]
342 chunk: http_response
343 .chunk
344 .into_iter()
345 .map(|raw| TimelineEvent::new(raw.cast()))
346 .collect(),
347 #[cfg(feature = "e2e-encryption")]
348 chunk: Vec::with_capacity(http_response.chunk.len()),
349 state: http_response.state,
350 };
351
352 #[cfg(feature = "e2e-encryption")]
353 for event in http_response.chunk {
354 let decrypted_event = if let Ok(AnySyncTimelineEvent::MessageLike(
355 AnySyncMessageLikeEvent::RoomEncrypted(SyncMessageLikeEvent::Original(_)),
356 )) = event.deserialize_as::<AnySyncTimelineEvent>()
357 {
358 if let Ok(event) = self.decrypt_event(event.cast_ref()).await {
359 event
360 } else {
361 TimelineEvent::new(event.cast())
362 }
363 } else {
364 TimelineEvent::new(event.cast())
365 };
366 response.chunk.push(decrypted_event);
367 }
368
369 if let Some(push_context) = self.push_context().await? {
370 let push_rules = self.client().account().push_rules().await?;
371
372 for event in &mut response.chunk {
373 event.push_actions =
374 Some(push_rules.get_actions(event.raw(), &push_context).to_owned());
375 }
376 }
377
378 Ok(response)
379 }
380
381 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
391 where
392 Ev: SyncEvent + DeserializeOwned + Send + 'static,
393 H: EventHandler<Ev, Ctx>,
394 {
395 self.client.add_room_event_handler(self.room_id(), handler)
396 }
397
398 pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
403 self.client.subscribe_to_room_updates(self.room_id())
404 }
405
406 pub fn subscribe_to_typing_notifications(
412 &self,
413 ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
414 let (sender, receiver) = broadcast::channel(16);
415 let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
416 let own_user_id = self.own_user_id().to_owned();
417 move |event: SyncTypingEvent| async move {
418 let typing_user_ids = event
420 .content
421 .user_ids
422 .into_iter()
423 .filter(|user_id| *user_id != own_user_id)
424 .collect();
425 let _ = sender.send(typing_user_ids);
427 }
428 });
429 let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
430 (drop_guard, receiver)
431 }
432
433 #[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
456 pub async fn subscribe_to_identity_status_changes(
457 &self,
458 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
459 IdentityStatusChanges::create_stream(self.clone()).await
460 }
461
462 async fn try_decrypt_event(&self, event: Raw<AnyTimelineEvent>) -> Result<TimelineEvent> {
468 #[cfg(feature = "e2e-encryption")]
469 if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
470 SyncMessageLikeEvent::Original(_),
471 ))) = event.deserialize_as::<AnySyncTimelineEvent>()
472 {
473 if let Ok(event) = self.decrypt_event(event.cast_ref()).await {
474 return Ok(event);
475 }
476 }
477
478 let mut event = TimelineEvent::new(event.cast());
479 event.push_actions = self.event_push_actions(event.raw()).await?;
480
481 Ok(event)
482 }
483
484 pub async fn event(
489 &self,
490 event_id: &EventId,
491 request_config: Option<RequestConfig>,
492 ) -> Result<TimelineEvent> {
493 let request =
494 get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
495
496 let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
497 let event = self.try_decrypt_event(raw_event).await?;
498
499 if let Ok((cache, _handles)) = self.event_cache().await {
501 cache.save_events([event.clone()]).await;
502 }
503
504 Ok(event)
505 }
506
507 pub async fn load_or_fetch_event(
514 &self,
515 event_id: &EventId,
516 request_config: Option<RequestConfig>,
517 ) -> Result<TimelineEvent> {
518 match self.event_cache().await {
519 Ok((event_cache, _drop_handles)) => {
520 if let Some(event) = event_cache.event(event_id).await {
521 return Ok(event);
522 }
523 }
525 Err(err) => {
526 debug!("error when getting the event cache: {err}");
527 }
528 }
529 self.event(event_id, request_config).await
530 }
531
532 pub async fn event_with_context(
535 &self,
536 event_id: &EventId,
537 lazy_load_members: bool,
538 context_size: UInt,
539 request_config: Option<RequestConfig>,
540 ) -> Result<EventWithContextResponse> {
541 let mut request =
542 context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
543
544 request.limit = context_size;
545
546 if lazy_load_members {
547 request.filter.lazy_load_options =
548 LazyLoadOptions::Enabled { include_redundant_members: false };
549 }
550
551 let response = self.client.send(request).with_request_config(request_config).await?;
552
553 let target_event = if let Some(event) = response.event {
554 Some(self.try_decrypt_event(event).await?)
555 } else {
556 None
557 };
558
559 let (events_before, events_after) = try_join(
563 try_join_all(response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev))),
564 try_join_all(response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev))),
565 )
566 .await?;
567
568 if let Ok((cache, _handles)) = self.event_cache().await {
570 let mut events_to_save: Vec<TimelineEvent> = Vec::new();
571 if let Some(event) = &target_event {
572 events_to_save.push(event.clone());
573 }
574
575 for event in &events_before {
576 events_to_save.push(event.clone());
577 }
578
579 for event in &events_after {
580 events_to_save.push(event.clone());
581 }
582
583 cache.save_events(events_to_save).await;
584 }
585
586 Ok(EventWithContextResponse {
587 event: target_event,
588 events_before,
589 events_after,
590 state: response.state,
591 prev_batch_token: response.start,
592 next_batch_token: response.end,
593 })
594 }
595
596 pub(crate) async fn request_members(&self) -> Result<()> {
597 self.client
598 .locks()
599 .members_request_deduplicated_handler
600 .run(self.room_id().to_owned(), async move {
601 let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
602 let response = self
603 .client
604 .send(request.clone())
605 .with_request_config(
606 RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
609 )
610 .await?;
611
612 Box::pin(self.client.base_client().receive_all_members(
614 self.room_id(),
615 &request,
616 &response,
617 ))
618 .await?;
619
620 Ok(())
621 })
622 .await
623 }
624
625 pub async fn request_encryption_state(&self) -> Result<()> {
630 if !self.inner.encryption_state().is_unknown() {
631 return Ok(());
632 }
633
634 self.client
635 .locks()
636 .encryption_state_deduplicated_handler
637 .run(self.room_id().to_owned(), async move {
638 let request = get_state_events_for_key::v3::Request::new(
640 self.room_id().to_owned(),
641 StateEventType::RoomEncryption,
642 "".to_owned(),
643 );
644 let response = match self.client.send(request).await {
645 Ok(response) => {
646 Some(response.content.deserialize_as::<RoomEncryptionEventContent>()?)
647 }
648 Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
649 Err(err) => return Err(err.into()),
650 };
651
652 let _sync_lock = self.client.base_client().sync_lock().lock().await;
653
654 let mut room_info = self.clone_info();
657 room_info.mark_encryption_state_synced();
658 room_info.set_encryption_event(response.clone());
659 let mut changes = StateChanges::default();
660 changes.add_room(room_info.clone());
661
662 self.client.state_store().save_changes(&changes).await?;
663 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
664
665 Ok(())
666 })
667 .await
668 }
669
670 pub fn encryption_state(&self) -> EncryptionState {
675 self.inner.encryption_state()
676 }
677
678 pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
684 self.request_encryption_state().await?;
685
686 Ok(self.encryption_state())
687 }
688
689 #[cfg(feature = "e2e-encryption")]
691 pub async fn crypto_context_info(&self) -> CryptoContextInfo {
692 let encryption = self.client.encryption();
693
694 let this_device_is_verified = match encryption.get_own_device().await {
695 Ok(Some(device)) => device.is_verified_with_cross_signing(),
696
697 _ => true,
699 };
700
701 let backup_exists_on_server =
702 encryption.backups().exists_on_server().await.unwrap_or(false);
703
704 CryptoContextInfo {
705 device_creation_ts: encryption.device_creation_timestamp().await,
706 this_device_is_verified,
707 is_backup_configured: encryption.backups().state() == BackupState::Enabled,
708 backup_exists_on_server,
709 }
710 }
711
712 fn are_events_visible(&self) -> bool {
713 if let RoomState::Invited = self.inner.state() {
714 return matches!(
715 self.inner.history_visibility_or_default(),
716 HistoryVisibility::WorldReadable | HistoryVisibility::Invited
717 );
718 }
719
720 true
721 }
722
723 pub async fn sync_members(&self) -> Result<()> {
729 if !self.are_events_visible() {
730 return Ok(());
731 }
732
733 if !self.are_members_synced() {
734 self.request_members().await
735 } else {
736 Ok(())
737 }
738 }
739
740 pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
754 self.sync_members().await?;
755 self.get_member_no_sync(user_id).await
756 }
757
758 pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
772 Ok(self
773 .inner
774 .get_member(user_id)
775 .await?
776 .map(|member| RoomMember::new(self.client.clone(), member)))
777 }
778
779 pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
788 self.sync_members().await?;
789 self.members_no_sync(memberships).await
790 }
791
792 pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
801 Ok(self
802 .inner
803 .members(memberships)
804 .await?
805 .into_iter()
806 .map(|member| RoomMember::new(self.client.clone(), member))
807 .collect())
808 }
809
810 pub async fn get_state_events(
812 &self,
813 event_type: StateEventType,
814 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
815 self.client
816 .state_store()
817 .get_state_events(self.room_id(), event_type)
818 .await
819 .map_err(Into::into)
820 }
821
822 pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
839 where
840 C: StaticEventContent + StaticStateEventContent + RedactContent,
841 C::Redacted: RedactedStateEventContent,
842 {
843 Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
844 }
845
846 pub async fn get_state_events_for_keys(
849 &self,
850 event_type: StateEventType,
851 state_keys: &[&str],
852 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
853 self.client
854 .state_store()
855 .get_state_events_for_keys(self.room_id(), event_type, state_keys)
856 .await
857 .map_err(Into::into)
858 }
859
860 pub async fn get_state_events_for_keys_static<'a, C, K, I>(
880 &self,
881 state_keys: I,
882 ) -> Result<Vec<RawSyncOrStrippedState<C>>>
883 where
884 C: StaticEventContent + StaticStateEventContent + RedactContent,
885 C::StateKey: Borrow<K>,
886 C::Redacted: RedactedStateEventContent,
887 K: AsRef<str> + Sized + Sync + 'a,
888 I: IntoIterator<Item = &'a K> + Send,
889 I::IntoIter: Send,
890 {
891 Ok(self
892 .client
893 .state_store()
894 .get_state_events_for_keys_static(self.room_id(), state_keys)
895 .await?)
896 }
897
898 pub async fn get_state_event(
900 &self,
901 event_type: StateEventType,
902 state_key: &str,
903 ) -> Result<Option<RawAnySyncOrStrippedState>> {
904 self.client
905 .state_store()
906 .get_state_event(self.room_id(), event_type, state_key)
907 .await
908 .map_err(Into::into)
909 }
910
911 pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
930 where
931 C: StaticEventContent + StaticStateEventContent<StateKey = EmptyStateKey> + RedactContent,
932 C::Redacted: RedactedStateEventContent,
933 {
934 self.get_state_event_static_for_key(&EmptyStateKey).await
935 }
936
937 pub async fn get_state_event_static_for_key<C, K>(
957 &self,
958 state_key: &K,
959 ) -> Result<Option<RawSyncOrStrippedState<C>>>
960 where
961 C: StaticEventContent + StaticStateEventContent + RedactContent,
962 C::StateKey: Borrow<K>,
963 C::Redacted: RedactedStateEventContent,
964 K: AsRef<str> + ?Sized + Sync,
965 {
966 Ok(self
967 .client
968 .state_store()
969 .get_state_event_static_for_key(self.room_id(), state_key)
970 .await?)
971 }
972
973 pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
977 Ok(self
982 .get_state_events_static::<SpaceParentEventContent>()
983 .await?
984 .into_iter()
985 .flat_map(|parent_event| match parent_event.deserialize() {
987 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
988 Some((e.state_key.to_owned(), e.sender))
989 }
990 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
991 Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
992 Err(e) => {
993 info!(room_id = ?self.room_id(), "Could not deserialize m.room.parent: {e}");
994 None
995 }
996 })
997 .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
999 let Some(parent_room) = self.client.get_room(&state_key) else {
1000 return Ok(ParentSpace::Unverifiable(state_key));
1003 };
1004 if let Some(child_event) = parent_room
1007 .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1008 .await?
1009 {
1010 match child_event.deserialize() {
1011 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1012 return Ok(ParentSpace::Reciprocal(parent_room));
1015 }
1016 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1017 Ok(SyncOrStrippedState::Stripped(_)) => {}
1018 Err(e) => {
1019 info!(
1020 room_id = ?self.room_id(), parent_room_id = ?state_key,
1021 "Could not deserialize m.room.child: {e}"
1022 );
1023 }
1024 }
1025 }
1030
1031 let Some(member) = parent_room.get_member(&sender).await? else {
1034 return Ok(ParentSpace::Illegitimate(parent_room));
1036 };
1037
1038 if member.can_send_state(StateEventType::SpaceChild) {
1039 Ok(ParentSpace::WithPowerlevel(parent_room))
1041 } else {
1042 Ok(ParentSpace::Illegitimate(parent_room))
1043 }
1044 })
1045 .collect::<FuturesUnordered<_>>())
1046 }
1047
1048 pub async fn account_data(
1050 &self,
1051 data_type: RoomAccountDataEventType,
1052 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1053 self.client
1054 .state_store()
1055 .get_room_account_data_event(self.room_id(), data_type)
1056 .await
1057 .map_err(Into::into)
1058 }
1059
1060 pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1079 where
1080 C: StaticEventContent + RoomAccountDataEventContent,
1081 {
1082 Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast))
1083 }
1084
1085 #[cfg(feature = "e2e-encryption")]
1090 pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1091 let user_ids = self
1092 .client
1093 .state_store()
1094 .get_user_ids(self.room_id(), RoomMemberships::empty())
1095 .await?;
1096
1097 for user_id in user_ids {
1098 let devices = self.client.encryption().get_user_devices(&user_id).await?;
1099 let any_unverified = devices.devices().any(|d| !d.is_verified());
1100
1101 if any_unverified {
1102 return Ok(false);
1103 }
1104 }
1105
1106 Ok(true)
1107 }
1108
1109 pub async fn set_account_data<T>(
1124 &self,
1125 content: T,
1126 ) -> Result<set_room_account_data::v3::Response>
1127 where
1128 T: RoomAccountDataEventContent,
1129 {
1130 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1131
1132 let request = set_room_account_data::v3::Request::new(
1133 own_user.to_owned(),
1134 self.room_id().to_owned(),
1135 &content,
1136 )?;
1137
1138 Ok(self.client.send(request).await?)
1139 }
1140
1141 pub async fn set_account_data_raw(
1166 &self,
1167 event_type: RoomAccountDataEventType,
1168 content: Raw<AnyRoomAccountDataEventContent>,
1169 ) -> Result<set_room_account_data::v3::Response> {
1170 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1171
1172 let request = set_room_account_data::v3::Request::new_raw(
1173 own_user.to_owned(),
1174 self.room_id().to_owned(),
1175 event_type,
1176 content,
1177 );
1178
1179 Ok(self.client.send(request).await?)
1180 }
1181
1182 pub async fn set_tag(
1213 &self,
1214 tag: TagName,
1215 tag_info: TagInfo,
1216 ) -> Result<create_tag::v3::Response> {
1217 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1218 let request = create_tag::v3::Request::new(
1219 user_id.to_owned(),
1220 self.inner.room_id().to_owned(),
1221 tag.to_string(),
1222 tag_info,
1223 );
1224 Ok(self.client.send(request).await?)
1225 }
1226
1227 pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1234 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1235 let request = delete_tag::v3::Request::new(
1236 user_id.to_owned(),
1237 self.inner.room_id().to_owned(),
1238 tag.to_string(),
1239 );
1240 Ok(self.client.send(request).await?)
1241 }
1242
1243 pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1253 if is_favourite {
1254 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1255
1256 self.set_tag(TagName::Favorite, tag_info).await?;
1257
1258 if self.is_low_priority() {
1259 self.remove_tag(TagName::LowPriority).await?;
1260 }
1261 } else {
1262 self.remove_tag(TagName::Favorite).await?;
1263 }
1264 Ok(())
1265 }
1266
1267 pub async fn set_is_low_priority(
1277 &self,
1278 is_low_priority: bool,
1279 tag_order: Option<f64>,
1280 ) -> Result<()> {
1281 if is_low_priority {
1282 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1283
1284 self.set_tag(TagName::LowPriority, tag_info).await?;
1285
1286 if self.is_favourite() {
1287 self.remove_tag(TagName::Favorite).await?;
1288 }
1289 } else {
1290 self.remove_tag(TagName::LowPriority).await?;
1291 }
1292 Ok(())
1293 }
1294
1295 pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1304 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1305
1306 let mut content = self
1307 .client
1308 .account()
1309 .account_data::<DirectEventContent>()
1310 .await?
1311 .map(|c| c.deserialize())
1312 .transpose()?
1313 .unwrap_or_default();
1314
1315 let this_room_id = self.inner.room_id();
1316
1317 if is_direct {
1318 let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1319 room_members.retain(|member| member.user_id() != self.own_user_id());
1320
1321 for member in room_members {
1322 let entry = content.entry(member.user_id().into()).or_default();
1323 if !entry.iter().any(|room_id| room_id == this_room_id) {
1324 entry.push(this_room_id.to_owned());
1325 }
1326 }
1327 } else {
1328 for (_, list) in content.iter_mut() {
1329 list.retain(|room_id| *room_id != this_room_id);
1330 }
1331
1332 content.retain(|_, list| !list.is_empty());
1334 }
1335
1336 let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1337
1338 self.client.send(request).await?;
1339 Ok(())
1340 }
1341
1342 #[cfg(feature = "e2e-encryption")]
1350 pub async fn decrypt_event(
1351 &self,
1352 event: &Raw<OriginalSyncRoomEncryptedEvent>,
1353 ) -> Result<TimelineEvent> {
1354 let machine = self.client.olm_machine().await;
1355 let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1356
1357 let decryption_settings = DecryptionSettings {
1358 sender_device_trust_requirement: self.client.base_client().decryption_trust_requirement,
1359 };
1360 let mut event: TimelineEvent = match machine
1361 .try_decrypt_room_event(event.cast_ref(), self.inner.room_id(), &decryption_settings)
1362 .await?
1363 {
1364 RoomEventDecryptionResult::Decrypted(decrypted) => decrypted.into(),
1365 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1366 self.client
1367 .encryption()
1368 .backups()
1369 .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1370 TimelineEvent::new_utd_event(event.clone().cast(), utd_info)
1371 }
1372 };
1373
1374 event.push_actions = self.event_push_actions(event.raw()).await?;
1375 Ok(event)
1376 }
1377
1378 #[cfg(feature = "e2e-encryption")]
1385 pub async fn get_encryption_info(
1386 &self,
1387 session_id: &str,
1388 sender: &UserId,
1389 ) -> Option<EncryptionInfo> {
1390 let machine = self.client.olm_machine().await;
1391 let machine = machine.as_ref()?;
1392 machine.get_session_encryption_info(self.room_id(), session_id, sender).await.ok()
1393 }
1394
1395 #[cfg(feature = "e2e-encryption")]
1408 pub async fn discard_room_key(&self) -> Result<()> {
1409 let machine = self.client.olm_machine().await;
1410 if let Some(machine) = machine.as_ref() {
1411 machine.discard_room_key(self.inner.room_id()).await?;
1412 Ok(())
1413 } else {
1414 Err(Error::NoOlmMachine)
1415 }
1416 }
1417
1418 #[instrument(skip_all)]
1426 pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1427 let request = assign!(
1428 ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1429 { reason: reason.map(ToOwned::to_owned) }
1430 );
1431 self.client.send(request).await?;
1432 Ok(())
1433 }
1434
1435 #[instrument(skip_all)]
1443 pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1444 let request = assign!(
1445 unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1446 { reason: reason.map(ToOwned::to_owned) }
1447 );
1448 self.client.send(request).await?;
1449 Ok(())
1450 }
1451
1452 #[instrument(skip_all)]
1461 pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1462 let request = assign!(
1463 kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1464 { reason: reason.map(ToOwned::to_owned) }
1465 );
1466 self.client.send(request).await?;
1467 Ok(())
1468 }
1469
1470 #[instrument(skip_all)]
1476 pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1477 let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
1478 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1479 self.client.send(request).await?;
1480
1481 self.mark_members_missing();
1485
1486 Ok(())
1487 }
1488
1489 #[instrument(skip_all)]
1495 pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
1496 let recipient = InvitationRecipient::ThirdPartyId(invite_id);
1497 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1498 self.client.send(request).await?;
1499
1500 self.mark_members_missing();
1504
1505 Ok(())
1506 }
1507
1508 pub async fn typing_notice(&self, typing: bool) -> Result<()> {
1543 self.ensure_room_joined()?;
1544
1545 let send = if let Some(typing_time) =
1548 self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
1549 {
1550 if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
1551 typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
1555 } else {
1556 !typing
1558 }
1559 } else {
1560 typing
1563 };
1564
1565 if send {
1566 self.send_typing_notice(typing).await?;
1567 }
1568
1569 Ok(())
1570 }
1571
1572 #[instrument(name = "typing_notice", skip(self))]
1573 async fn send_typing_notice(&self, typing: bool) -> Result<()> {
1574 let typing = if typing {
1575 self.client
1576 .inner
1577 .typing_notice_times
1578 .write()
1579 .unwrap()
1580 .insert(self.room_id().to_owned(), Instant::now());
1581 Typing::Yes(TYPING_NOTICE_TIMEOUT)
1582 } else {
1583 self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
1584 Typing::No
1585 };
1586
1587 let request = create_typing_event::v3::Request::new(
1588 self.own_user_id().to_owned(),
1589 self.room_id().to_owned(),
1590 typing,
1591 );
1592
1593 self.client.send(request).await?;
1594
1595 Ok(())
1596 }
1597
1598 #[instrument(skip_all)]
1612 pub async fn send_single_receipt(
1613 &self,
1614 receipt_type: create_receipt::v3::ReceiptType,
1615 thread: ReceiptThread,
1616 event_id: OwnedEventId,
1617 ) -> Result<()> {
1618 let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
1621
1622 self.client
1623 .inner
1624 .locks
1625 .read_receipt_deduplicated_handler
1626 .run((request_key, event_id.clone()), async {
1627 let mut request = create_receipt::v3::Request::new(
1628 self.room_id().to_owned(),
1629 receipt_type,
1630 event_id,
1631 );
1632 request.thread = thread;
1633
1634 self.client.send(request).await?;
1635 Ok(())
1636 })
1637 .await
1638 }
1639
1640 #[instrument(skip_all)]
1648 pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
1649 if receipts.is_empty() {
1650 return Ok(());
1651 }
1652
1653 let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
1654 let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
1655 fully_read,
1656 read_receipt: public_read_receipt,
1657 private_read_receipt,
1658 });
1659
1660 self.client.send(request).await?;
1661 Ok(())
1662 }
1663
1664 #[instrument(skip_all)]
1696 pub async fn enable_encryption(&self) -> Result<()> {
1697 use ruma::{
1698 events::room::encryption::RoomEncryptionEventContent, EventEncryptionAlgorithm,
1699 };
1700 const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
1701
1702 if !self.latest_encryption_state().await?.is_encrypted() {
1703 let content =
1704 RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
1705 self.send_state_event(content).await?;
1706
1707 _ = timeout(self.client.inner.sync_beat.listen(), SYNC_WAIT_TIME).await;
1711
1712 let _sync_lock = self.client.base_client().sync_lock().lock().await;
1717 if !self.inner.encryption_state().is_encrypted() {
1718 debug!("still not marked as encrypted, marking encryption state as missing");
1719
1720 let mut room_info = self.clone_info();
1721 room_info.mark_encryption_state_missing();
1722 let mut changes = StateChanges::default();
1723 changes.add_room(room_info.clone());
1724
1725 self.client.state_store().save_changes(&changes).await?;
1726 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
1727 } else {
1728 debug!("room successfully marked as encrypted");
1729 }
1730 }
1731
1732 Ok(())
1733 }
1734
1735 #[cfg(feature = "e2e-encryption")]
1744 #[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
1745 async fn preshare_room_key(&self) -> Result<()> {
1746 self.ensure_room_joined()?;
1747
1748 let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
1750 tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));
1751
1752 self.client
1753 .locks()
1754 .group_session_deduplicated_handler
1755 .run(self.room_id().to_owned(), async move {
1756 {
1757 let members = self
1758 .client
1759 .state_store()
1760 .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
1761 .await?;
1762 self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
1763 };
1764
1765 let response = self.share_room_key().await;
1766
1767 if let Err(r) = response {
1771 let machine = self.client.olm_machine().await;
1772 if let Some(machine) = machine.as_ref() {
1773 machine.discard_room_key(self.room_id()).await?;
1774 }
1775 return Err(r);
1776 }
1777
1778 Ok(())
1779 })
1780 .await
1781 }
1782
1783 #[cfg(feature = "e2e-encryption")]
1789 #[instrument(skip_all)]
1790 async fn share_room_key(&self) -> Result<()> {
1791 self.ensure_room_joined()?;
1792
1793 let requests = self.client.base_client().share_room_key(self.room_id()).await?;
1794
1795 for request in requests {
1796 let response = self.client.send_to_device(&request).await?;
1797 self.client.mark_request_as_sent(&request.txn_id, &response).await?;
1798 }
1799
1800 Ok(())
1801 }
1802
1803 #[instrument(skip_all)]
1812 pub async fn sync_up(&self) {
1813 while !self.is_synced() && self.state() == RoomState::Joined {
1814 let wait_for_beat = self.client.inner.sync_beat.listen();
1815 let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
1817 }
1818 }
1819
1820 pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
1890 SendMessageLikeEvent::new(self, content)
1891 }
1892
1893 #[cfg(feature = "e2e-encryption")]
1895 async fn query_keys_for_untracked_users(&self) -> Result<()> {
1896 let olm = self.client.olm_machine().await;
1897 let olm = olm.as_ref().expect("Olm machine wasn't started");
1898
1899 let members =
1900 self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;
1901
1902 let tracked: HashMap<_, _> = olm
1903 .store()
1904 .load_tracked_users()
1905 .await?
1906 .into_iter()
1907 .map(|tracked| (tracked.user_id, tracked.dirty))
1908 .collect();
1909
1910 let members_with_unknown_devices =
1913 members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));
1914
1915 let (req_id, request) =
1916 olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));
1917
1918 if !request.device_keys.is_empty() {
1919 self.client.keys_query(&req_id, request.device_keys).await?;
1920 }
1921
1922 Ok(())
1923 }
1924
1925 #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
1969 pub fn send_raw<'a>(
1970 &'a self,
1971 event_type: &'a str,
1972 content: impl IntoRawMessageLikeEventContent,
1973 ) -> SendRawMessageLikeEvent<'a> {
1974 SendRawMessageLikeEvent::new(self, event_type, content)
1977 }
1978
1979 #[instrument(skip_all)]
2027 pub fn send_attachment<'a>(
2028 &'a self,
2029 filename: impl Into<String>,
2030 content_type: &'a Mime,
2031 data: Vec<u8>,
2032 config: AttachmentConfig,
2033 ) -> SendAttachment<'a> {
2034 SendAttachment::new(self, filename.into(), content_type, data, config)
2035 }
2036
2037 #[instrument(skip_all)]
2065 pub(super) async fn prepare_and_send_attachment<'a>(
2066 &'a self,
2067 filename: String,
2068 content_type: &'a Mime,
2069 data: Vec<u8>,
2070 mut config: AttachmentConfig,
2071 send_progress: SharedObservable<TransmissionProgress>,
2072 store_in_cache: bool,
2073 ) -> Result<send_message_event::v3::Response> {
2074 self.ensure_room_joined()?;
2075
2076 let txn_id = config.txn_id.take();
2077 let mentions = config.mentions.take();
2078
2079 let thumbnail = config.thumbnail.take();
2080
2081 let thumbnail_cache_info = if store_in_cache {
2083 thumbnail
2084 .as_ref()
2085 .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2086 } else {
2087 None
2088 };
2089
2090 #[cfg(feature = "e2e-encryption")]
2091 let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2092 self.client
2093 .upload_encrypted_media_and_thumbnail(content_type, &data, thumbnail, send_progress)
2094 .await?
2095 } else {
2096 self.client
2097 .media()
2098 .upload_plain_media_and_thumbnail(
2099 content_type,
2100 data.clone(),
2103 thumbnail,
2104 send_progress,
2105 )
2106 .await?
2107 };
2108
2109 #[cfg(not(feature = "e2e-encryption"))]
2110 let (media_source, thumbnail) = self
2111 .client
2112 .media()
2113 .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2114 .await?;
2115
2116 if store_in_cache {
2117 let cache_store_lock_guard = self.client.event_cache_store().lock().await?;
2118
2119 debug!("caching the media");
2123 let request =
2124 MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2125
2126 if let Err(err) = cache_store_lock_guard
2127 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2128 .await
2129 {
2130 warn!("unable to cache the media after uploading it: {err}");
2131 }
2132
2133 if let Some(((data, height, width), source)) =
2134 thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2135 {
2136 debug!("caching the thumbnail");
2137
2138 let request = MediaRequestParameters {
2139 source: source.clone(),
2140 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2141 };
2142
2143 if let Err(err) = cache_store_lock_guard
2144 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2145 .await
2146 {
2147 warn!("unable to cache the media after uploading it: {err}");
2148 }
2149 }
2150 }
2151
2152 let content = self
2153 .make_attachment_event(
2154 self.make_attachment_type(
2155 content_type,
2156 filename,
2157 media_source,
2158 config.caption,
2159 config.formatted_caption,
2160 config.info,
2161 thumbnail,
2162 ),
2163 mentions,
2164 config.reply,
2165 )
2166 .await?;
2167
2168 let mut fut = self.send(content);
2169 if let Some(txn_id) = txn_id {
2170 fut = fut.with_transaction_id(txn_id);
2171 }
2172 fut.await
2173 }
2174
2175 #[allow(clippy::too_many_arguments)]
2178 pub(crate) fn make_attachment_type(
2179 &self,
2180 content_type: &Mime,
2181 filename: String,
2182 source: MediaSource,
2183 caption: Option<String>,
2184 formatted_caption: Option<FormattedBody>,
2185 info: Option<AttachmentInfo>,
2186 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2187 ) -> MessageType {
2188 let (body, filename) = match caption {
2192 Some(caption) => (caption, Some(filename)),
2193 None => (filename, None),
2194 };
2195
2196 let (thumbnail_source, thumbnail_info) = thumbnail.unzip();
2197
2198 match content_type.type_() {
2199 mime::IMAGE => {
2200 let info = assign!(info.map(ImageInfo::from).unwrap_or_default(), {
2201 mimetype: Some(content_type.as_ref().to_owned()),
2202 thumbnail_source,
2203 thumbnail_info
2204 });
2205 let content = assign!(ImageMessageEventContent::new(body, source), {
2206 info: Some(Box::new(info)),
2207 formatted: formatted_caption,
2208 filename
2209 });
2210 MessageType::Image(content)
2211 }
2212
2213 mime::AUDIO => {
2214 let mut content = assign!(AudioMessageEventContent::new(body, source), {
2215 formatted: formatted_caption,
2216 filename
2217 });
2218
2219 if let Some(AttachmentInfo::Voice { audio_info, waveform: Some(waveform_vec) }) =
2220 &info
2221 {
2222 if let Some(duration) = audio_info.duration {
2223 let waveform = waveform_vec.iter().map(|v| (*v).into()).collect();
2224 content.audio =
2225 Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
2226 }
2227 content.voice = Some(UnstableVoiceContentBlock::new());
2228 }
2229
2230 let mut audio_info = info.map(AudioInfo::from).unwrap_or_default();
2231 audio_info.mimetype = Some(content_type.as_ref().to_owned());
2232 let content = content.info(Box::new(audio_info));
2233
2234 MessageType::Audio(content)
2235 }
2236
2237 mime::VIDEO => {
2238 let info = assign!(info.map(VideoInfo::from).unwrap_or_default(), {
2239 mimetype: Some(content_type.as_ref().to_owned()),
2240 thumbnail_source,
2241 thumbnail_info
2242 });
2243 let content = assign!(VideoMessageEventContent::new(body, source), {
2244 info: Some(Box::new(info)),
2245 formatted: formatted_caption,
2246 filename
2247 });
2248 MessageType::Video(content)
2249 }
2250
2251 _ => {
2252 let info = assign!(info.map(FileInfo::from).unwrap_or_default(), {
2253 mimetype: Some(content_type.as_ref().to_owned()),
2254 thumbnail_source,
2255 thumbnail_info
2256 });
2257 let content = assign!(FileMessageEventContent::new(body, source), {
2258 info: Some(Box::new(info)),
2259 formatted: formatted_caption,
2260 filename,
2261 });
2262 MessageType::File(content)
2263 }
2264 }
2265 }
2266
2267 pub(crate) async fn make_attachment_event(
2270 &self,
2271 msg_type: MessageType,
2272 mentions: Option<Mentions>,
2273 reply: Option<Reply>,
2274 ) -> Result<RoomMessageEventContent> {
2275 let mut content = RoomMessageEventContent::new(msg_type);
2276 if let Some(mentions) = mentions {
2277 content = content.add_mentions(mentions);
2278 }
2279 if let Some(reply) = reply {
2280 content = self.make_reply_event(content.into(), reply).await?;
2283 }
2284 Ok(content)
2285 }
2286
2287 pub async fn update_power_levels(
2296 &self,
2297 updates: Vec<(&UserId, Int)>,
2298 ) -> Result<send_state_event::v3::Response> {
2299 let mut power_levels = self.power_levels().await?;
2300
2301 for (user_id, new_level) in updates {
2302 if new_level == power_levels.users_default {
2303 power_levels.users.remove(user_id);
2304 } else {
2305 power_levels.users.insert(user_id.to_owned(), new_level);
2306 }
2307 }
2308
2309 self.send_state_event(RoomPowerLevelsEventContent::from(power_levels)).await
2310 }
2311
2312 pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2317 let mut power_levels = self.power_levels().await?;
2318 power_levels.apply(changes)?;
2319 self.send_state_event(RoomPowerLevelsEventContent::from(power_levels)).await?;
2320 Ok(())
2321 }
2322
2323 pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2327 let default_power_levels = RoomPowerLevels::from(RoomPowerLevelsEventContent::new());
2328 let changes = RoomPowerLevelChanges::from(default_power_levels);
2329 self.apply_power_level_changes(changes).await?;
2330 Ok(self.power_levels().await?)
2331 }
2332
2333 pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2338 let power_level = self.get_user_power_level(user_id).await?;
2339 Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2340 }
2341
2342 pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<i64> {
2347 let event = self.power_levels().await?;
2348 Ok(event.for_user(user_id).into())
2349 }
2350
2351 pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2354 let power_levels = self.power_levels().await.ok();
2355 let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2356 if let Some(power_levels) = power_levels {
2357 for (id, level) in power_levels.users.into_iter() {
2358 user_power_levels.insert(id, level.into());
2359 }
2360 }
2361 user_power_levels
2362 }
2363
2364 pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2366 self.send_state_event(RoomNameEventContent::new(name)).await
2367 }
2368
2369 pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2371 self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2372 }
2373
2374 pub async fn set_avatar_url(
2380 &self,
2381 url: &MxcUri,
2382 info: Option<avatar::ImageInfo>,
2383 ) -> Result<send_state_event::v3::Response> {
2384 self.ensure_room_joined()?;
2385
2386 let mut room_avatar_event = RoomAvatarEventContent::new();
2387 room_avatar_event.url = Some(url.to_owned());
2388 room_avatar_event.info = info.map(Box::new);
2389
2390 self.send_state_event(room_avatar_event).await
2391 }
2392
2393 pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2395 self.send_state_event(RoomAvatarEventContent::new()).await
2396 }
2397
2398 pub async fn upload_avatar(
2406 &self,
2407 mime: &Mime,
2408 data: Vec<u8>,
2409 info: Option<avatar::ImageInfo>,
2410 ) -> Result<send_state_event::v3::Response> {
2411 self.ensure_room_joined()?;
2412
2413 let upload_response = self.client.media().upload(mime, data, None).await?;
2414 let mut info = info.unwrap_or_default();
2415 info.blurhash = upload_response.blurhash;
2416 info.mimetype = Some(mime.to_string());
2417
2418 self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2419 }
2420
2421 #[instrument(skip_all)]
2465 pub async fn send_state_event(
2466 &self,
2467 content: impl StateEventContent<StateKey = EmptyStateKey>,
2468 ) -> Result<send_state_event::v3::Response> {
2469 self.send_state_event_for_key(&EmptyStateKey, content).await
2470 }
2471
2472 pub async fn send_state_event_for_key<C, K>(
2513 &self,
2514 state_key: &K,
2515 content: C,
2516 ) -> Result<send_state_event::v3::Response>
2517 where
2518 C: StateEventContent,
2519 C::StateKey: Borrow<K>,
2520 K: AsRef<str> + ?Sized,
2521 {
2522 self.ensure_room_joined()?;
2523 let request =
2524 send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
2525 let response = self.client.send(request).await?;
2526 Ok(response)
2527 }
2528
2529 #[instrument(skip_all)]
2564 pub async fn send_state_event_raw(
2565 &self,
2566 event_type: &str,
2567 state_key: &str,
2568 content: impl IntoRawStateEventContent,
2569 ) -> Result<send_state_event::v3::Response> {
2570 self.ensure_room_joined()?;
2571
2572 let request = send_state_event::v3::Request::new_raw(
2573 self.room_id().to_owned(),
2574 event_type.into(),
2575 state_key.to_owned(),
2576 content.into_raw_state_event_content(),
2577 );
2578
2579 Ok(self.client.send(request).await?)
2580 }
2581
2582 #[instrument(skip_all)]
2617 pub async fn redact(
2618 &self,
2619 event_id: &EventId,
2620 reason: Option<&str>,
2621 txn_id: Option<OwnedTransactionId>,
2622 ) -> HttpResult<redact_event::v3::Response> {
2623 let txn_id = txn_id.unwrap_or_else(TransactionId::new);
2624 let request = assign!(
2625 redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
2626 { reason: reason.map(ToOwned::to_owned) }
2627 );
2628
2629 self.client.send(request).await
2630 }
2631
2632 pub async fn can_user_redact_own(&self, user_id: &UserId) -> Result<bool> {
2637 Ok(self.power_levels().await?.user_can_redact_own_event(user_id))
2638 }
2639
2640 pub async fn can_user_redact_other(&self, user_id: &UserId) -> Result<bool> {
2645 Ok(self.power_levels().await?.user_can_redact_event_of_other(user_id))
2646 }
2647
2648 pub async fn can_user_ban(&self, user_id: &UserId) -> Result<bool> {
2653 Ok(self.power_levels().await?.user_can_ban(user_id))
2654 }
2655
2656 pub async fn can_user_invite(&self, user_id: &UserId) -> Result<bool> {
2661 Ok(self.power_levels().await?.user_can_invite(user_id))
2662 }
2663
2664 pub async fn can_user_kick(&self, user_id: &UserId) -> Result<bool> {
2669 Ok(self.power_levels().await?.user_can_kick(user_id))
2670 }
2671
2672 pub async fn can_user_send_state(
2677 &self,
2678 user_id: &UserId,
2679 state_event: StateEventType,
2680 ) -> Result<bool> {
2681 Ok(self.power_levels().await?.user_can_send_state(user_id, state_event))
2682 }
2683
2684 pub async fn can_user_send_message(
2689 &self,
2690 user_id: &UserId,
2691 message: MessageLikeEventType,
2692 ) -> Result<bool> {
2693 Ok(self.power_levels().await?.user_can_send_message(user_id, message))
2694 }
2695
2696 pub async fn can_user_pin_unpin(&self, user_id: &UserId) -> Result<bool> {
2701 Ok(self
2702 .power_levels()
2703 .await?
2704 .user_can_send_state(user_id, StateEventType::RoomPinnedEvents))
2705 }
2706
2707 pub async fn can_user_trigger_room_notification(&self, user_id: &UserId) -> Result<bool> {
2712 Ok(self.power_levels().await?.user_can_trigger_room_notification(user_id))
2713 }
2714
2715 pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
2724 let acl_ev = self
2725 .get_state_event_static::<RoomServerAclEventContent>()
2726 .await?
2727 .and_then(|ev| ev.deserialize().ok());
2728 let acl = acl_ev.as_ref().and_then(|ev| match ev {
2729 SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
2730 SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
2731 });
2732
2733 let members: Vec<_> = self
2737 .members_no_sync(RoomMemberships::JOIN)
2738 .await?
2739 .into_iter()
2740 .filter(|member| {
2741 let server = member.user_id().server_name();
2742 acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
2743 })
2744 .collect();
2745
2746 let max = members
2749 .iter()
2750 .max_by_key(|member| member.power_level())
2751 .filter(|max| max.power_level() >= 50)
2752 .map(|member| member.user_id().server_name());
2753
2754 let servers = members
2756 .iter()
2757 .map(|member| member.user_id().server_name())
2758 .filter(|server| max.filter(|max| max == server).is_none())
2759 .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
2760 *servers.entry(server).or_default() += 1;
2761 servers
2762 });
2763 let mut servers: Vec<_> = servers.into_iter().collect();
2764 servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
2765
2766 Ok(max
2767 .into_iter()
2768 .chain(servers.into_iter().map(|(name, _)| name))
2769 .take(3)
2770 .map(ToOwned::to_owned)
2771 .collect())
2772 }
2773
2774 pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
2781 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
2782 return Ok(alias.matrix_to_uri());
2783 }
2784
2785 let via = self.route().await?;
2786 Ok(self.room_id().matrix_to_uri_via(via))
2787 }
2788
2789 pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
2800 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
2801 return Ok(alias.matrix_uri(join));
2802 }
2803
2804 let via = self.route().await?;
2805 Ok(self.room_id().matrix_uri_via(via, join))
2806 }
2807
2808 pub async fn matrix_to_event_permalink(
2822 &self,
2823 event_id: impl Into<OwnedEventId>,
2824 ) -> Result<MatrixToUri> {
2825 let via = self.route().await?;
2828 Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
2829 }
2830
2831 pub async fn matrix_event_permalink(
2845 &self,
2846 event_id: impl Into<OwnedEventId>,
2847 ) -> Result<MatrixUri> {
2848 let via = self.route().await?;
2851 Ok(self.room_id().matrix_event_uri_via(event_id, via))
2852 }
2853
2854 pub async fn load_user_receipt(
2867 &self,
2868 receipt_type: ReceiptType,
2869 thread: ReceiptThread,
2870 user_id: &UserId,
2871 ) -> Result<Option<(OwnedEventId, Receipt)>> {
2872 self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
2873 }
2874
2875 pub async fn load_event_receipts(
2888 &self,
2889 receipt_type: ReceiptType,
2890 thread: ReceiptThread,
2891 event_id: &EventId,
2892 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
2893 self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
2894 }
2895
2896 pub async fn push_context(&self) -> Result<Option<PushConditionRoomCtx>> {
2901 let room_id = self.room_id();
2902 let user_id = self.own_user_id();
2903 let room_info = self.clone_info();
2904 let member_count = room_info.active_members_count();
2905
2906 let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
2907 member.name().to_owned()
2908 } else {
2909 return Ok(None);
2910 };
2911
2912 let power_levels = self
2913 .get_state_event_static::<RoomPowerLevelsEventContent>()
2914 .await?
2915 .and_then(|e| e.deserialize().ok())
2916 .map(|e| e.power_levels().into());
2917
2918 Ok(Some(PushConditionRoomCtx {
2919 user_id: user_id.to_owned(),
2920 room_id: room_id.to_owned(),
2921 member_count: UInt::new(member_count).unwrap_or(UInt::MAX),
2922 user_display_name,
2923 power_levels,
2924 }))
2925 }
2926
2927 pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
2932 let Some(push_context) = self.push_context().await? else {
2933 debug!("Could not aggregate push context");
2934 return Ok(None);
2935 };
2936
2937 let push_rules = self.client().account().push_rules().await?;
2938
2939 Ok(Some(push_rules.get_actions(event, &push_context).to_owned()))
2940 }
2941
2942 pub async fn invite_details(&self) -> Result<Invite> {
2945 let state = self.state();
2946 if state != RoomState::Invited {
2947 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
2948 }
2949
2950 let invitee = self
2951 .get_member_no_sync(self.own_user_id())
2952 .await?
2953 .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
2954 let event = invitee.event();
2955 let inviter_id = event.sender();
2956 let inviter = self.get_member_no_sync(inviter_id).await?;
2957 Ok(Invite { invitee, inviter })
2958 }
2959
2960 pub async fn member_with_sender_info(
2968 &self,
2969 user_id: &UserId,
2970 ) -> Result<RoomMemberWithSenderInfo> {
2971 let Some(member) = self.get_member_no_sync(user_id).await? else {
2972 return Err(Error::InsufficientData);
2973 };
2974
2975 let sender_member =
2976 if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
2977 Some(member)
2979 } else if self.are_members_synced() {
2980 None
2982 } else if self.sync_members().await.is_ok() {
2983 self.get_member_no_sync(member.event().sender()).await?
2985 } else {
2986 None
2987 };
2988
2989 Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
2990 }
2991
2992 pub async fn forget(&self) -> Result<()> {
2998 let state = self.state();
2999 match state {
3000 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3001 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3002 "Left / Banned",
3003 state,
3004 ))));
3005 }
3006 RoomState::Left | RoomState::Banned => {}
3007 }
3008
3009 let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3010 let _response = self.client.send(request).await?;
3011
3012 if self.inner.direct_targets_length() != 0 {
3014 if let Err(e) = self.set_is_direct(false).await {
3015 warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3018 }
3019 }
3020
3021 self.client.base_client().forget_room(self.inner.room_id()).await?;
3022
3023 Ok(())
3024 }
3025
3026 fn ensure_room_joined(&self) -> Result<()> {
3027 let state = self.state();
3028 if state == RoomState::Joined {
3029 Ok(())
3030 } else {
3031 Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3032 }
3033 }
3034
3035 pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3037 if !matches!(self.state(), RoomState::Joined) {
3038 return None;
3039 }
3040
3041 let notification_settings = self.client().notification_settings().await;
3042
3043 let notification_mode =
3045 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3046
3047 if notification_mode.is_some() {
3048 notification_mode
3049 } else if let Ok(is_encrypted) =
3050 self.latest_encryption_state().await.map(|state| state.is_encrypted())
3051 {
3052 let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3057 let default_mode = notification_settings
3058 .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3059 .await;
3060 Some(default_mode)
3061 } else {
3062 None
3063 }
3064 }
3065
3066 pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3077 if !matches!(self.state(), RoomState::Joined) {
3078 return None;
3079 }
3080
3081 let notification_settings = self.client().notification_settings().await;
3082
3083 let mode =
3085 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3086
3087 if let Some(mode) = mode {
3088 self.update_cached_user_defined_notification_mode(mode);
3089 }
3090
3091 mode
3092 }
3093
3094 pub async fn report_content(
3107 &self,
3108 event_id: OwnedEventId,
3109 score: Option<ReportedContentScore>,
3110 reason: Option<String>,
3111 ) -> Result<report_content::v3::Response> {
3112 let state = self.state();
3113 if state != RoomState::Joined {
3114 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3115 }
3116
3117 let request = report_content::v3::Request::new(
3118 self.inner.room_id().to_owned(),
3119 event_id,
3120 score.map(Into::into),
3121 reason,
3122 );
3123 Ok(self.client.send(request).await?)
3124 }
3125
3126 pub async fn report_room(&self, reason: Option<String>) -> Result<report_room::v3::Response> {
3137 let mut request = report_room::v3::Request::new(self.inner.room_id().to_owned());
3138 request.reason = reason;
3139
3140 Ok(self.client.send(request).await?)
3141 }
3142
3143 pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3146 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3147
3148 let content = UnstableMarkedUnreadEventContent::from(MarkedUnreadEventContent::new(unread));
3149
3150 let request = set_room_account_data::v3::Request::new(
3151 user_id.to_owned(),
3152 self.inner.room_id().to_owned(),
3153 &content,
3154 )?;
3155
3156 self.client.send(request).await?;
3157 Ok(())
3158 }
3159
3160 pub async fn event_cache(
3163 &self,
3164 ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3165 self.client.event_cache().for_room(self.room_id()).await
3166 }
3167
3168 pub async fn send_call_notification_if_needed(&self) -> Result<()> {
3178 if self.has_active_room_call() {
3179 return Ok(());
3180 }
3181
3182 if !self.can_user_trigger_room_notification(self.own_user_id()).await? {
3183 return Ok(());
3184 }
3185
3186 self.send_call_notification(
3187 self.room_id().to_string().to_owned(),
3188 ApplicationType::Call,
3189 if self.is_direct().await.unwrap_or(false) {
3190 NotifyType::Ring
3191 } else {
3192 NotifyType::Notify
3193 },
3194 Mentions::with_room_mention(),
3195 )
3196 .await?;
3197
3198 Ok(())
3199 }
3200
3201 pub(crate) async fn get_user_beacon_info(
3208 &self,
3209 user_id: &UserId,
3210 ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3211 let raw_event = self
3212 .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3213 .await?
3214 .ok_or(BeaconError::NotFound)?;
3215
3216 match raw_event.deserialize()? {
3217 SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3218 SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3219 SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3220 }
3221 }
3222
3223 pub async fn start_live_location_share(
3236 &self,
3237 duration_millis: u64,
3238 description: Option<String>,
3239 ) -> Result<send_state_event::v3::Response> {
3240 self.ensure_room_joined()?;
3241
3242 self.send_state_event_for_key(
3243 self.own_user_id(),
3244 BeaconInfoEventContent::new(
3245 description,
3246 Duration::from_millis(duration_millis),
3247 true,
3248 None,
3249 ),
3250 )
3251 .await
3252 }
3253
3254 pub async fn stop_live_location_share(
3261 &self,
3262 ) -> Result<send_state_event::v3::Response, BeaconError> {
3263 self.ensure_room_joined()?;
3264
3265 let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3266 beacon_info_event.content.stop();
3267 Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3268 }
3269
3270 pub async fn send_location_beacon(
3282 &self,
3283 geo_uri: String,
3284 ) -> Result<send_message_event::v3::Response, BeaconError> {
3285 self.ensure_room_joined()?;
3286
3287 let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3288
3289 if beacon_info_event.content.is_live() {
3290 let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3291 Ok(self.send(content).await?)
3292 } else {
3293 Err(BeaconError::NotLive)
3294 }
3295 }
3296
3297 pub async fn send_call_notification(
3309 &self,
3310 call_id: String,
3311 application: ApplicationType,
3312 notify_type: NotifyType,
3313 mentions: Mentions,
3314 ) -> Result<()> {
3315 let call_notify_event_content =
3316 CallNotifyEventContent::new(call_id, application, notify_type, mentions);
3317 self.send(call_notify_event_content).await?;
3318 Ok(())
3319 }
3320
3321 pub async fn save_composer_draft(&self, draft: ComposerDraft) -> Result<()> {
3324 self.client
3325 .state_store()
3326 .set_kv_data(
3327 StateStoreDataKey::ComposerDraft(self.room_id()),
3328 StateStoreDataValue::ComposerDraft(draft),
3329 )
3330 .await?;
3331 Ok(())
3332 }
3333
3334 pub async fn load_composer_draft(&self) -> Result<Option<ComposerDraft>> {
3336 let data = self
3337 .client
3338 .state_store()
3339 .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id()))
3340 .await?;
3341 Ok(data.and_then(|d| d.into_composer_draft()))
3342 }
3343
3344 pub async fn clear_composer_draft(&self) -> Result<()> {
3346 self.client
3347 .state_store()
3348 .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id()))
3349 .await?;
3350 Ok(())
3351 }
3352
3353 pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
3356 let response = self
3357 .client
3358 .send(get_state_events_for_key::v3::Request::new(
3359 self.room_id().to_owned(),
3360 StateEventType::RoomPinnedEvents,
3361 "".to_owned(),
3362 ))
3363 .await;
3364
3365 match response {
3366 Ok(response) => {
3367 Ok(Some(response.content.deserialize_as::<RoomPinnedEventsEventContent>()?.pinned))
3368 }
3369 Err(http_error) => match http_error.as_client_api_error() {
3370 Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
3371 _ => Err(http_error.into()),
3372 },
3373 }
3374 }
3375
3376 pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3384 ObservableLiveLocation::new(&self.client, self.room_id())
3385 }
3386
3387 pub async fn subscribe_to_knock_requests(
3401 &self,
3402 ) -> Result<(impl Stream<Item = Vec<KnockRequest>>, JoinHandle<()>)> {
3403 let this = Arc::new(self.clone());
3404
3405 let room_member_events_observer =
3406 self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
3407
3408 let current_seen_ids = self.get_seen_knock_request_ids().await?;
3409 let mut seen_request_ids_stream = self
3410 .seen_knock_request_ids_map
3411 .subscribe()
3412 .await
3413 .map(|values| values.unwrap_or_default());
3414
3415 let mut room_info_stream = self.subscribe_info();
3416
3417 let clear_seen_ids_handle = spawn({
3420 let this = self.clone();
3421 async move {
3422 let mut member_updates_stream = this.room_member_updates_sender.subscribe();
3423 while member_updates_stream.recv().await.is_ok() {
3424 if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
3426 warn!("Failed to remove seen knock requests: {err}")
3427 }
3428 }
3429 }
3430 });
3431
3432 let combined_stream = stream! {
3433 match this.get_current_join_requests(¤t_seen_ids).await {
3435 Ok(initial_requests) => yield initial_requests,
3436 Err(err) => warn!("Failed to get initial requests to join: {err}")
3437 }
3438
3439 let mut requests_stream = room_member_events_observer.subscribe();
3440 let mut seen_ids = current_seen_ids.clone();
3441
3442 loop {
3443 tokio::select! {
3446 Some((event, _)) = requests_stream.next() => {
3447 if let Some(event) = event.as_original() {
3448 let emit = if event.prev_content().is_some() {
3450 matches!(event.membership_change(),
3451 MembershipChange::Banned |
3452 MembershipChange::Knocked |
3453 MembershipChange::KnockAccepted |
3454 MembershipChange::KnockDenied |
3455 MembershipChange::KnockRetracted
3456 )
3457 } else {
3458 true
3461 };
3462
3463 if emit {
3464 match this.get_current_join_requests(&seen_ids).await {
3465 Ok(requests) => yield requests,
3466 Err(err) => {
3467 warn!("Failed to get updated knock requests on new member event: {err}")
3468 }
3469 }
3470 }
3471 }
3472 }
3473
3474 Some(new_seen_ids) = seen_request_ids_stream.next() => {
3475 seen_ids = new_seen_ids;
3477
3478 match this.get_current_join_requests(&seen_ids).await {
3481 Ok(requests) => yield requests,
3482 Err(err) => {
3483 warn!("Failed to get updated knock requests on seen ids changed: {err}")
3484 }
3485 }
3486 }
3487
3488 Some(room_info) = room_info_stream.next() => {
3489 if !room_info.are_members_synced() {
3492 match this.get_current_join_requests(&seen_ids).await {
3493 Ok(requests) => yield requests,
3494 Err(err) => {
3495 warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
3496 }
3497 }
3498 }
3499 }
3500 else => break,
3502 }
3503 }
3504 };
3505
3506 Ok((combined_stream, clear_seen_ids_handle))
3507 }
3508
3509 async fn get_current_join_requests(
3510 &self,
3511 seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
3512 ) -> Result<Vec<KnockRequest>> {
3513 Ok(self
3514 .members(RoomMemberships::KNOCK)
3515 .await?
3516 .into_iter()
3517 .filter_map(|member| {
3518 let event_id = member.event().event_id()?;
3519 Some(KnockRequest::new(
3520 self,
3521 event_id,
3522 member.event().timestamp(),
3523 KnockRequestMemberInfo::from_member(&member),
3524 seen_request_ids.contains_key(event_id),
3525 ))
3526 })
3527 .collect())
3528 }
3529
3530 pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
3532 RoomPrivacySettings::new(&self.inner, &self.client)
3533 }
3534}
3535
3536#[cfg(all(feature = "e2e-encryption", not(target_arch = "wasm32")))]
3537impl RoomIdentityProvider for Room {
3538 fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
3539 Box::pin(async { self.get_member(user_id).await.unwrap_or(None).is_some() })
3540 }
3541
3542 fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
3543 Box::pin(async {
3544 let members = self
3545 .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
3546 .await
3547 .unwrap_or_else(|_| Default::default());
3548
3549 let mut ret: Vec<UserIdentity> = Vec::new();
3550 for member in members {
3551 if let Some(i) = self.user_identity(member.user_id()).await {
3552 ret.push(i);
3553 }
3554 }
3555 ret
3556 })
3557 }
3558
3559 fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
3560 Box::pin(async {
3561 self.client
3562 .encryption()
3563 .get_user_identity(user_id)
3564 .await
3565 .unwrap_or(None)
3566 .map(|u| u.underlying_identity())
3567 })
3568 }
3569}
3570
3571#[derive(Clone)]
3574pub(crate) struct WeakRoom {
3575 client: WeakClient,
3576 room_id: OwnedRoomId,
3577}
3578
3579impl WeakRoom {
3580 pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
3582 Self { client, room_id }
3583 }
3584
3585 pub fn get(&self) -> Option<Room> {
3587 self.client.get().and_then(|client| client.get_room(&self.room_id))
3588 }
3589
3590 pub fn room_id(&self) -> &RoomId {
3592 &self.room_id
3593 }
3594}
3595
3596#[derive(Debug, Clone)]
3598pub struct Invite {
3599 pub invitee: RoomMember,
3601 pub inviter: Option<RoomMember>,
3603}
3604
3605#[derive(Error, Debug)]
3606enum InvitationError {
3607 #[error("No membership event found")]
3608 EventMissing,
3609}
3610
3611#[derive(Debug, Clone, Default)]
3613#[non_exhaustive]
3614pub struct Receipts {
3615 pub fully_read: Option<OwnedEventId>,
3617 pub public_read_receipt: Option<OwnedEventId>,
3619 pub private_read_receipt: Option<OwnedEventId>,
3621}
3622
3623impl Receipts {
3624 pub fn new() -> Self {
3626 Self::default()
3627 }
3628
3629 pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
3638 self.fully_read = event_id.into();
3639 self
3640 }
3641
3642 pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
3648 self.public_read_receipt = event_id.into();
3649 self
3650 }
3651
3652 pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
3656 self.private_read_receipt = event_id.into();
3657 self
3658 }
3659
3660 pub fn is_empty(&self) -> bool {
3662 self.fully_read.is_none()
3663 && self.public_read_receipt.is_none()
3664 && self.private_read_receipt.is_none()
3665 }
3666}
3667
3668#[derive(Debug)]
3671pub enum ParentSpace {
3672 Reciprocal(Room),
3675 WithPowerlevel(Room),
3680 Illegitimate(Room),
3683 Unverifiable(OwnedRoomId),
3686}
3687
3688#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
3692pub struct ReportedContentScore(i8);
3693
3694impl ReportedContentScore {
3695 pub const MIN: Self = Self(-100);
3699
3700 pub const MAX: Self = Self(0);
3704
3705 pub fn new(value: i8) -> Option<Self> {
3714 value.try_into().ok()
3715 }
3716
3717 pub fn new_saturating(value: i8) -> Self {
3723 if value > Self::MAX {
3724 Self::MAX
3725 } else if value < Self::MIN {
3726 Self::MIN
3727 } else {
3728 Self(value)
3729 }
3730 }
3731
3732 pub fn value(&self) -> i8 {
3734 self.0
3735 }
3736}
3737
3738impl PartialEq<i8> for ReportedContentScore {
3739 fn eq(&self, other: &i8) -> bool {
3740 self.0.eq(other)
3741 }
3742}
3743
3744impl PartialEq<ReportedContentScore> for i8 {
3745 fn eq(&self, other: &ReportedContentScore) -> bool {
3746 self.eq(&other.0)
3747 }
3748}
3749
3750impl PartialOrd<i8> for ReportedContentScore {
3751 fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
3752 self.0.partial_cmp(other)
3753 }
3754}
3755
3756impl PartialOrd<ReportedContentScore> for i8 {
3757 fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
3758 self.partial_cmp(&other.0)
3759 }
3760}
3761
3762impl From<ReportedContentScore> for Int {
3763 fn from(value: ReportedContentScore) -> Self {
3764 value.0.into()
3765 }
3766}
3767
3768impl TryFrom<i8> for ReportedContentScore {
3769 type Error = TryFromReportedContentScoreError;
3770
3771 fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
3772 if value > Self::MAX || value < Self::MIN {
3773 Err(TryFromReportedContentScoreError(()))
3774 } else {
3775 Ok(Self(value))
3776 }
3777 }
3778}
3779
3780impl TryFrom<i16> for ReportedContentScore {
3781 type Error = TryFromReportedContentScoreError;
3782
3783 fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
3784 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3785 value.try_into()
3786 }
3787}
3788
3789impl TryFrom<i32> for ReportedContentScore {
3790 type Error = TryFromReportedContentScoreError;
3791
3792 fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
3793 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3794 value.try_into()
3795 }
3796}
3797
3798impl TryFrom<i64> for ReportedContentScore {
3799 type Error = TryFromReportedContentScoreError;
3800
3801 fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
3802 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3803 value.try_into()
3804 }
3805}
3806
3807impl TryFrom<Int> for ReportedContentScore {
3808 type Error = TryFromReportedContentScoreError;
3809
3810 fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
3811 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3812 value.try_into()
3813 }
3814}
3815
3816trait EventSource {
3817 fn get_event(
3818 &self,
3819 event_id: &EventId,
3820 ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
3821}
3822
3823impl EventSource for &Room {
3824 async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
3825 self.load_or_fetch_event(event_id, None).await
3826 }
3827}
3828
3829#[derive(Debug, Clone, Error)]
3832#[error("out of range conversion attempted")]
3833pub struct TryFromReportedContentScoreError(());
3834
3835#[derive(Debug)]
3838pub struct RoomMemberWithSenderInfo {
3839 pub room_member: RoomMember,
3841 pub sender_info: Option<RoomMember>,
3844}
3845
3846#[cfg(all(test, not(target_arch = "wasm32")))]
3847mod tests {
3848 use matrix_sdk_base::{store::ComposerDraftType, ComposerDraft};
3849 use matrix_sdk_test::{
3850 async_test, event_factory::EventFactory, test_json, JoinedRoomBuilder, StateTestEvent,
3851 SyncResponseBuilder,
3852 };
3853 use ruma::{event_id, events::room::member::MembershipState, int, room_id, user_id};
3854 use wiremock::{
3855 matchers::{header, method, path_regex},
3856 Mock, MockServer, ResponseTemplate,
3857 };
3858
3859 use super::ReportedContentScore;
3860 use crate::{
3861 config::RequestConfig,
3862 test_utils::{client::mock_matrix_session, logged_in_client, mocks::MatrixMockServer},
3863 Client,
3864 };
3865
3866 #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
3867 #[async_test]
3868 async fn test_cache_invalidation_while_encrypt() {
3869 use matrix_sdk_base::store::RoomLoadSettings;
3870 use matrix_sdk_test::{message_like_event_content, DEFAULT_TEST_ROOM_ID};
3871
3872 let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
3873 let session = mock_matrix_session();
3874
3875 let client = Client::builder()
3876 .homeserver_url("http://localhost:1234")
3877 .request_config(RequestConfig::new().disable_retry())
3878 .sqlite_store(&sqlite_path, None)
3879 .build()
3880 .await
3881 .unwrap();
3882 client
3883 .matrix_auth()
3884 .restore_session(session.clone(), RoomLoadSettings::default())
3885 .await
3886 .unwrap();
3887
3888 client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
3889
3890 let server = MockServer::start().await;
3892 {
3893 Mock::given(method("GET"))
3894 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
3895 .and(header("authorization", "Bearer 1234"))
3896 .respond_with(
3897 ResponseTemplate::new(200)
3898 .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
3899 )
3900 .mount(&server)
3901 .await;
3902 let response = SyncResponseBuilder::default()
3903 .add_joined_room(
3904 JoinedRoomBuilder::default()
3905 .add_state_event(StateTestEvent::Member)
3906 .add_state_event(StateTestEvent::PowerLevels)
3907 .add_state_event(StateTestEvent::Encryption),
3908 )
3909 .build_sync_response();
3910 client.base_client().receive_sync_response(response).await.unwrap();
3911 }
3912
3913 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
3914
3915 room.preshare_room_key().await.unwrap();
3917
3918 {
3921 let client = Client::builder()
3922 .homeserver_url("http://localhost:1234")
3923 .request_config(RequestConfig::new().disable_retry())
3924 .sqlite_store(&sqlite_path, None)
3925 .build()
3926 .await
3927 .unwrap();
3928 client
3929 .matrix_auth()
3930 .restore_session(session.clone(), RoomLoadSettings::default())
3931 .await
3932 .unwrap();
3933 client
3934 .encryption()
3935 .enable_cross_process_store_lock("client2".to_owned())
3936 .await
3937 .unwrap();
3938
3939 let guard = client.encryption().spin_lock_store(None).await.unwrap();
3940 assert!(guard.is_some());
3941 }
3942
3943 let guard = client.encryption().spin_lock_store(None).await.unwrap();
3945 assert!(guard.is_some());
3946
3947 let olm = client.olm_machine().await;
3949 let olm = olm.as_ref().expect("Olm machine wasn't started");
3950
3951 let _encrypted_content = olm
3954 .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
3955 .await
3956 .unwrap();
3957 }
3958
3959 #[test]
3960 fn reported_content_score() {
3961 let score = ReportedContentScore::new(0).unwrap();
3963 assert_eq!(score.value(), 0);
3964 let score = ReportedContentScore::new(-50).unwrap();
3965 assert_eq!(score.value(), -50);
3966 let score = ReportedContentScore::new(-100).unwrap();
3967 assert_eq!(score.value(), -100);
3968 assert_eq!(ReportedContentScore::new(10), None);
3969 assert_eq!(ReportedContentScore::new(-110), None);
3970
3971 let score = ReportedContentScore::new_saturating(0);
3972 assert_eq!(score.value(), 0);
3973 let score = ReportedContentScore::new_saturating(-50);
3974 assert_eq!(score.value(), -50);
3975 let score = ReportedContentScore::new_saturating(-100);
3976 assert_eq!(score.value(), -100);
3977 let score = ReportedContentScore::new_saturating(10);
3978 assert_eq!(score, ReportedContentScore::MAX);
3979 let score = ReportedContentScore::new_saturating(-110);
3980 assert_eq!(score, ReportedContentScore::MIN);
3981
3982 let score = ReportedContentScore::try_from(0i16).unwrap();
3984 assert_eq!(score.value(), 0);
3985 let score = ReportedContentScore::try_from(-100i16).unwrap();
3986 assert_eq!(score.value(), -100);
3987 ReportedContentScore::try_from(10i16).unwrap_err();
3988 ReportedContentScore::try_from(-110i16).unwrap_err();
3989
3990 let score = ReportedContentScore::try_from(0i32).unwrap();
3992 assert_eq!(score.value(), 0);
3993 let score = ReportedContentScore::try_from(-100i32).unwrap();
3994 assert_eq!(score.value(), -100);
3995 ReportedContentScore::try_from(10i32).unwrap_err();
3996 ReportedContentScore::try_from(-110i32).unwrap_err();
3997
3998 let score = ReportedContentScore::try_from(0i64).unwrap();
4000 assert_eq!(score.value(), 0);
4001 let score = ReportedContentScore::try_from(-100i64).unwrap();
4002 assert_eq!(score.value(), -100);
4003 ReportedContentScore::try_from(10i64).unwrap_err();
4004 ReportedContentScore::try_from(-110i64).unwrap_err();
4005
4006 let score = ReportedContentScore::try_from(int!(0)).unwrap();
4008 assert_eq!(score.value(), 0);
4009 let score = ReportedContentScore::try_from(int!(-100)).unwrap();
4010 assert_eq!(score.value(), -100);
4011 ReportedContentScore::try_from(int!(10)).unwrap_err();
4012 ReportedContentScore::try_from(int!(-110)).unwrap_err();
4013 }
4014
4015 #[async_test]
4016 async fn test_composer_draft() {
4017 use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4018
4019 let client = logged_in_client(None).await;
4020
4021 let response = SyncResponseBuilder::default()
4022 .add_joined_room(JoinedRoomBuilder::default())
4023 .build_sync_response();
4024 client.base_client().receive_sync_response(response).await.unwrap();
4025 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4026
4027 assert_eq!(room.load_composer_draft().await.unwrap(), None);
4028
4029 let draft = ComposerDraft {
4030 plain_text: "Hello, world!".to_owned(),
4031 html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4032 draft_type: ComposerDraftType::NewMessage,
4033 };
4034 room.save_composer_draft(draft.clone()).await.unwrap();
4035 assert_eq!(room.load_composer_draft().await.unwrap(), Some(draft));
4036
4037 room.clear_composer_draft().await.unwrap();
4038 assert_eq!(room.load_composer_draft().await.unwrap(), None);
4039 }
4040
4041 #[async_test]
4042 async fn test_mark_join_requests_as_seen() {
4043 let server = MatrixMockServer::new().await;
4044 let client = server.client_builder().build().await;
4045 let event_id = event_id!("$a:b.c");
4046 let room_id = room_id!("!a:b.c");
4047 let user_id = user_id!("@alice:b.c");
4048
4049 let f = EventFactory::new().room(room_id);
4050 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f
4051 .member(user_id)
4052 .membership(MembershipState::Knock)
4053 .event_id(event_id)
4054 .into_raw_timeline()
4055 .cast()]);
4056 let room = server.sync_room(&client, joined_room_builder).await;
4057
4058 let seen_ids =
4060 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4061 assert!(seen_ids.is_empty());
4062
4063 room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4065 .await
4066 .expect("Couldn't mark join request as seen");
4067
4068 let seen_ids =
4070 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4071 assert_eq!(seen_ids.len(), 1);
4072 assert_eq!(
4073 seen_ids.into_iter().next().expect("No next value"),
4074 (event_id.to_owned(), user_id.to_owned())
4075 )
4076 }
4077
4078 #[async_test]
4079 async fn test_own_room_membership_with_no_own_member_event() {
4080 let server = MatrixMockServer::new().await;
4081 let client = server.client_builder().build().await;
4082 let room_id = room_id!("!a:b.c");
4083
4084 let room = server.sync_joined_room(&client, room_id).await;
4085
4086 let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
4089 assert!(error.is_some());
4090 }
4091
4092 #[async_test]
4093 async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
4094 let server = MatrixMockServer::new().await;
4095 let client = server.client_builder().build().await;
4096 let room_id = room_id!("!a:b.c");
4097 let user_id = user_id!("@example:localhost");
4098
4099 let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
4100 let joined_room_builder = JoinedRoomBuilder::new(room_id)
4101 .add_state_bulk(vec![f.member(user_id).into_raw_sync().cast()]);
4102 let room = server.sync_room(&client, joined_room_builder).await;
4103
4104 let ret = room
4106 .member_with_sender_info(client.user_id().unwrap())
4107 .await
4108 .expect("Room member info should be available");
4109
4110 assert_eq!(ret.room_member.event().user_id(), user_id);
4112
4113 assert!(ret.sender_info.is_none());
4115 }
4116
4117 #[async_test]
4118 async fn test_own_room_membership_with_own_member_event_and_own_sender() {
4119 let server = MatrixMockServer::new().await;
4120 let client = server.client_builder().build().await;
4121 let room_id = room_id!("!a:b.c");
4122 let user_id = user_id!("@example:localhost");
4123
4124 let f = EventFactory::new().room(room_id).sender(user_id);
4125 let joined_room_builder = JoinedRoomBuilder::new(room_id)
4126 .add_state_bulk(vec![f.member(user_id).into_raw_sync().cast()]);
4127 let room = server.sync_room(&client, joined_room_builder).await;
4128
4129 let ret = room
4131 .member_with_sender_info(client.user_id().unwrap())
4132 .await
4133 .expect("Room member info should be available");
4134
4135 assert_eq!(ret.room_member.event().user_id(), user_id);
4137
4138 assert!(ret.sender_info.is_some());
4140 assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
4141 }
4142
4143 #[async_test]
4144 async fn test_own_room_membership_with_own_member_event_and_known_sender() {
4145 let server = MatrixMockServer::new().await;
4146 let client = server.client_builder().build().await;
4147 let room_id = room_id!("!a:b.c");
4148 let user_id = user_id!("@example:localhost");
4149 let sender_id = user_id!("@alice:b.c");
4150
4151 let f = EventFactory::new().room(room_id).sender(sender_id);
4152 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4153 f.member(user_id).into_raw_sync().cast(),
4154 f.member(sender_id).into_raw_sync().cast(),
4156 ]);
4157 let room = server.sync_room(&client, joined_room_builder).await;
4158
4159 let ret = room
4161 .member_with_sender_info(client.user_id().unwrap())
4162 .await
4163 .expect("Room member info should be available");
4164
4165 assert_eq!(ret.room_member.event().user_id(), user_id);
4167
4168 assert!(ret.sender_info.is_some());
4170 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
4171 }
4172
4173 #[async_test]
4174 async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
4175 let server = MatrixMockServer::new().await;
4176 let client = server.client_builder().build().await;
4177 let room_id = room_id!("!a:b.c");
4178 let user_id = user_id!("@example:localhost");
4179 let sender_id = user_id!("@alice:b.c");
4180
4181 let f = EventFactory::new().room(room_id).sender(sender_id);
4182 let joined_room_builder = JoinedRoomBuilder::new(room_id)
4183 .add_state_bulk(vec![f.member(user_id).into_raw_sync().cast()]);
4184 let room = server.sync_room(&client, joined_room_builder).await;
4185
4186 server
4188 .mock_get_members()
4189 .ok(vec![f.member(sender_id).into_raw_timeline().cast()])
4190 .mock_once()
4191 .mount()
4192 .await;
4193
4194 let ret = room
4196 .member_with_sender_info(client.user_id().unwrap())
4197 .await
4198 .expect("Room member info should be available");
4199
4200 assert_eq!(ret.room_member.event().user_id(), user_id);
4202
4203 assert!(ret.sender_info.is_some());
4205 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
4206 }
4207}