1use std::{
18 borrow::Borrow,
19 collections::{BTreeMap, HashMap},
20 future::Future,
21 ops::Deref,
22 sync::Arc,
23 time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{future::join_all, stream::FuturesUnordered};
30use http::StatusCode;
31#[cfg(feature = "e2e-encryption")]
32pub use identity_status_changes::IdentityStatusChanges;
33#[cfg(feature = "e2e-encryption")]
34use matrix_sdk_base::crypto::{IdentityStatusChange, RoomIdentityProvider, UserIdentity};
35#[cfg(feature = "e2e-encryption")]
36use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
37use matrix_sdk_base::{
38 deserialized_responses::{
39 RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
40 },
41 event_cache::store::media::IgnoreMediaRetentionPolicy,
42 media::MediaThumbnailSettings,
43 store::StateStoreExt,
44 ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
45 StateChanges, StateStoreDataKey, StateStoreDataValue,
46};
47#[cfg(feature = "e2e-encryption")]
48use matrix_sdk_common::BoxFuture;
49use matrix_sdk_common::{
50 deserialized_responses::TimelineEvent,
51 executor::{spawn, JoinHandle},
52 timeout::timeout,
53};
54use mime::Mime;
55use reply::Reply;
56#[cfg(feature = "unstable-msc4274")]
57use ruma::events::room::message::GalleryItemType;
58#[cfg(feature = "e2e-encryption")]
59use ruma::events::{
60 room::encrypted::OriginalSyncRoomEncryptedEvent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
61 SyncMessageLikeEvent,
62};
63use ruma::{
64 api::client::{
65 config::{set_global_account_data, set_room_account_data},
66 context,
67 error::ErrorKind,
68 filter::LazyLoadOptions,
69 membership::{
70 ban_user, forget_room, get_member_events,
71 invite_user::{self, v3::InvitationRecipient},
72 kick_user, leave_room, unban_user, Invite3pid,
73 },
74 message::send_message_event,
75 read_marker::set_read_marker,
76 receipt::create_receipt,
77 redact::redact_event,
78 room::{get_room_event, report_content, report_room},
79 state::{get_state_events_for_key, send_state_event},
80 tag::{create_tag, delete_tag},
81 typing::create_typing_event::{self, v3::Typing},
82 },
83 assign,
84 events::{
85 beacon::BeaconEventContent,
86 beacon_info::BeaconInfoEventContent,
87 call::notify::{ApplicationType, CallNotifyEventContent, NotifyType},
88 direct::DirectEventContent,
89 marked_unread::MarkedUnreadEventContent,
90 receipt::{Receipt, ReceiptThread, ReceiptType},
91 room::{
92 avatar::{self, RoomAvatarEventContent},
93 encryption::RoomEncryptionEventContent,
94 history_visibility::HistoryVisibility,
95 member::{MembershipChange, SyncRoomMemberEvent},
96 message::{
97 AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
98 FormattedBody, ImageMessageEventContent, MessageType, RoomMessageEventContent,
99 UnstableAudioDetailsContentBlock, UnstableVoiceContentBlock, VideoInfo,
100 VideoMessageEventContent,
101 },
102 name::RoomNameEventContent,
103 pinned_events::RoomPinnedEventsEventContent,
104 power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent},
105 server_acl::RoomServerAclEventContent,
106 topic::RoomTopicEventContent,
107 ImageInfo, MediaSource, ThumbnailInfo,
108 },
109 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
110 tag::{TagInfo, TagName},
111 typing::SyncTypingEvent,
112 AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
113 Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
114 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
115 RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
116 StaticStateEventContent, SyncStateEvent,
117 },
118 push::{Action, PushConditionRoomCtx, Ruleset},
119 serde::Raw,
120 time::Instant,
121 EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
122 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
123};
124use serde::de::DeserializeOwned;
125use thiserror::Error;
126use tokio::{join, sync::broadcast};
127use tokio_stream::StreamExt;
128use tracing::{debug, error, info, instrument, trace, warn};
129
130use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
131pub use self::{
132 member::{RoomMember, RoomMemberRole},
133 messages::{
134 EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
135 Relations, RelationsOptions, ThreadRoots,
136 },
137};
138#[cfg(doc)]
139use crate::event_cache::EventCache;
140use crate::{
141 attachment::{AttachmentConfig, AttachmentInfo},
142 client::WeakClient,
143 config::RequestConfig,
144 error::{BeaconError, WrongRoomState},
145 event_cache::{self, EventCacheDropHandles, RoomEventCache},
146 event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
147 live_location_share::ObservableLiveLocation,
148 media::{MediaFormat, MediaRequestParameters},
149 notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
150 room::{
151 knock_requests::{KnockRequest, KnockRequestMemberInfo},
152 power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
153 privacy_settings::RoomPrivacySettings,
154 },
155 sync::RoomUpdate,
156 utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
157 BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
158};
159#[cfg(feature = "e2e-encryption")]
160use crate::{crypto::types::events::CryptoContextInfo, encryption::backups::BackupState};
161
162pub mod edit;
163pub mod futures;
164pub mod identity_status_changes;
165pub mod knock_requests;
167mod member;
168mod messages;
169pub mod power_levels;
170pub mod reply;
171
172pub mod privacy_settings;
174
175#[cfg(feature = "e2e-encryption")]
176pub(crate) mod shared_room_history;
177
178#[derive(Debug, Clone)]
181pub struct Room {
182 inner: BaseRoom,
183 pub(crate) client: Client,
184}
185
186impl Deref for Room {
187 type Target = BaseRoom;
188
189 fn deref(&self) -> &Self::Target {
190 &self.inner
191 }
192}
193
194const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
195const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
196
197#[derive(Debug)]
199pub struct PushContext {
200 push_condition_room_ctx: PushConditionRoomCtx,
202
203 push_rules: Ruleset,
206}
207
208impl PushContext {
209 pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
211 Self { push_condition_room_ctx, push_rules }
212 }
213
214 pub fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
216 self.push_rules.get_actions(event, &self.push_condition_room_ctx).to_owned()
217 }
218}
219
220macro_rules! make_media_type {
221 ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $formatted_caption: ident, $info: ident, $thumbnail: ident) => {{
222 let (body, filename) = match $caption {
226 Some(caption) => (caption, Some($filename)),
227 None => ($filename, None),
228 };
229
230 let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();
231
232 match $content_type.type_() {
233 mime::IMAGE => {
234 let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
235 mimetype: Some($content_type.as_ref().to_owned()),
236 thumbnail_source,
237 thumbnail_info
238 });
239 let content = assign!(ImageMessageEventContent::new(body, $source), {
240 info: Some(Box::new(info)),
241 formatted: $formatted_caption,
242 filename
243 });
244 <$t>::Image(content)
245 }
246
247 mime::AUDIO => {
248 let mut content = assign!(AudioMessageEventContent::new(body, $source), {
249 formatted: $formatted_caption,
250 filename
251 });
252
253 if let Some(AttachmentInfo::Voice { audio_info, waveform: Some(waveform_vec) }) =
254 &$info
255 {
256 if let Some(duration) = audio_info.duration {
257 let waveform = waveform_vec.iter().map(|v| (*v).into()).collect();
258 content.audio =
259 Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
260 }
261 content.voice = Some(UnstableVoiceContentBlock::new());
262 }
263
264 let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
265 audio_info.mimetype = Some($content_type.as_ref().to_owned());
266 let content = content.info(Box::new(audio_info));
267
268 <$t>::Audio(content)
269 }
270
271 mime::VIDEO => {
272 let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
273 mimetype: Some($content_type.as_ref().to_owned()),
274 thumbnail_source,
275 thumbnail_info
276 });
277 let content = assign!(VideoMessageEventContent::new(body, $source), {
278 info: Some(Box::new(info)),
279 formatted: $formatted_caption,
280 filename
281 });
282 <$t>::Video(content)
283 }
284
285 _ => {
286 let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
287 mimetype: Some($content_type.as_ref().to_owned()),
288 thumbnail_source,
289 thumbnail_info
290 });
291 let content = assign!(FileMessageEventContent::new(body, $source), {
292 info: Some(Box::new(info)),
293 formatted: $formatted_caption,
294 filename,
295 });
296 <$t>::File(content)
297 }
298 }
299 }};
300}
301
302impl Room {
303 pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
310 Self { inner: room, client }
311 }
312
313 #[doc(alias = "reject_invitation")]
319 #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
320 pub async fn leave(&self) -> Result<()> {
321 let state = self.state();
322 if state == RoomState::Left {
323 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
324 "Joined or Invited",
325 state,
326 ))));
327 }
328
329 let should_forget = matches!(self.state(), RoomState::Invited);
332
333 let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
334 let response = self.client.send(request).await;
335
336 if let Err(error) = response {
339 #[allow(clippy::collapsible_match)]
340 let ignore_error = if let Some(error) = error.client_api_error_kind() {
341 match error {
342 ErrorKind::Forbidden { .. } => true,
345 _ => false,
346 }
347 } else {
348 false
349 };
350
351 error!(?error, ignore_error, should_forget, "Failed to leave the room");
352
353 if !ignore_error {
354 return Err(error.into());
355 }
356 }
357
358 self.client.base_client().room_left(self.room_id()).await?;
359
360 if should_forget {
361 trace!("Trying to forget the room");
362
363 if let Err(error) = self.forget().await {
364 error!(?error, "Failed to forget the room");
365 }
366 }
367
368 Ok(())
369 }
370
371 #[doc(alias = "accept_invitation")]
375 pub async fn join(&self) -> Result<()> {
376 let prev_room_state = self.inner.state();
377
378 if prev_room_state == RoomState::Joined {
379 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
380 "Invited or Left",
381 prev_room_state,
382 ))));
383 }
384
385 self.client.join_room_by_id(self.room_id()).await?;
386
387 Ok(())
388 }
389
390 pub fn client(&self) -> Client {
394 self.client.clone()
395 }
396
397 pub fn is_synced(&self) -> bool {
400 self.inner.is_state_fully_synced()
401 }
402
403 pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
433 let Some(url) = self.avatar_url() else { return Ok(None) };
434 let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
435 Ok(Some(self.client.media().get_media_content(&request, true).await?))
436 }
437
438 #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
467 pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
468 let room_id = self.inner.room_id();
469 let request = options.into_request(room_id);
470 let http_response = self.client.send(request).await?;
471
472 let push_ctx = self.push_context().await?;
473 let chunk = join_all(
474 http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
475 )
476 .await;
477
478 Ok(Messages {
479 start: http_response.start,
480 end: http_response.end,
481 chunk,
482 state: http_response.state,
483 })
484 }
485
486 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
496 where
497 Ev: SyncEvent + DeserializeOwned + Send + 'static,
498 H: EventHandler<Ev, Ctx>,
499 {
500 self.client.add_room_event_handler(self.room_id(), handler)
501 }
502
503 pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
508 self.client.subscribe_to_room_updates(self.room_id())
509 }
510
511 pub fn subscribe_to_typing_notifications(
517 &self,
518 ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
519 let (sender, receiver) = broadcast::channel(16);
520 let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
521 let own_user_id = self.own_user_id().to_owned();
522 move |event: SyncTypingEvent| async move {
523 let typing_user_ids = event
525 .content
526 .user_ids
527 .into_iter()
528 .filter(|user_id| *user_id != own_user_id)
529 .collect();
530 let _ = sender.send(typing_user_ids);
532 }
533 });
534 let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
535 (drop_guard, receiver)
536 }
537
538 #[cfg(feature = "e2e-encryption")]
561 pub async fn subscribe_to_identity_status_changes(
562 &self,
563 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
564 IdentityStatusChanges::create_stream(self.clone()).await
565 }
566
567 #[allow(clippy::unused_async)] async fn try_decrypt_event(
573 &self,
574 event: Raw<AnyTimelineEvent>,
575 push_ctx: Option<&PushContext>,
576 ) -> TimelineEvent {
577 #[cfg(feature = "e2e-encryption")]
578 if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
579 SyncMessageLikeEvent::Original(_),
580 ))) = event.deserialize_as::<AnySyncTimelineEvent>()
581 {
582 if let Ok(event) = self.decrypt_event(event.cast_ref(), push_ctx).await {
583 return event;
584 }
585 }
586
587 let mut event = TimelineEvent::from_plaintext(event.cast());
588 if let Some(push_ctx) = push_ctx {
589 event.set_push_actions(push_ctx.for_event(event.raw()));
590 }
591
592 event
593 }
594
595 pub async fn event(
600 &self,
601 event_id: &EventId,
602 request_config: Option<RequestConfig>,
603 ) -> Result<TimelineEvent> {
604 let request =
605 get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
606
607 let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
608 let push_ctx = self.push_context().await?;
609 let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
610
611 if let Ok((cache, _handles)) = self.event_cache().await {
613 cache.save_events([event.clone()]).await;
614 }
615
616 Ok(event)
617 }
618
619 pub async fn load_or_fetch_event(
626 &self,
627 event_id: &EventId,
628 request_config: Option<RequestConfig>,
629 ) -> Result<TimelineEvent> {
630 match self.event_cache().await {
631 Ok((event_cache, _drop_handles)) => {
632 if let Some(event) = event_cache.find_event(event_id).await {
633 return Ok(event);
634 }
635 }
637 Err(err) => {
638 debug!("error when getting the event cache: {err}");
639 }
640 }
641 self.event(event_id, request_config).await
642 }
643
644 pub async fn event_with_context(
647 &self,
648 event_id: &EventId,
649 lazy_load_members: bool,
650 context_size: UInt,
651 request_config: Option<RequestConfig>,
652 ) -> Result<EventWithContextResponse> {
653 let mut request =
654 context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
655
656 request.limit = context_size;
657
658 if lazy_load_members {
659 request.filter.lazy_load_options =
660 LazyLoadOptions::Enabled { include_redundant_members: false };
661 }
662
663 let response = self.client.send(request).with_request_config(request_config).await?;
664
665 let push_ctx = self.push_context().await?;
666 let push_ctx = push_ctx.as_ref();
667 let target_event = if let Some(event) = response.event {
668 Some(self.try_decrypt_event(event, push_ctx).await)
669 } else {
670 None
671 };
672
673 let (events_before, events_after) = join!(
677 join_all(
678 response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
679 ),
680 join_all(
681 response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
682 ),
683 );
684
685 if let Ok((cache, _handles)) = self.event_cache().await {
687 let mut events_to_save: Vec<TimelineEvent> = Vec::new();
688 if let Some(event) = &target_event {
689 events_to_save.push(event.clone());
690 }
691
692 for event in &events_before {
693 events_to_save.push(event.clone());
694 }
695
696 for event in &events_after {
697 events_to_save.push(event.clone());
698 }
699
700 cache.save_events(events_to_save).await;
701 }
702
703 Ok(EventWithContextResponse {
704 event: target_event,
705 events_before,
706 events_after,
707 state: response.state,
708 prev_batch_token: response.start,
709 next_batch_token: response.end,
710 })
711 }
712
713 pub(crate) async fn request_members(&self) -> Result<()> {
714 self.client
715 .locks()
716 .members_request_deduplicated_handler
717 .run(self.room_id().to_owned(), async move {
718 let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
719 let response = self
720 .client
721 .send(request.clone())
722 .with_request_config(
723 RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
726 )
727 .await?;
728
729 Box::pin(self.client.base_client().receive_all_members(
731 self.room_id(),
732 &request,
733 &response,
734 ))
735 .await?;
736
737 Ok(())
738 })
739 .await
740 }
741
742 pub async fn request_encryption_state(&self) -> Result<()> {
747 if !self.inner.encryption_state().is_unknown() {
748 return Ok(());
749 }
750
751 self.client
752 .locks()
753 .encryption_state_deduplicated_handler
754 .run(self.room_id().to_owned(), async move {
755 let request = get_state_events_for_key::v3::Request::new(
757 self.room_id().to_owned(),
758 StateEventType::RoomEncryption,
759 "".to_owned(),
760 );
761 let response = match self.client.send(request).await {
762 Ok(response) => {
763 Some(response.content.deserialize_as::<RoomEncryptionEventContent>()?)
764 }
765 Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
766 Err(err) => return Err(err.into()),
767 };
768
769 let _sync_lock = self.client.base_client().sync_lock().lock().await;
770
771 let mut room_info = self.clone_info();
774 room_info.mark_encryption_state_synced();
775 room_info.set_encryption_event(response.clone());
776 let mut changes = StateChanges::default();
777 changes.add_room(room_info.clone());
778
779 self.client.state_store().save_changes(&changes).await?;
780 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
781
782 Ok(())
783 })
784 .await
785 }
786
787 pub fn encryption_state(&self) -> EncryptionState {
792 self.inner.encryption_state()
793 }
794
795 pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
801 self.request_encryption_state().await?;
802
803 Ok(self.encryption_state())
804 }
805
806 #[cfg(feature = "e2e-encryption")]
808 pub async fn crypto_context_info(&self) -> CryptoContextInfo {
809 let encryption = self.client.encryption();
810
811 let this_device_is_verified = match encryption.get_own_device().await {
812 Ok(Some(device)) => device.is_verified_with_cross_signing(),
813
814 _ => true,
816 };
817
818 let backup_exists_on_server =
819 encryption.backups().exists_on_server().await.unwrap_or(false);
820
821 CryptoContextInfo {
822 device_creation_ts: encryption.device_creation_timestamp().await,
823 this_device_is_verified,
824 is_backup_configured: encryption.backups().state() == BackupState::Enabled,
825 backup_exists_on_server,
826 }
827 }
828
829 fn are_events_visible(&self) -> bool {
830 if let RoomState::Invited = self.inner.state() {
831 return matches!(
832 self.inner.history_visibility_or_default(),
833 HistoryVisibility::WorldReadable | HistoryVisibility::Invited
834 );
835 }
836
837 true
838 }
839
840 pub async fn sync_members(&self) -> Result<()> {
846 if !self.are_events_visible() {
847 return Ok(());
848 }
849
850 if !self.are_members_synced() {
851 self.request_members().await
852 } else {
853 Ok(())
854 }
855 }
856
857 pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
871 self.sync_members().await?;
872 self.get_member_no_sync(user_id).await
873 }
874
875 pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
889 Ok(self
890 .inner
891 .get_member(user_id)
892 .await?
893 .map(|member| RoomMember::new(self.client.clone(), member)))
894 }
895
896 pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
905 self.sync_members().await?;
906 self.members_no_sync(memberships).await
907 }
908
909 pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
918 Ok(self
919 .inner
920 .members(memberships)
921 .await?
922 .into_iter()
923 .map(|member| RoomMember::new(self.client.clone(), member))
924 .collect())
925 }
926
927 pub async fn get_state_events(
929 &self,
930 event_type: StateEventType,
931 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
932 self.client
933 .state_store()
934 .get_state_events(self.room_id(), event_type)
935 .await
936 .map_err(Into::into)
937 }
938
939 pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
956 where
957 C: StaticEventContent + StaticStateEventContent + RedactContent,
958 C::Redacted: RedactedStateEventContent,
959 {
960 Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
961 }
962
963 pub async fn get_state_events_for_keys(
966 &self,
967 event_type: StateEventType,
968 state_keys: &[&str],
969 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
970 self.client
971 .state_store()
972 .get_state_events_for_keys(self.room_id(), event_type, state_keys)
973 .await
974 .map_err(Into::into)
975 }
976
977 pub async fn get_state_events_for_keys_static<'a, C, K, I>(
997 &self,
998 state_keys: I,
999 ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1000 where
1001 C: StaticEventContent + StaticStateEventContent + RedactContent,
1002 C::StateKey: Borrow<K>,
1003 C::Redacted: RedactedStateEventContent,
1004 K: AsRef<str> + Sized + Sync + 'a,
1005 I: IntoIterator<Item = &'a K> + Send,
1006 I::IntoIter: Send,
1007 {
1008 Ok(self
1009 .client
1010 .state_store()
1011 .get_state_events_for_keys_static(self.room_id(), state_keys)
1012 .await?)
1013 }
1014
1015 pub async fn get_state_event(
1017 &self,
1018 event_type: StateEventType,
1019 state_key: &str,
1020 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1021 self.client
1022 .state_store()
1023 .get_state_event(self.room_id(), event_type, state_key)
1024 .await
1025 .map_err(Into::into)
1026 }
1027
1028 pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1047 where
1048 C: StaticEventContent + StaticStateEventContent<StateKey = EmptyStateKey> + RedactContent,
1049 C::Redacted: RedactedStateEventContent,
1050 {
1051 self.get_state_event_static_for_key(&EmptyStateKey).await
1052 }
1053
1054 pub async fn get_state_event_static_for_key<C, K>(
1074 &self,
1075 state_key: &K,
1076 ) -> Result<Option<RawSyncOrStrippedState<C>>>
1077 where
1078 C: StaticEventContent + StaticStateEventContent + RedactContent,
1079 C::StateKey: Borrow<K>,
1080 C::Redacted: RedactedStateEventContent,
1081 K: AsRef<str> + ?Sized + Sync,
1082 {
1083 Ok(self
1084 .client
1085 .state_store()
1086 .get_state_event_static_for_key(self.room_id(), state_key)
1087 .await?)
1088 }
1089
1090 pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1094 Ok(self
1099 .get_state_events_static::<SpaceParentEventContent>()
1100 .await?
1101 .into_iter()
1102 .flat_map(|parent_event| match parent_event.deserialize() {
1104 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1105 Some((e.state_key.to_owned(), e.sender))
1106 }
1107 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1108 Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1109 Err(e) => {
1110 info!(room_id = ?self.room_id(), "Could not deserialize m.room.parent: {e}");
1111 None
1112 }
1113 })
1114 .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1116 let Some(parent_room) = self.client.get_room(&state_key) else {
1117 return Ok(ParentSpace::Unverifiable(state_key));
1120 };
1121 if let Some(child_event) = parent_room
1124 .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1125 .await?
1126 {
1127 match child_event.deserialize() {
1128 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1129 return Ok(ParentSpace::Reciprocal(parent_room));
1132 }
1133 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1134 Ok(SyncOrStrippedState::Stripped(_)) => {}
1135 Err(e) => {
1136 info!(
1137 room_id = ?self.room_id(), parent_room_id = ?state_key,
1138 "Could not deserialize m.room.child: {e}"
1139 );
1140 }
1141 }
1142 }
1147
1148 let Some(member) = parent_room.get_member(&sender).await? else {
1151 return Ok(ParentSpace::Illegitimate(parent_room));
1153 };
1154
1155 if member.can_send_state(StateEventType::SpaceChild) {
1156 Ok(ParentSpace::WithPowerlevel(parent_room))
1158 } else {
1159 Ok(ParentSpace::Illegitimate(parent_room))
1160 }
1161 })
1162 .collect::<FuturesUnordered<_>>())
1163 }
1164
1165 pub async fn account_data(
1167 &self,
1168 data_type: RoomAccountDataEventType,
1169 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1170 self.client
1171 .state_store()
1172 .get_room_account_data_event(self.room_id(), data_type)
1173 .await
1174 .map_err(Into::into)
1175 }
1176
1177 pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1196 where
1197 C: StaticEventContent + RoomAccountDataEventContent,
1198 {
1199 Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast))
1200 }
1201
1202 #[cfg(feature = "e2e-encryption")]
1207 pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1208 let user_ids = self
1209 .client
1210 .state_store()
1211 .get_user_ids(self.room_id(), RoomMemberships::empty())
1212 .await?;
1213
1214 for user_id in user_ids {
1215 let devices = self.client.encryption().get_user_devices(&user_id).await?;
1216 let any_unverified = devices.devices().any(|d| !d.is_verified());
1217
1218 if any_unverified {
1219 return Ok(false);
1220 }
1221 }
1222
1223 Ok(true)
1224 }
1225
1226 pub async fn set_account_data<T>(
1241 &self,
1242 content: T,
1243 ) -> Result<set_room_account_data::v3::Response>
1244 where
1245 T: RoomAccountDataEventContent,
1246 {
1247 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1248
1249 let request = set_room_account_data::v3::Request::new(
1250 own_user.to_owned(),
1251 self.room_id().to_owned(),
1252 &content,
1253 )?;
1254
1255 Ok(self.client.send(request).await?)
1256 }
1257
1258 pub async fn set_account_data_raw(
1283 &self,
1284 event_type: RoomAccountDataEventType,
1285 content: Raw<AnyRoomAccountDataEventContent>,
1286 ) -> Result<set_room_account_data::v3::Response> {
1287 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1288
1289 let request = set_room_account_data::v3::Request::new_raw(
1290 own_user.to_owned(),
1291 self.room_id().to_owned(),
1292 event_type,
1293 content,
1294 );
1295
1296 Ok(self.client.send(request).await?)
1297 }
1298
1299 pub async fn set_tag(
1330 &self,
1331 tag: TagName,
1332 tag_info: TagInfo,
1333 ) -> Result<create_tag::v3::Response> {
1334 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1335 let request = create_tag::v3::Request::new(
1336 user_id.to_owned(),
1337 self.inner.room_id().to_owned(),
1338 tag.to_string(),
1339 tag_info,
1340 );
1341 Ok(self.client.send(request).await?)
1342 }
1343
1344 pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1351 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1352 let request = delete_tag::v3::Request::new(
1353 user_id.to_owned(),
1354 self.inner.room_id().to_owned(),
1355 tag.to_string(),
1356 );
1357 Ok(self.client.send(request).await?)
1358 }
1359
1360 pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1370 if is_favourite {
1371 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1372
1373 self.set_tag(TagName::Favorite, tag_info).await?;
1374
1375 if self.is_low_priority() {
1376 self.remove_tag(TagName::LowPriority).await?;
1377 }
1378 } else {
1379 self.remove_tag(TagName::Favorite).await?;
1380 }
1381 Ok(())
1382 }
1383
1384 pub async fn set_is_low_priority(
1394 &self,
1395 is_low_priority: bool,
1396 tag_order: Option<f64>,
1397 ) -> Result<()> {
1398 if is_low_priority {
1399 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1400
1401 self.set_tag(TagName::LowPriority, tag_info).await?;
1402
1403 if self.is_favourite() {
1404 self.remove_tag(TagName::Favorite).await?;
1405 }
1406 } else {
1407 self.remove_tag(TagName::LowPriority).await?;
1408 }
1409 Ok(())
1410 }
1411
1412 pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1421 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1422
1423 let mut content = self
1424 .client
1425 .account()
1426 .account_data::<DirectEventContent>()
1427 .await?
1428 .map(|c| c.deserialize())
1429 .transpose()?
1430 .unwrap_or_default();
1431
1432 let this_room_id = self.inner.room_id();
1433
1434 if is_direct {
1435 let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1436 room_members.retain(|member| member.user_id() != self.own_user_id());
1437
1438 for member in room_members {
1439 let entry = content.entry(member.user_id().into()).or_default();
1440 if !entry.iter().any(|room_id| room_id == this_room_id) {
1441 entry.push(this_room_id.to_owned());
1442 }
1443 }
1444 } else {
1445 for (_, list) in content.iter_mut() {
1446 list.retain(|room_id| *room_id != this_room_id);
1447 }
1448
1449 content.retain(|_, list| !list.is_empty());
1451 }
1452
1453 let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1454
1455 self.client.send(request).await?;
1456 Ok(())
1457 }
1458
1459 #[cfg(feature = "e2e-encryption")]
1467 pub async fn decrypt_event(
1468 &self,
1469 event: &Raw<OriginalSyncRoomEncryptedEvent>,
1470 push_ctx: Option<&PushContext>,
1471 ) -> Result<TimelineEvent> {
1472 let machine = self.client.olm_machine().await;
1473 let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1474
1475 match machine
1476 .try_decrypt_room_event(
1477 event.cast_ref(),
1478 self.inner.room_id(),
1479 self.client.decryption_settings(),
1480 )
1481 .await?
1482 {
1483 RoomEventDecryptionResult::Decrypted(decrypted) => {
1484 let push_actions = push_ctx.map(|push_ctx| push_ctx.for_event(&decrypted.event));
1485 Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1486 }
1487 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1488 self.client
1489 .encryption()
1490 .backups()
1491 .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1492 Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
1493 }
1494 }
1495 }
1496
1497 #[cfg(feature = "e2e-encryption")]
1510 pub async fn get_encryption_info(
1511 &self,
1512 session_id: &str,
1513 sender: &UserId,
1514 ) -> Option<Arc<EncryptionInfo>> {
1515 let machine = self.client.olm_machine().await;
1516 let machine = machine.as_ref()?;
1517 machine.get_session_encryption_info(self.room_id(), session_id, sender).await.ok()
1518 }
1519
1520 #[cfg(feature = "e2e-encryption")]
1533 pub async fn discard_room_key(&self) -> Result<()> {
1534 let machine = self.client.olm_machine().await;
1535 if let Some(machine) = machine.as_ref() {
1536 machine.discard_room_key(self.inner.room_id()).await?;
1537 Ok(())
1538 } else {
1539 Err(Error::NoOlmMachine)
1540 }
1541 }
1542
1543 #[instrument(skip_all)]
1551 pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1552 let request = assign!(
1553 ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1554 { reason: reason.map(ToOwned::to_owned) }
1555 );
1556 self.client.send(request).await?;
1557 Ok(())
1558 }
1559
1560 #[instrument(skip_all)]
1568 pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1569 let request = assign!(
1570 unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1571 { reason: reason.map(ToOwned::to_owned) }
1572 );
1573 self.client.send(request).await?;
1574 Ok(())
1575 }
1576
1577 #[instrument(skip_all)]
1586 pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1587 let request = assign!(
1588 kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1589 { reason: reason.map(ToOwned::to_owned) }
1590 );
1591 self.client.send(request).await?;
1592 Ok(())
1593 }
1594
1595 #[instrument(skip_all)]
1601 pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1602 #[cfg(feature = "e2e-encryption")]
1603 if self.client.inner.enable_share_history_on_invite {
1604 shared_room_history::share_room_history(self, user_id.to_owned()).await?;
1605 }
1606
1607 let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
1608 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1609 self.client.send(request).await?;
1610
1611 self.mark_members_missing();
1615
1616 Ok(())
1617 }
1618
1619 #[instrument(skip_all)]
1625 pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
1626 let recipient = InvitationRecipient::ThirdPartyId(invite_id);
1627 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1628 self.client.send(request).await?;
1629
1630 self.mark_members_missing();
1634
1635 Ok(())
1636 }
1637
1638 pub async fn typing_notice(&self, typing: bool) -> Result<()> {
1673 self.ensure_room_joined()?;
1674
1675 let send = if let Some(typing_time) =
1678 self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
1679 {
1680 if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
1681 typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
1685 } else {
1686 !typing
1688 }
1689 } else {
1690 typing
1693 };
1694
1695 if send {
1696 self.send_typing_notice(typing).await?;
1697 }
1698
1699 Ok(())
1700 }
1701
1702 #[instrument(name = "typing_notice", skip(self))]
1703 async fn send_typing_notice(&self, typing: bool) -> Result<()> {
1704 let typing = if typing {
1705 self.client
1706 .inner
1707 .typing_notice_times
1708 .write()
1709 .unwrap()
1710 .insert(self.room_id().to_owned(), Instant::now());
1711 Typing::Yes(TYPING_NOTICE_TIMEOUT)
1712 } else {
1713 self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
1714 Typing::No
1715 };
1716
1717 let request = create_typing_event::v3::Request::new(
1718 self.own_user_id().to_owned(),
1719 self.room_id().to_owned(),
1720 typing,
1721 );
1722
1723 self.client.send(request).await?;
1724
1725 Ok(())
1726 }
1727
1728 #[instrument(skip_all)]
1745 pub async fn send_single_receipt(
1746 &self,
1747 receipt_type: create_receipt::v3::ReceiptType,
1748 thread: ReceiptThread,
1749 event_id: OwnedEventId,
1750 ) -> Result<()> {
1751 let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
1754
1755 self.client
1756 .inner
1757 .locks
1758 .read_receipt_deduplicated_handler
1759 .run((request_key, event_id.clone()), async {
1760 let is_unthreaded = thread == ReceiptThread::Unthreaded;
1762
1763 let mut request = create_receipt::v3::Request::new(
1764 self.room_id().to_owned(),
1765 receipt_type,
1766 event_id,
1767 );
1768 request.thread = thread;
1769
1770 self.client.send(request).await?;
1771
1772 if is_unthreaded {
1773 self.set_unread_flag(false).await?;
1774 }
1775
1776 Ok(())
1777 })
1778 .await
1779 }
1780
1781 #[instrument(skip_all)]
1791 pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
1792 if receipts.is_empty() {
1793 return Ok(());
1794 }
1795
1796 let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
1797 let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
1798 fully_read,
1799 read_receipt: public_read_receipt,
1800 private_read_receipt,
1801 });
1802
1803 self.client.send(request).await?;
1804
1805 self.set_unread_flag(false).await?;
1806
1807 Ok(())
1808 }
1809
1810 #[instrument(skip_all)]
1842 pub async fn enable_encryption(&self) -> Result<()> {
1843 use ruma::{
1844 events::room::encryption::RoomEncryptionEventContent, EventEncryptionAlgorithm,
1845 };
1846 const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
1847
1848 if !self.latest_encryption_state().await?.is_encrypted() {
1849 let content =
1850 RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
1851 self.send_state_event(content).await?;
1852
1853 _ = timeout(self.client.inner.sync_beat.listen(), SYNC_WAIT_TIME).await;
1857
1858 let _sync_lock = self.client.base_client().sync_lock().lock().await;
1863 if !self.inner.encryption_state().is_encrypted() {
1864 debug!("still not marked as encrypted, marking encryption state as missing");
1865
1866 let mut room_info = self.clone_info();
1867 room_info.mark_encryption_state_missing();
1868 let mut changes = StateChanges::default();
1869 changes.add_room(room_info.clone());
1870
1871 self.client.state_store().save_changes(&changes).await?;
1872 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
1873 } else {
1874 debug!("room successfully marked as encrypted");
1875 }
1876 }
1877
1878 Ok(())
1879 }
1880
1881 #[cfg(feature = "e2e-encryption")]
1890 #[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
1891 async fn preshare_room_key(&self) -> Result<()> {
1892 self.ensure_room_joined()?;
1893
1894 let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
1896 tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));
1897
1898 self.client
1899 .locks()
1900 .group_session_deduplicated_handler
1901 .run(self.room_id().to_owned(), async move {
1902 {
1903 let members = self
1904 .client
1905 .state_store()
1906 .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
1907 .await?;
1908 self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
1909 };
1910
1911 let response = self.share_room_key().await;
1912
1913 if let Err(r) = response {
1917 let machine = self.client.olm_machine().await;
1918 if let Some(machine) = machine.as_ref() {
1919 machine.discard_room_key(self.room_id()).await?;
1920 }
1921 return Err(r);
1922 }
1923
1924 Ok(())
1925 })
1926 .await
1927 }
1928
1929 #[cfg(feature = "e2e-encryption")]
1935 #[instrument(skip_all)]
1936 async fn share_room_key(&self) -> Result<()> {
1937 self.ensure_room_joined()?;
1938
1939 let requests = self.client.base_client().share_room_key(self.room_id()).await?;
1940
1941 for request in requests {
1942 let response = self.client.send_to_device(&request).await?;
1943 self.client.mark_request_as_sent(&request.txn_id, &response).await?;
1944 }
1945
1946 Ok(())
1947 }
1948
1949 #[instrument(skip_all)]
1958 pub async fn sync_up(&self) {
1959 while !self.is_synced() && self.state() == RoomState::Joined {
1960 let wait_for_beat = self.client.inner.sync_beat.listen();
1961 let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
1963 }
1964 }
1965
1966 pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2036 SendMessageLikeEvent::new(self, content)
2037 }
2038
2039 #[cfg(feature = "e2e-encryption")]
2042 async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
2043 let olm = self.client.olm_machine().await;
2044 let olm = olm.as_ref().expect("Olm machine wasn't started");
2045
2046 let members =
2047 self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;
2048
2049 let tracked: HashMap<_, _> = olm
2050 .store()
2051 .load_tracked_users()
2052 .await?
2053 .into_iter()
2054 .map(|tracked| (tracked.user_id, tracked.dirty))
2055 .collect();
2056
2057 let members_with_unknown_devices =
2060 members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));
2061
2062 let (req_id, request) =
2063 olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));
2064
2065 if !request.device_keys.is_empty() {
2066 self.client.keys_query(&req_id, request.device_keys).await?;
2067 }
2068
2069 Ok(())
2070 }
2071
2072 #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2116 pub fn send_raw<'a>(
2117 &'a self,
2118 event_type: &'a str,
2119 content: impl IntoRawMessageLikeEventContent,
2120 ) -> SendRawMessageLikeEvent<'a> {
2121 SendRawMessageLikeEvent::new(self, event_type, content)
2124 }
2125
2126 #[instrument(skip_all)]
2174 pub fn send_attachment<'a>(
2175 &'a self,
2176 filename: impl Into<String>,
2177 content_type: &'a Mime,
2178 data: Vec<u8>,
2179 config: AttachmentConfig,
2180 ) -> SendAttachment<'a> {
2181 SendAttachment::new(self, filename.into(), content_type, data, config)
2182 }
2183
2184 #[instrument(skip_all)]
2212 pub(super) async fn prepare_and_send_attachment<'a>(
2213 &'a self,
2214 filename: String,
2215 content_type: &'a Mime,
2216 data: Vec<u8>,
2217 mut config: AttachmentConfig,
2218 send_progress: SharedObservable<TransmissionProgress>,
2219 store_in_cache: bool,
2220 ) -> Result<send_message_event::v3::Response> {
2221 self.ensure_room_joined()?;
2222
2223 let txn_id = config.txn_id.take();
2224 let mentions = config.mentions.take();
2225
2226 let thumbnail = config.thumbnail.take();
2227
2228 let thumbnail_cache_info = if store_in_cache {
2230 thumbnail
2231 .as_ref()
2232 .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2233 } else {
2234 None
2235 };
2236
2237 #[cfg(feature = "e2e-encryption")]
2238 let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2239 self.client
2240 .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2241 .await?
2242 } else {
2243 self.client
2244 .media()
2245 .upload_plain_media_and_thumbnail(
2246 content_type,
2247 data.clone(),
2250 thumbnail,
2251 send_progress,
2252 )
2253 .await?
2254 };
2255
2256 #[cfg(not(feature = "e2e-encryption"))]
2257 let (media_source, thumbnail) = self
2258 .client
2259 .media()
2260 .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2261 .await?;
2262
2263 if store_in_cache {
2264 let cache_store_lock_guard = self.client.event_cache_store().lock().await?;
2265
2266 debug!("caching the media");
2270 let request =
2271 MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2272
2273 if let Err(err) = cache_store_lock_guard
2274 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2275 .await
2276 {
2277 warn!("unable to cache the media after uploading it: {err}");
2278 }
2279
2280 if let Some(((data, height, width), source)) =
2281 thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2282 {
2283 debug!("caching the thumbnail");
2284
2285 let request = MediaRequestParameters {
2286 source: source.clone(),
2287 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2288 };
2289
2290 if let Err(err) = cache_store_lock_guard
2291 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2292 .await
2293 {
2294 warn!("unable to cache the media after uploading it: {err}");
2295 }
2296 }
2297 }
2298
2299 let content = self
2300 .make_media_event(
2301 Room::make_attachment_type(
2302 content_type,
2303 filename,
2304 media_source,
2305 config.caption,
2306 config.formatted_caption,
2307 config.info,
2308 thumbnail,
2309 ),
2310 mentions,
2311 config.reply,
2312 )
2313 .await?;
2314
2315 let mut fut = self.send(content);
2316 if let Some(txn_id) = txn_id {
2317 fut = fut.with_transaction_id(txn_id);
2318 }
2319 fut.await
2320 }
2321
2322 #[allow(clippy::too_many_arguments)]
2325 pub(crate) fn make_attachment_type(
2326 content_type: &Mime,
2327 filename: String,
2328 source: MediaSource,
2329 caption: Option<String>,
2330 formatted_caption: Option<FormattedBody>,
2331 info: Option<AttachmentInfo>,
2332 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2333 ) -> MessageType {
2334 make_media_type!(
2335 MessageType,
2336 content_type,
2337 filename,
2338 source,
2339 caption,
2340 formatted_caption,
2341 info,
2342 thumbnail
2343 )
2344 }
2345
2346 pub(crate) async fn make_media_event(
2349 &self,
2350 msg_type: MessageType,
2351 mentions: Option<Mentions>,
2352 reply: Option<Reply>,
2353 ) -> Result<RoomMessageEventContent> {
2354 let mut content = RoomMessageEventContent::new(msg_type);
2355 if let Some(mentions) = mentions {
2356 content = content.add_mentions(mentions);
2357 }
2358 if let Some(reply) = reply {
2359 content = self.make_reply_event(content.into(), reply).await?;
2362 }
2363 Ok(content)
2364 }
2365
2366 #[cfg(feature = "unstable-msc4274")]
2369 #[allow(clippy::too_many_arguments)]
2370 pub(crate) fn make_gallery_item_type(
2371 content_type: &Mime,
2372 filename: String,
2373 source: MediaSource,
2374 caption: Option<String>,
2375 formatted_caption: Option<FormattedBody>,
2376 info: Option<AttachmentInfo>,
2377 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2378 ) -> GalleryItemType {
2379 make_media_type!(
2380 GalleryItemType,
2381 content_type,
2382 filename,
2383 source,
2384 caption,
2385 formatted_caption,
2386 info,
2387 thumbnail
2388 )
2389 }
2390
2391 pub async fn update_power_levels(
2400 &self,
2401 updates: Vec<(&UserId, Int)>,
2402 ) -> Result<send_state_event::v3::Response> {
2403 let mut power_levels = self.power_levels().await?;
2404
2405 for (user_id, new_level) in updates {
2406 if new_level == power_levels.users_default {
2407 power_levels.users.remove(user_id);
2408 } else {
2409 power_levels.users.insert(user_id.to_owned(), new_level);
2410 }
2411 }
2412
2413 self.send_state_event(RoomPowerLevelsEventContent::from(power_levels)).await
2414 }
2415
2416 pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2421 let mut power_levels = self.power_levels().await?;
2422 power_levels.apply(changes)?;
2423 self.send_state_event(RoomPowerLevelsEventContent::from(power_levels)).await?;
2424 Ok(())
2425 }
2426
2427 pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2431 let default_power_levels = RoomPowerLevels::from(RoomPowerLevelsEventContent::new());
2432 let changes = RoomPowerLevelChanges::from(default_power_levels);
2433 self.apply_power_level_changes(changes).await?;
2434 Ok(self.power_levels().await?)
2435 }
2436
2437 pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2442 let power_level = self.get_user_power_level(user_id).await?;
2443 Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2444 }
2445
2446 pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<i64> {
2451 let event = self.power_levels().await?;
2452 Ok(event.for_user(user_id).into())
2453 }
2454
2455 pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2458 let power_levels = self.power_levels().await.ok();
2459 let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2460 if let Some(power_levels) = power_levels {
2461 for (id, level) in power_levels.users.into_iter() {
2462 user_power_levels.insert(id, level.into());
2463 }
2464 }
2465 user_power_levels
2466 }
2467
2468 pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2470 self.send_state_event(RoomNameEventContent::new(name)).await
2471 }
2472
2473 pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2475 self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2476 }
2477
2478 pub async fn set_avatar_url(
2484 &self,
2485 url: &MxcUri,
2486 info: Option<avatar::ImageInfo>,
2487 ) -> Result<send_state_event::v3::Response> {
2488 self.ensure_room_joined()?;
2489
2490 let mut room_avatar_event = RoomAvatarEventContent::new();
2491 room_avatar_event.url = Some(url.to_owned());
2492 room_avatar_event.info = info.map(Box::new);
2493
2494 self.send_state_event(room_avatar_event).await
2495 }
2496
2497 pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2499 self.send_state_event(RoomAvatarEventContent::new()).await
2500 }
2501
2502 pub async fn upload_avatar(
2510 &self,
2511 mime: &Mime,
2512 data: Vec<u8>,
2513 info: Option<avatar::ImageInfo>,
2514 ) -> Result<send_state_event::v3::Response> {
2515 self.ensure_room_joined()?;
2516
2517 let upload_response = self.client.media().upload(mime, data, None).await?;
2518 let mut info = info.unwrap_or_default();
2519 info.blurhash = upload_response.blurhash;
2520 info.mimetype = Some(mime.to_string());
2521
2522 self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2523 }
2524
2525 #[instrument(skip_all)]
2569 pub async fn send_state_event(
2570 &self,
2571 content: impl StateEventContent<StateKey = EmptyStateKey>,
2572 ) -> Result<send_state_event::v3::Response> {
2573 self.send_state_event_for_key(&EmptyStateKey, content).await
2574 }
2575
2576 pub async fn send_state_event_for_key<C, K>(
2617 &self,
2618 state_key: &K,
2619 content: C,
2620 ) -> Result<send_state_event::v3::Response>
2621 where
2622 C: StateEventContent,
2623 C::StateKey: Borrow<K>,
2624 K: AsRef<str> + ?Sized,
2625 {
2626 self.ensure_room_joined()?;
2627 let request =
2628 send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
2629 let response = self.client.send(request).await?;
2630 Ok(response)
2631 }
2632
2633 #[instrument(skip_all)]
2668 pub async fn send_state_event_raw(
2669 &self,
2670 event_type: &str,
2671 state_key: &str,
2672 content: impl IntoRawStateEventContent,
2673 ) -> Result<send_state_event::v3::Response> {
2674 self.ensure_room_joined()?;
2675
2676 let request = send_state_event::v3::Request::new_raw(
2677 self.room_id().to_owned(),
2678 event_type.into(),
2679 state_key.to_owned(),
2680 content.into_raw_state_event_content(),
2681 );
2682
2683 Ok(self.client.send(request).await?)
2684 }
2685
2686 #[instrument(skip_all)]
2721 pub async fn redact(
2722 &self,
2723 event_id: &EventId,
2724 reason: Option<&str>,
2725 txn_id: Option<OwnedTransactionId>,
2726 ) -> HttpResult<redact_event::v3::Response> {
2727 let txn_id = txn_id.unwrap_or_else(TransactionId::new);
2728 let request = assign!(
2729 redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
2730 { reason: reason.map(ToOwned::to_owned) }
2731 );
2732
2733 self.client.send(request).await
2734 }
2735
2736 pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
2745 let acl_ev = self
2746 .get_state_event_static::<RoomServerAclEventContent>()
2747 .await?
2748 .and_then(|ev| ev.deserialize().ok());
2749 let acl = acl_ev.as_ref().and_then(|ev| match ev {
2750 SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
2751 SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
2752 });
2753
2754 let members: Vec<_> = self
2758 .members_no_sync(RoomMemberships::JOIN)
2759 .await?
2760 .into_iter()
2761 .filter(|member| {
2762 let server = member.user_id().server_name();
2763 acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
2764 })
2765 .collect();
2766
2767 let max = members
2770 .iter()
2771 .max_by_key(|member| member.power_level())
2772 .filter(|max| max.power_level() >= 50)
2773 .map(|member| member.user_id().server_name());
2774
2775 let servers = members
2777 .iter()
2778 .map(|member| member.user_id().server_name())
2779 .filter(|server| max.filter(|max| max == server).is_none())
2780 .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
2781 *servers.entry(server).or_default() += 1;
2782 servers
2783 });
2784 let mut servers: Vec<_> = servers.into_iter().collect();
2785 servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
2786
2787 Ok(max
2788 .into_iter()
2789 .chain(servers.into_iter().map(|(name, _)| name))
2790 .take(3)
2791 .map(ToOwned::to_owned)
2792 .collect())
2793 }
2794
2795 pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
2802 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
2803 return Ok(alias.matrix_to_uri());
2804 }
2805
2806 let via = self.route().await?;
2807 Ok(self.room_id().matrix_to_uri_via(via))
2808 }
2809
2810 pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
2821 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
2822 return Ok(alias.matrix_uri(join));
2823 }
2824
2825 let via = self.route().await?;
2826 Ok(self.room_id().matrix_uri_via(via, join))
2827 }
2828
2829 pub async fn matrix_to_event_permalink(
2843 &self,
2844 event_id: impl Into<OwnedEventId>,
2845 ) -> Result<MatrixToUri> {
2846 let via = self.route().await?;
2849 Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
2850 }
2851
2852 pub async fn matrix_event_permalink(
2866 &self,
2867 event_id: impl Into<OwnedEventId>,
2868 ) -> Result<MatrixUri> {
2869 let via = self.route().await?;
2872 Ok(self.room_id().matrix_event_uri_via(event_id, via))
2873 }
2874
2875 pub async fn load_user_receipt(
2888 &self,
2889 receipt_type: ReceiptType,
2890 thread: ReceiptThread,
2891 user_id: &UserId,
2892 ) -> Result<Option<(OwnedEventId, Receipt)>> {
2893 self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
2894 }
2895
2896 pub async fn load_event_receipts(
2909 &self,
2910 receipt_type: ReceiptType,
2911 thread: ReceiptThread,
2912 event_id: &EventId,
2913 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
2914 self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
2915 }
2916
2917 pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
2922 let room_id = self.room_id();
2923 let user_id = self.own_user_id();
2924 let room_info = self.clone_info();
2925 let member_count = room_info.active_members_count();
2926
2927 let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
2928 member.name().to_owned()
2929 } else {
2930 return Ok(None);
2931 };
2932
2933 let power_levels = self.power_levels().await.ok().map(Into::into);
2934
2935 Ok(Some(PushConditionRoomCtx {
2936 user_id: user_id.to_owned(),
2937 room_id: room_id.to_owned(),
2938 member_count: UInt::new(member_count).unwrap_or(UInt::MAX),
2939 user_display_name,
2940 power_levels,
2941 }))
2942 }
2943
2944 pub async fn push_context(&self) -> Result<Option<PushContext>> {
2947 let Some(push_condition_room_ctx) = self.push_condition_room_ctx().await? else {
2948 debug!("Could not aggregate push context");
2949 return Ok(None);
2950 };
2951 let push_rules = self.client().account().push_rules().await?;
2952 Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
2953 }
2954
2955 pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
2960 Ok(self.push_context().await?.map(|ctx| ctx.for_event(event)))
2961 }
2962
2963 pub async fn invite_details(&self) -> Result<Invite> {
2966 let state = self.state();
2967
2968 if state != RoomState::Invited {
2969 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
2970 }
2971
2972 let invitee = self
2973 .get_member_no_sync(self.own_user_id())
2974 .await?
2975 .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
2976 let event = invitee.event();
2977 let inviter_id = event.sender();
2978 let inviter = self.get_member_no_sync(inviter_id).await?;
2979 Ok(Invite { invitee, inviter })
2980 }
2981
2982 pub async fn member_with_sender_info(
2990 &self,
2991 user_id: &UserId,
2992 ) -> Result<RoomMemberWithSenderInfo> {
2993 let Some(member) = self.get_member_no_sync(user_id).await? else {
2994 return Err(Error::InsufficientData);
2995 };
2996
2997 let sender_member =
2998 if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
2999 Some(member)
3001 } else if self.are_members_synced() {
3002 None
3004 } else if self.sync_members().await.is_ok() {
3005 self.get_member_no_sync(member.event().sender()).await?
3007 } else {
3008 None
3009 };
3010
3011 Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3012 }
3013
3014 pub async fn forget(&self) -> Result<()> {
3020 let state = self.state();
3021 match state {
3022 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3023 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3024 "Left / Banned",
3025 state,
3026 ))));
3027 }
3028 RoomState::Left | RoomState::Banned => {}
3029 }
3030
3031 let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3032 let _response = self.client.send(request).await?;
3033
3034 if self.inner.direct_targets_length() != 0 {
3036 if let Err(e) = self.set_is_direct(false).await {
3037 warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3040 }
3041 }
3042
3043 self.client.base_client().forget_room(self.inner.room_id()).await?;
3044
3045 Ok(())
3046 }
3047
3048 fn ensure_room_joined(&self) -> Result<()> {
3049 let state = self.state();
3050 if state == RoomState::Joined {
3051 Ok(())
3052 } else {
3053 Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3054 }
3055 }
3056
3057 pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3059 if !matches!(self.state(), RoomState::Joined) {
3060 return None;
3061 }
3062
3063 let notification_settings = self.client().notification_settings().await;
3064
3065 let notification_mode =
3067 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3068
3069 if notification_mode.is_some() {
3070 notification_mode
3071 } else if let Ok(is_encrypted) =
3072 self.latest_encryption_state().await.map(|state| state.is_encrypted())
3073 {
3074 let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3079 let default_mode = notification_settings
3080 .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3081 .await;
3082 Some(default_mode)
3083 } else {
3084 None
3085 }
3086 }
3087
3088 pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3099 if !matches!(self.state(), RoomState::Joined) {
3100 return None;
3101 }
3102
3103 let notification_settings = self.client().notification_settings().await;
3104
3105 let mode =
3107 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3108
3109 if let Some(mode) = mode {
3110 self.update_cached_user_defined_notification_mode(mode);
3111 }
3112
3113 mode
3114 }
3115
3116 pub async fn report_content(
3129 &self,
3130 event_id: OwnedEventId,
3131 score: Option<ReportedContentScore>,
3132 reason: Option<String>,
3133 ) -> Result<report_content::v3::Response> {
3134 let state = self.state();
3135 if state != RoomState::Joined {
3136 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3137 }
3138
3139 let request = report_content::v3::Request::new(
3140 self.inner.room_id().to_owned(),
3141 event_id,
3142 score.map(Into::into),
3143 reason,
3144 );
3145 Ok(self.client.send(request).await?)
3146 }
3147
3148 pub async fn report_room(&self, reason: Option<String>) -> Result<report_room::v3::Response> {
3159 let mut request = report_room::v3::Request::new(self.inner.room_id().to_owned());
3160 request.reason = reason;
3161
3162 Ok(self.client.send(request).await?)
3163 }
3164
3165 pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3171 if self.is_marked_unread() == unread {
3172 return Ok(());
3174 }
3175
3176 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3177
3178 let content = MarkedUnreadEventContent::new(unread);
3179
3180 let request = set_room_account_data::v3::Request::new(
3181 user_id.to_owned(),
3182 self.inner.room_id().to_owned(),
3183 &content,
3184 )?;
3185
3186 self.client.send(request).await?;
3187 Ok(())
3188 }
3189
3190 pub async fn event_cache(
3193 &self,
3194 ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3195 self.client.event_cache().for_room(self.room_id()).await
3196 }
3197
3198 pub async fn send_call_notification_if_needed(&self) -> Result<bool> {
3213 debug!("Sending call notification for room {} if needed", self.inner.room_id());
3214
3215 if self.has_active_room_call() {
3216 warn!("Room {} has active room call, not sending a new notify event.", self.room_id());
3217 return Ok(false);
3218 }
3219
3220 let can_user_trigger_room_notification =
3221 self.power_levels().await?.user_can_trigger_room_notification(self.own_user_id());
3222
3223 if !can_user_trigger_room_notification {
3224 warn!(
3225 "User can't send notifications to everyone in the room {}. \
3226 Not sending a new notify event.",
3227 self.room_id()
3228 );
3229 return Ok(false);
3230 }
3231
3232 let notify_type = if self.is_direct().await.unwrap_or(false) {
3233 NotifyType::Ring
3234 } else {
3235 NotifyType::Notify
3236 };
3237
3238 debug!("Sending `m.call.notify` event with notify type: {notify_type:?}");
3239
3240 self.send_call_notification(
3241 self.room_id().to_string().to_owned(),
3242 ApplicationType::Call,
3243 notify_type,
3244 Mentions::with_room_mention(),
3245 )
3246 .await?;
3247
3248 Ok(true)
3249 }
3250
3251 pub(crate) async fn get_user_beacon_info(
3258 &self,
3259 user_id: &UserId,
3260 ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3261 let raw_event = self
3262 .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3263 .await?
3264 .ok_or(BeaconError::NotFound)?;
3265
3266 match raw_event.deserialize()? {
3267 SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3268 SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3269 SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3270 }
3271 }
3272
3273 pub async fn start_live_location_share(
3286 &self,
3287 duration_millis: u64,
3288 description: Option<String>,
3289 ) -> Result<send_state_event::v3::Response> {
3290 self.ensure_room_joined()?;
3291
3292 self.send_state_event_for_key(
3293 self.own_user_id(),
3294 BeaconInfoEventContent::new(
3295 description,
3296 Duration::from_millis(duration_millis),
3297 true,
3298 None,
3299 ),
3300 )
3301 .await
3302 }
3303
3304 pub async fn stop_live_location_share(
3311 &self,
3312 ) -> Result<send_state_event::v3::Response, BeaconError> {
3313 self.ensure_room_joined()?;
3314
3315 let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3316 beacon_info_event.content.stop();
3317 Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3318 }
3319
3320 pub async fn send_location_beacon(
3332 &self,
3333 geo_uri: String,
3334 ) -> Result<send_message_event::v3::Response, BeaconError> {
3335 self.ensure_room_joined()?;
3336
3337 let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3338
3339 if beacon_info_event.content.is_live() {
3340 let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3341 Ok(self.send(content).await?)
3342 } else {
3343 Err(BeaconError::NotLive)
3344 }
3345 }
3346
3347 pub async fn send_call_notification(
3359 &self,
3360 call_id: String,
3361 application: ApplicationType,
3362 notify_type: NotifyType,
3363 mentions: Mentions,
3364 ) -> Result<()> {
3365 let call_notify_event_content =
3366 CallNotifyEventContent::new(call_id, application, notify_type, mentions);
3367 self.send(call_notify_event_content).await?;
3368 Ok(())
3369 }
3370
3371 pub async fn save_composer_draft(
3374 &self,
3375 draft: ComposerDraft,
3376 thread_root: Option<&EventId>,
3377 ) -> Result<()> {
3378 self.client
3379 .state_store()
3380 .set_kv_data(
3381 StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
3382 StateStoreDataValue::ComposerDraft(draft),
3383 )
3384 .await?;
3385 Ok(())
3386 }
3387
3388 pub async fn load_composer_draft(
3391 &self,
3392 thread_root: Option<&EventId>,
3393 ) -> Result<Option<ComposerDraft>> {
3394 let data = self
3395 .client
3396 .state_store()
3397 .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3398 .await?;
3399 Ok(data.and_then(|d| d.into_composer_draft()))
3400 }
3401
3402 pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
3405 self.client
3406 .state_store()
3407 .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3408 .await?;
3409 Ok(())
3410 }
3411
3412 pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
3415 let response = self
3416 .client
3417 .send(get_state_events_for_key::v3::Request::new(
3418 self.room_id().to_owned(),
3419 StateEventType::RoomPinnedEvents,
3420 "".to_owned(),
3421 ))
3422 .await;
3423
3424 match response {
3425 Ok(response) => {
3426 Ok(Some(response.content.deserialize_as::<RoomPinnedEventsEventContent>()?.pinned))
3427 }
3428 Err(http_error) => match http_error.as_client_api_error() {
3429 Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
3430 _ => Err(http_error.into()),
3431 },
3432 }
3433 }
3434
3435 pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3443 ObservableLiveLocation::new(&self.client, self.room_id())
3444 }
3445
3446 pub async fn subscribe_to_knock_requests(
3460 &self,
3461 ) -> Result<(impl Stream<Item = Vec<KnockRequest>>, JoinHandle<()>)> {
3462 let this = Arc::new(self.clone());
3463
3464 let room_member_events_observer =
3465 self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
3466
3467 let current_seen_ids = self.get_seen_knock_request_ids().await?;
3468 let mut seen_request_ids_stream = self
3469 .seen_knock_request_ids_map
3470 .subscribe()
3471 .await
3472 .map(|values| values.unwrap_or_default());
3473
3474 let mut room_info_stream = self.subscribe_info();
3475
3476 let clear_seen_ids_handle = spawn({
3479 let this = self.clone();
3480 async move {
3481 let mut member_updates_stream = this.room_member_updates_sender.subscribe();
3482 while member_updates_stream.recv().await.is_ok() {
3483 if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
3485 warn!("Failed to remove seen knock requests: {err}")
3486 }
3487 }
3488 }
3489 });
3490
3491 let combined_stream = stream! {
3492 match this.get_current_join_requests(¤t_seen_ids).await {
3494 Ok(initial_requests) => yield initial_requests,
3495 Err(err) => warn!("Failed to get initial requests to join: {err}")
3496 }
3497
3498 let mut requests_stream = room_member_events_observer.subscribe();
3499 let mut seen_ids = current_seen_ids.clone();
3500
3501 loop {
3502 tokio::select! {
3505 Some((event, _)) = requests_stream.next() => {
3506 if let Some(event) = event.as_original() {
3507 let emit = if event.prev_content().is_some() {
3509 matches!(event.membership_change(),
3510 MembershipChange::Banned |
3511 MembershipChange::Knocked |
3512 MembershipChange::KnockAccepted |
3513 MembershipChange::KnockDenied |
3514 MembershipChange::KnockRetracted
3515 )
3516 } else {
3517 true
3520 };
3521
3522 if emit {
3523 match this.get_current_join_requests(&seen_ids).await {
3524 Ok(requests) => yield requests,
3525 Err(err) => {
3526 warn!("Failed to get updated knock requests on new member event: {err}")
3527 }
3528 }
3529 }
3530 }
3531 }
3532
3533 Some(new_seen_ids) = seen_request_ids_stream.next() => {
3534 seen_ids = new_seen_ids;
3536
3537 match this.get_current_join_requests(&seen_ids).await {
3540 Ok(requests) => yield requests,
3541 Err(err) => {
3542 warn!("Failed to get updated knock requests on seen ids changed: {err}")
3543 }
3544 }
3545 }
3546
3547 Some(room_info) = room_info_stream.next() => {
3548 if !room_info.are_members_synced() {
3551 match this.get_current_join_requests(&seen_ids).await {
3552 Ok(requests) => yield requests,
3553 Err(err) => {
3554 warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
3555 }
3556 }
3557 }
3558 }
3559 else => break,
3561 }
3562 }
3563 };
3564
3565 Ok((combined_stream, clear_seen_ids_handle))
3566 }
3567
3568 async fn get_current_join_requests(
3569 &self,
3570 seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
3571 ) -> Result<Vec<KnockRequest>> {
3572 Ok(self
3573 .members(RoomMemberships::KNOCK)
3574 .await?
3575 .into_iter()
3576 .filter_map(|member| {
3577 let event_id = member.event().event_id()?;
3578 Some(KnockRequest::new(
3579 self,
3580 event_id,
3581 member.event().timestamp(),
3582 KnockRequestMemberInfo::from_member(&member),
3583 seen_request_ids.contains_key(event_id),
3584 ))
3585 })
3586 .collect())
3587 }
3588
3589 pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
3591 RoomPrivacySettings::new(&self.inner, &self.client)
3592 }
3593
3594 pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
3602 let request = opts.into_request(self.room_id());
3603
3604 let response = self.client.send(request).await?;
3605
3606 let push_ctx = self.push_context().await?;
3607 let chunk = join_all(
3608 response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
3609 )
3610 .await;
3611
3612 Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
3613 }
3614
3615 pub async fn relations(
3629 &self,
3630 event_id: OwnedEventId,
3631 opts: RelationsOptions,
3632 ) -> Result<Relations> {
3633 opts.send(self, event_id).await
3634 }
3635}
3636
3637#[cfg(feature = "e2e-encryption")]
3638impl RoomIdentityProvider for Room {
3639 fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
3640 Box::pin(async { self.get_member(user_id).await.unwrap_or(None).is_some() })
3641 }
3642
3643 fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
3644 Box::pin(async {
3645 let members = self
3646 .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
3647 .await
3648 .unwrap_or_else(|_| Default::default());
3649
3650 let mut ret: Vec<UserIdentity> = Vec::new();
3651 for member in members {
3652 if let Some(i) = self.user_identity(member.user_id()).await {
3653 ret.push(i);
3654 }
3655 }
3656 ret
3657 })
3658 }
3659
3660 fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
3661 Box::pin(async {
3662 self.client
3663 .encryption()
3664 .get_user_identity(user_id)
3665 .await
3666 .unwrap_or(None)
3667 .map(|u| u.underlying_identity())
3668 })
3669 }
3670}
3671
3672#[derive(Clone, Debug)]
3675pub(crate) struct WeakRoom {
3676 client: WeakClient,
3677 room_id: OwnedRoomId,
3678}
3679
3680impl WeakRoom {
3681 pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
3683 Self { client, room_id }
3684 }
3685
3686 pub fn get(&self) -> Option<Room> {
3688 self.client.get().and_then(|client| client.get_room(&self.room_id))
3689 }
3690
3691 pub fn room_id(&self) -> &RoomId {
3693 &self.room_id
3694 }
3695}
3696
3697#[derive(Debug, Clone)]
3699pub struct Invite {
3700 pub invitee: RoomMember,
3702 pub inviter: Option<RoomMember>,
3704}
3705
3706#[derive(Error, Debug)]
3707enum InvitationError {
3708 #[error("No membership event found")]
3709 EventMissing,
3710}
3711
3712#[derive(Debug, Clone, Default)]
3714#[non_exhaustive]
3715pub struct Receipts {
3716 pub fully_read: Option<OwnedEventId>,
3718 pub public_read_receipt: Option<OwnedEventId>,
3720 pub private_read_receipt: Option<OwnedEventId>,
3722}
3723
3724impl Receipts {
3725 pub fn new() -> Self {
3727 Self::default()
3728 }
3729
3730 pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
3739 self.fully_read = event_id.into();
3740 self
3741 }
3742
3743 pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
3749 self.public_read_receipt = event_id.into();
3750 self
3751 }
3752
3753 pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
3757 self.private_read_receipt = event_id.into();
3758 self
3759 }
3760
3761 pub fn is_empty(&self) -> bool {
3763 self.fully_read.is_none()
3764 && self.public_read_receipt.is_none()
3765 && self.private_read_receipt.is_none()
3766 }
3767}
3768
3769#[derive(Debug)]
3772pub enum ParentSpace {
3773 Reciprocal(Room),
3776 WithPowerlevel(Room),
3781 Illegitimate(Room),
3784 Unverifiable(OwnedRoomId),
3787}
3788
3789#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
3793pub struct ReportedContentScore(i8);
3794
3795impl ReportedContentScore {
3796 pub const MIN: Self = Self(-100);
3800
3801 pub const MAX: Self = Self(0);
3805
3806 pub fn new(value: i8) -> Option<Self> {
3815 value.try_into().ok()
3816 }
3817
3818 pub fn new_saturating(value: i8) -> Self {
3824 if value > Self::MAX {
3825 Self::MAX
3826 } else if value < Self::MIN {
3827 Self::MIN
3828 } else {
3829 Self(value)
3830 }
3831 }
3832
3833 pub fn value(&self) -> i8 {
3835 self.0
3836 }
3837}
3838
3839impl PartialEq<i8> for ReportedContentScore {
3840 fn eq(&self, other: &i8) -> bool {
3841 self.0.eq(other)
3842 }
3843}
3844
3845impl PartialEq<ReportedContentScore> for i8 {
3846 fn eq(&self, other: &ReportedContentScore) -> bool {
3847 self.eq(&other.0)
3848 }
3849}
3850
3851impl PartialOrd<i8> for ReportedContentScore {
3852 fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
3853 self.0.partial_cmp(other)
3854 }
3855}
3856
3857impl PartialOrd<ReportedContentScore> for i8 {
3858 fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
3859 self.partial_cmp(&other.0)
3860 }
3861}
3862
3863impl From<ReportedContentScore> for Int {
3864 fn from(value: ReportedContentScore) -> Self {
3865 value.0.into()
3866 }
3867}
3868
3869impl TryFrom<i8> for ReportedContentScore {
3870 type Error = TryFromReportedContentScoreError;
3871
3872 fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
3873 if value > Self::MAX || value < Self::MIN {
3874 Err(TryFromReportedContentScoreError(()))
3875 } else {
3876 Ok(Self(value))
3877 }
3878 }
3879}
3880
3881impl TryFrom<i16> for ReportedContentScore {
3882 type Error = TryFromReportedContentScoreError;
3883
3884 fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
3885 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3886 value.try_into()
3887 }
3888}
3889
3890impl TryFrom<i32> for ReportedContentScore {
3891 type Error = TryFromReportedContentScoreError;
3892
3893 fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
3894 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3895 value.try_into()
3896 }
3897}
3898
3899impl TryFrom<i64> for ReportedContentScore {
3900 type Error = TryFromReportedContentScoreError;
3901
3902 fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
3903 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3904 value.try_into()
3905 }
3906}
3907
3908impl TryFrom<Int> for ReportedContentScore {
3909 type Error = TryFromReportedContentScoreError;
3910
3911 fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
3912 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
3913 value.try_into()
3914 }
3915}
3916
3917trait EventSource {
3918 fn get_event(
3919 &self,
3920 event_id: &EventId,
3921 ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
3922}
3923
3924impl EventSource for &Room {
3925 async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
3926 self.load_or_fetch_event(event_id, None).await
3927 }
3928}
3929
3930#[derive(Debug, Clone, Error)]
3933#[error("out of range conversion attempted")]
3934pub struct TryFromReportedContentScoreError(());
3935
3936#[derive(Debug)]
3939pub struct RoomMemberWithSenderInfo {
3940 pub room_member: RoomMember,
3942 pub sender_info: Option<RoomMember>,
3945}
3946
3947#[cfg(all(test, not(target_family = "wasm")))]
3948mod tests {
3949 use matrix_sdk_base::{store::ComposerDraftType, ComposerDraft};
3950 use matrix_sdk_test::{
3951 async_test, event_factory::EventFactory, test_json, JoinedRoomBuilder, StateTestEvent,
3952 SyncResponseBuilder,
3953 };
3954 use ruma::{
3955 event_id,
3956 events::{relation::RelationType, room::member::MembershipState},
3957 int, owned_event_id, room_id, user_id,
3958 };
3959 use wiremock::{
3960 matchers::{header, method, path_regex},
3961 Mock, MockServer, ResponseTemplate,
3962 };
3963
3964 use super::ReportedContentScore;
3965 use crate::{
3966 config::RequestConfig,
3967 room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
3968 test_utils::{
3969 client::mock_matrix_session,
3970 logged_in_client,
3971 mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
3972 },
3973 Client,
3974 };
3975
3976 #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
3977 #[async_test]
3978 async fn test_cache_invalidation_while_encrypt() {
3979 use matrix_sdk_base::store::RoomLoadSettings;
3980 use matrix_sdk_test::{message_like_event_content, DEFAULT_TEST_ROOM_ID};
3981
3982 let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
3983 let session = mock_matrix_session();
3984
3985 let client = Client::builder()
3986 .homeserver_url("http://localhost:1234")
3987 .request_config(RequestConfig::new().disable_retry())
3988 .sqlite_store(&sqlite_path, None)
3989 .build()
3990 .await
3991 .unwrap();
3992 client
3993 .matrix_auth()
3994 .restore_session(session.clone(), RoomLoadSettings::default())
3995 .await
3996 .unwrap();
3997
3998 client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
3999
4000 let server = MockServer::start().await;
4002 {
4003 Mock::given(method("GET"))
4004 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
4005 .and(header("authorization", "Bearer 1234"))
4006 .respond_with(
4007 ResponseTemplate::new(200)
4008 .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
4009 )
4010 .mount(&server)
4011 .await;
4012 let response = SyncResponseBuilder::default()
4013 .add_joined_room(
4014 JoinedRoomBuilder::default()
4015 .add_state_event(StateTestEvent::Member)
4016 .add_state_event(StateTestEvent::PowerLevels)
4017 .add_state_event(StateTestEvent::Encryption),
4018 )
4019 .build_sync_response();
4020 client.base_client().receive_sync_response(response).await.unwrap();
4021 }
4022
4023 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4024
4025 room.preshare_room_key().await.unwrap();
4027
4028 {
4031 let client = Client::builder()
4032 .homeserver_url("http://localhost:1234")
4033 .request_config(RequestConfig::new().disable_retry())
4034 .sqlite_store(&sqlite_path, None)
4035 .build()
4036 .await
4037 .unwrap();
4038 client
4039 .matrix_auth()
4040 .restore_session(session.clone(), RoomLoadSettings::default())
4041 .await
4042 .unwrap();
4043 client
4044 .encryption()
4045 .enable_cross_process_store_lock("client2".to_owned())
4046 .await
4047 .unwrap();
4048
4049 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4050 assert!(guard.is_some());
4051 }
4052
4053 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4055 assert!(guard.is_some());
4056
4057 let olm = client.olm_machine().await;
4059 let olm = olm.as_ref().expect("Olm machine wasn't started");
4060
4061 let _encrypted_content = olm
4064 .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
4065 .await
4066 .unwrap();
4067 }
4068
4069 #[test]
4070 fn reported_content_score() {
4071 let score = ReportedContentScore::new(0).unwrap();
4073 assert_eq!(score.value(), 0);
4074 let score = ReportedContentScore::new(-50).unwrap();
4075 assert_eq!(score.value(), -50);
4076 let score = ReportedContentScore::new(-100).unwrap();
4077 assert_eq!(score.value(), -100);
4078 assert_eq!(ReportedContentScore::new(10), None);
4079 assert_eq!(ReportedContentScore::new(-110), None);
4080
4081 let score = ReportedContentScore::new_saturating(0);
4082 assert_eq!(score.value(), 0);
4083 let score = ReportedContentScore::new_saturating(-50);
4084 assert_eq!(score.value(), -50);
4085 let score = ReportedContentScore::new_saturating(-100);
4086 assert_eq!(score.value(), -100);
4087 let score = ReportedContentScore::new_saturating(10);
4088 assert_eq!(score, ReportedContentScore::MAX);
4089 let score = ReportedContentScore::new_saturating(-110);
4090 assert_eq!(score, ReportedContentScore::MIN);
4091
4092 let score = ReportedContentScore::try_from(0i16).unwrap();
4094 assert_eq!(score.value(), 0);
4095 let score = ReportedContentScore::try_from(-100i16).unwrap();
4096 assert_eq!(score.value(), -100);
4097 ReportedContentScore::try_from(10i16).unwrap_err();
4098 ReportedContentScore::try_from(-110i16).unwrap_err();
4099
4100 let score = ReportedContentScore::try_from(0i32).unwrap();
4102 assert_eq!(score.value(), 0);
4103 let score = ReportedContentScore::try_from(-100i32).unwrap();
4104 assert_eq!(score.value(), -100);
4105 ReportedContentScore::try_from(10i32).unwrap_err();
4106 ReportedContentScore::try_from(-110i32).unwrap_err();
4107
4108 let score = ReportedContentScore::try_from(0i64).unwrap();
4110 assert_eq!(score.value(), 0);
4111 let score = ReportedContentScore::try_from(-100i64).unwrap();
4112 assert_eq!(score.value(), -100);
4113 ReportedContentScore::try_from(10i64).unwrap_err();
4114 ReportedContentScore::try_from(-110i64).unwrap_err();
4115
4116 let score = ReportedContentScore::try_from(int!(0)).unwrap();
4118 assert_eq!(score.value(), 0);
4119 let score = ReportedContentScore::try_from(int!(-100)).unwrap();
4120 assert_eq!(score.value(), -100);
4121 ReportedContentScore::try_from(int!(10)).unwrap_err();
4122 ReportedContentScore::try_from(int!(-110)).unwrap_err();
4123 }
4124
4125 #[async_test]
4126 async fn test_composer_draft() {
4127 use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4128
4129 let client = logged_in_client(None).await;
4130
4131 let response = SyncResponseBuilder::default()
4132 .add_joined_room(JoinedRoomBuilder::default())
4133 .build_sync_response();
4134 client.base_client().receive_sync_response(response).await.unwrap();
4135 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4136
4137 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4138
4139 let draft = ComposerDraft {
4142 plain_text: "Hello, world!".to_owned(),
4143 html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4144 draft_type: ComposerDraftType::NewMessage,
4145 };
4146
4147 room.save_composer_draft(draft.clone(), None).await.unwrap();
4148
4149 let thread_root = owned_event_id!("$thread_root:b.c");
4150 let thread_draft = ComposerDraft {
4151 plain_text: "Hello, thread!".to_owned(),
4152 html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4153 draft_type: ComposerDraftType::NewMessage,
4154 };
4155
4156 room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4157
4158 assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4160
4161 assert_eq!(
4163 room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4164 Some(thread_draft.clone())
4165 );
4166
4167 room.clear_composer_draft(None).await.unwrap();
4169 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4170
4171 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4173
4174 room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4176 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4177 }
4178
4179 #[async_test]
4180 async fn test_mark_join_requests_as_seen() {
4181 let server = MatrixMockServer::new().await;
4182 let client = server.client_builder().build().await;
4183 let event_id = event_id!("$a:b.c");
4184 let room_id = room_id!("!a:b.c");
4185 let user_id = user_id!("@alice:b.c");
4186
4187 let f = EventFactory::new().room(room_id);
4188 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f
4189 .member(user_id)
4190 .membership(MembershipState::Knock)
4191 .event_id(event_id)
4192 .into_raw()]);
4193 let room = server.sync_room(&client, joined_room_builder).await;
4194
4195 let seen_ids =
4197 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4198 assert!(seen_ids.is_empty());
4199
4200 room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4202 .await
4203 .expect("Couldn't mark join request as seen");
4204
4205 let seen_ids =
4207 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4208 assert_eq!(seen_ids.len(), 1);
4209 assert_eq!(
4210 seen_ids.into_iter().next().expect("No next value"),
4211 (event_id.to_owned(), user_id.to_owned())
4212 )
4213 }
4214
4215 #[async_test]
4216 async fn test_own_room_membership_with_no_own_member_event() {
4217 let server = MatrixMockServer::new().await;
4218 let client = server.client_builder().build().await;
4219 let room_id = room_id!("!a:b.c");
4220
4221 let room = server.sync_joined_room(&client, room_id).await;
4222
4223 let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
4226 assert!(error.is_some());
4227 }
4228
4229 #[async_test]
4230 async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
4231 let server = MatrixMockServer::new().await;
4232 let client = server.client_builder().build().await;
4233 let room_id = room_id!("!a:b.c");
4234 let user_id = user_id!("@example:localhost");
4235
4236 let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
4237 let joined_room_builder =
4238 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into_raw()]);
4239 let room = server.sync_room(&client, joined_room_builder).await;
4240
4241 let ret = room
4243 .member_with_sender_info(client.user_id().unwrap())
4244 .await
4245 .expect("Room member info should be available");
4246
4247 assert_eq!(ret.room_member.event().user_id(), user_id);
4249
4250 assert!(ret.sender_info.is_none());
4252 }
4253
4254 #[async_test]
4255 async fn test_own_room_membership_with_own_member_event_and_own_sender() {
4256 let server = MatrixMockServer::new().await;
4257 let client = server.client_builder().build().await;
4258 let room_id = room_id!("!a:b.c");
4259 let user_id = user_id!("@example:localhost");
4260
4261 let f = EventFactory::new().room(room_id).sender(user_id);
4262 let joined_room_builder =
4263 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into_raw()]);
4264 let room = server.sync_room(&client, joined_room_builder).await;
4265
4266 let ret = room
4268 .member_with_sender_info(client.user_id().unwrap())
4269 .await
4270 .expect("Room member info should be available");
4271
4272 assert_eq!(ret.room_member.event().user_id(), user_id);
4274
4275 assert!(ret.sender_info.is_some());
4277 assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
4278 }
4279
4280 #[async_test]
4281 async fn test_own_room_membership_with_own_member_event_and_known_sender() {
4282 let server = MatrixMockServer::new().await;
4283 let client = server.client_builder().build().await;
4284 let room_id = room_id!("!a:b.c");
4285 let user_id = user_id!("@example:localhost");
4286 let sender_id = user_id!("@alice:b.c");
4287
4288 let f = EventFactory::new().room(room_id).sender(sender_id);
4289 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4290 f.member(user_id).into_raw(),
4291 f.member(sender_id).into_raw(),
4293 ]);
4294 let room = server.sync_room(&client, joined_room_builder).await;
4295
4296 let ret = room
4298 .member_with_sender_info(client.user_id().unwrap())
4299 .await
4300 .expect("Room member info should be available");
4301
4302 assert_eq!(ret.room_member.event().user_id(), user_id);
4304
4305 assert!(ret.sender_info.is_some());
4307 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
4308 }
4309
4310 #[async_test]
4311 async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
4312 let server = MatrixMockServer::new().await;
4313 let client = server.client_builder().build().await;
4314 let room_id = room_id!("!a:b.c");
4315 let user_id = user_id!("@example:localhost");
4316 let sender_id = user_id!("@alice:b.c");
4317
4318 let f = EventFactory::new().room(room_id).sender(sender_id);
4319 let joined_room_builder =
4320 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into_raw()]);
4321 let room = server.sync_room(&client, joined_room_builder).await;
4322
4323 server
4325 .mock_get_members()
4326 .ok(vec![f.member(sender_id).into_raw()])
4327 .mock_once()
4328 .mount()
4329 .await;
4330
4331 let ret = room
4333 .member_with_sender_info(client.user_id().unwrap())
4334 .await
4335 .expect("Room member info should be available");
4336
4337 assert_eq!(ret.room_member.event().user_id(), user_id);
4339
4340 assert!(ret.sender_info.is_some());
4342 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
4343 }
4344
4345 #[async_test]
4346 async fn test_list_threads() {
4347 let server = MatrixMockServer::new().await;
4348 let client = server.client_builder().build().await;
4349
4350 let room_id = room_id!("!a:b.c");
4351 let sender_id = user_id!("@alice:b.c");
4352 let f = EventFactory::new().room(room_id).sender(sender_id);
4353
4354 let eid1 = event_id!("$1");
4355 let eid2 = event_id!("$2");
4356 let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
4357 let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
4358
4359 server
4360 .mock_room_threads()
4361 .ok(batch1.clone(), Some("prev_batch".to_owned()))
4362 .mock_once()
4363 .mount()
4364 .await;
4365 server
4366 .mock_room_threads()
4367 .match_from("prev_batch")
4368 .ok(batch2, None)
4369 .mock_once()
4370 .mount()
4371 .await;
4372
4373 let room = server.sync_joined_room(&client, room_id).await;
4374 let result =
4375 room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
4376 assert_eq!(result.chunk.len(), 1);
4377 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
4378 assert!(result.prev_batch_token.is_some());
4379
4380 let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
4381 let result = room.list_threads(opts).await.expect("Failed to list threads");
4382 assert_eq!(result.chunk.len(), 1);
4383 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
4384 assert!(result.prev_batch_token.is_none());
4385 }
4386
4387 #[async_test]
4388 async fn test_relations() {
4389 let server = MatrixMockServer::new().await;
4390 let client = server.client_builder().build().await;
4391
4392 let room_id = room_id!("!a:b.c");
4393 let sender_id = user_id!("@alice:b.c");
4394 let f = EventFactory::new().room(room_id).sender(sender_id);
4395
4396 let target_event_id = owned_event_id!("$target");
4397 let eid1 = event_id!("$1");
4398 let eid2 = event_id!("$2");
4399 let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
4400 let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
4401
4402 server
4403 .mock_room_relations()
4404 .match_target_event(target_event_id.clone())
4405 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
4406 .mock_once()
4407 .mount()
4408 .await;
4409
4410 server
4411 .mock_room_relations()
4412 .match_target_event(target_event_id.clone())
4413 .match_from("next_batch")
4414 .ok(RoomRelationsResponseTemplate::default().events(batch2))
4415 .mock_once()
4416 .mount()
4417 .await;
4418
4419 let room = server.sync_joined_room(&client, room_id).await;
4420
4421 let mut opts = RelationsOptions {
4423 include_relations: IncludeRelations::AllRelations,
4424 ..Default::default()
4425 };
4426 let result = room
4427 .relations(target_event_id.clone(), opts.clone())
4428 .await
4429 .expect("Failed to list relations the first time");
4430 assert_eq!(result.chunk.len(), 1);
4431 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
4432 assert!(result.prev_batch_token.is_none());
4433 assert!(result.next_batch_token.is_some());
4434 assert!(result.recursion_depth.is_none());
4435
4436 opts.from = result.next_batch_token;
4437 let result = room
4438 .relations(target_event_id, opts)
4439 .await
4440 .expect("Failed to list relations the second time");
4441 assert_eq!(result.chunk.len(), 1);
4442 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
4443 assert!(result.prev_batch_token.is_none());
4444 assert!(result.next_batch_token.is_none());
4445 assert!(result.recursion_depth.is_none());
4446 }
4447
4448 #[async_test]
4449 async fn test_relations_with_reltype() {
4450 let server = MatrixMockServer::new().await;
4451 let client = server.client_builder().build().await;
4452
4453 let room_id = room_id!("!a:b.c");
4454 let sender_id = user_id!("@alice:b.c");
4455 let f = EventFactory::new().room(room_id).sender(sender_id);
4456
4457 let target_event_id = owned_event_id!("$target");
4458 let eid1 = event_id!("$1");
4459 let eid2 = event_id!("$2");
4460 let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
4461 let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
4462
4463 server
4464 .mock_room_relations()
4465 .match_target_event(target_event_id.clone())
4466 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
4467 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
4468 .mock_once()
4469 .mount()
4470 .await;
4471
4472 server
4473 .mock_room_relations()
4474 .match_target_event(target_event_id.clone())
4475 .match_from("next_batch")
4476 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
4477 .ok(RoomRelationsResponseTemplate::default().events(batch2))
4478 .mock_once()
4479 .mount()
4480 .await;
4481
4482 let room = server.sync_joined_room(&client, room_id).await;
4483
4484 let mut opts = RelationsOptions {
4486 include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
4487 ..Default::default()
4488 };
4489 let result = room
4490 .relations(target_event_id.clone(), opts.clone())
4491 .await
4492 .expect("Failed to list relations the first time");
4493 assert_eq!(result.chunk.len(), 1);
4494 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
4495 assert!(result.prev_batch_token.is_none());
4496 assert!(result.next_batch_token.is_some());
4497 assert!(result.recursion_depth.is_none());
4498
4499 opts.from = result.next_batch_token;
4500 let result = room
4501 .relations(target_event_id, opts)
4502 .await
4503 .expect("Failed to list relations the second time");
4504 assert_eq!(result.chunk.len(), 1);
4505 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
4506 assert!(result.prev_batch_token.is_none());
4507 assert!(result.next_batch_token.is_none());
4508 assert!(result.recursion_depth.is_none());
4509 }
4510}