1use std::{
18 borrow::Borrow,
19 collections::{BTreeMap, HashMap},
20 future::Future,
21 ops::Deref,
22 sync::Arc,
23 time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30 StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{
39 IdentityStatusChange, RoomIdentityProvider, UserIdentity, types::events::CryptoContextInfo,
40};
41pub use matrix_sdk_base::store::StoredThreadSubscription;
42use matrix_sdk_base::{
43 ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
44 StateChanges, StateStoreDataKey, StateStoreDataValue,
45 deserialized_responses::{
46 RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
47 },
48 media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
49 store::{StateStoreExt, ThreadSubscriptionStatus},
50};
51#[cfg(feature = "e2e-encryption")]
52use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
53#[cfg(feature = "e2e-encryption")]
54use matrix_sdk_common::BoxFuture;
55use matrix_sdk_common::{
56 deserialized_responses::TimelineEvent,
57 executor::{JoinHandle, spawn},
58 timeout::timeout,
59};
60#[cfg(feature = "experimental-search")]
61use matrix_sdk_search::error::IndexError;
62#[cfg(feature = "experimental-search")]
63#[cfg(doc)]
64use matrix_sdk_search::index::RoomIndex;
65use mime::Mime;
66use reply::Reply;
67#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
68use ruma::events::AnySyncMessageLikeEvent;
69#[cfg(feature = "experimental-encrypted-state-events")]
70use ruma::events::AnySyncStateEvent;
71#[cfg(feature = "unstable-msc4274")]
72use ruma::events::room::message::GalleryItemType;
73#[cfg(feature = "e2e-encryption")]
74use ruma::events::{
75 AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
76};
77use ruma::{
78 EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
79 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
80 api::client::{
81 config::{set_global_account_data, set_room_account_data},
82 context,
83 error::ErrorKind,
84 filter::LazyLoadOptions,
85 membership::{
86 Invite3pid, ban_user, forget_room, get_member_events,
87 invite_user::{self, v3::InvitationRecipient},
88 kick_user, leave_room, unban_user,
89 },
90 message::send_message_event,
91 read_marker::set_read_marker,
92 receipt::create_receipt,
93 redact::redact_event,
94 room::{get_room_event, report_content, report_room},
95 state::{get_state_event_for_key, send_state_event},
96 tag::{create_tag, delete_tag},
97 threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
98 typing::create_typing_event::{self, v3::Typing},
99 },
100 assign,
101 events::{
102 AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
103 Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
104 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
105 RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
106 StaticStateEventContent, SyncStateEvent,
107 beacon::BeaconEventContent,
108 beacon_info::BeaconInfoEventContent,
109 direct::DirectEventContent,
110 marked_unread::MarkedUnreadEventContent,
111 receipt::{Receipt, ReceiptThread, ReceiptType},
112 room::{
113 ImageInfo, MediaSource, ThumbnailInfo,
114 avatar::{self, RoomAvatarEventContent},
115 encryption::RoomEncryptionEventContent,
116 history_visibility::HistoryVisibility,
117 member::{MembershipChange, RoomMemberEventContent, SyncRoomMemberEvent},
118 message::{
119 AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
120 ImageMessageEventContent, MessageType, RoomMessageEventContent,
121 TextMessageEventContent, UnstableAmplitude, UnstableAudioDetailsContentBlock,
122 UnstableVoiceContentBlock, VideoInfo, VideoMessageEventContent,
123 },
124 name::RoomNameEventContent,
125 pinned_events::RoomPinnedEventsEventContent,
126 power_levels::{
127 RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
128 },
129 server_acl::RoomServerAclEventContent,
130 topic::RoomTopicEventContent,
131 },
132 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
133 tag::{TagInfo, TagName},
134 typing::SyncTypingEvent,
135 },
136 int,
137 push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
138 serde::Raw,
139 time::Instant,
140};
141#[cfg(feature = "experimental-encrypted-state-events")]
142use ruma::{
143 events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
144 serde::JsonCastable,
145};
146use serde::de::DeserializeOwned;
147use thiserror::Error;
148use tokio::{join, sync::broadcast};
149use tracing::{debug, error, info, instrument, trace, warn};
150
151use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
152pub use self::{
153 member::{RoomMember, RoomMemberRole},
154 messages::{
155 EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
156 Relations, RelationsOptions, ThreadRoots,
157 },
158};
159#[cfg(feature = "e2e-encryption")]
160use crate::encryption::backups::BackupState;
161#[cfg(doc)]
162use crate::event_cache::EventCache;
163#[cfg(feature = "experimental-encrypted-state-events")]
164use crate::room::futures::{SendRawStateEvent, SendStateEvent};
165use crate::{
166 BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
167 attachment::{AttachmentConfig, AttachmentInfo},
168 client::WeakClient,
169 config::RequestConfig,
170 error::{BeaconError, WrongRoomState},
171 event_cache::{self, EventCacheDropHandles, RoomEventCache},
172 event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
173 live_location_share::ObservableLiveLocation,
174 media::{MediaFormat, MediaRequestParameters},
175 notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
176 room::{
177 knock_requests::{KnockRequest, KnockRequestMemberInfo},
178 power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
179 privacy_settings::RoomPrivacySettings,
180 },
181 sync::RoomUpdate,
182 utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
183};
184
185pub mod edit;
186pub mod futures;
187pub mod identity_status_changes;
188pub mod knock_requests;
190mod member;
191mod messages;
192pub mod power_levels;
193pub mod reply;
194
195pub mod calls;
196
197pub mod privacy_settings;
199
200#[cfg(feature = "e2e-encryption")]
201pub(crate) mod shared_room_history;
202
/// A handle to a single room, bundling the base room state with the client
/// through which requests concerning that room are made.
#[derive(Debug, Clone)]
pub struct Room {
    /// The underlying base room; [`Room`] dereferences to it.
    inner: BaseRoom,
    /// The client used to send requests and reach the stores for this room.
    pub(crate) client: Client,
}
210
211impl Deref for Room {
212 type Target = BaseRoom;
213
214 fn deref(&self) -> &Self::Target {
215 &self.inner
216 }
217}
218
// Typing-notice timings. Their consumers are not visible in this part of the
// file.
// NOTE(review): presumably the first is how long a sent typing notice stays
// valid and the second is the minimum delay before it is re-sent — confirm
// against the typing-notice sender.

/// Four seconds.
const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
/// Three seconds; deliberately shorter than [`TYPING_NOTICE_TIMEOUT`].
const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
221
/// The state of a subscription to a thread.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ThreadSubscription {
    /// Whether the subscription was created automatically.
    ///
    /// NOTE(review): presumably "automatically" means by the client rather
    /// than through an explicit user choice — confirm.
    pub automatic: bool,
}
229
/// Everything needed to evaluate push rules against the events of a room.
#[derive(Debug)]
pub struct PushContext {
    /// The room context that push-rule conditions are evaluated against.
    push_condition_room_ctx: PushConditionRoomCtx,

    /// The set of push rules to evaluate.
    push_rules: Ruleset,
}
240
241impl PushContext {
242 pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
244 Self { push_condition_room_ctx, push_rules }
245 }
246
247 pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
249 self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
250 }
251
252 #[doc(hidden)]
255 #[instrument(skip_all)]
256 pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
257 let rules = self
258 .push_rules
259 .iter()
260 .filter_map(|r| {
261 if !r.enabled() {
262 return None;
263 }
264
265 let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
266
267 let conditions = match r {
268 AnyPushRuleRef::Override(r) => {
269 format!("{:?}", r.conditions)
270 }
271 AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
272 AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
273 AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
274 AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
275 _ => "<unknown push rule kind>".to_owned(),
276 };
277
278 Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
279 })
280 .collect::<Vec<_>>()
281 .join("\n");
282 trace!("rules:\n\n{rules}\n\n");
283
284 let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
285
286 if let Some(found) = found {
287 trace!("rule {} matched", found.rule_id());
288 found.actions().to_owned()
289 } else {
290 trace!("no match");
291 Vec::new()
292 }
293 }
294}
295
// Builds a media message content of enum type `$t`, choosing the variant
// (`Image`, `Audio`, `Video` or `File`) from the top-level type of the
// `$content_type` MIME type.
//
// - `$filename`: the name of the attached file.
// - `$source`: the media source passed to the content constructor.
// - `$caption`: an optional `TextMessageEventContent`.
// - `$info`: an optional `AttachmentInfo`, converted into the info type of
//   the chosen variant (or that type's default when absent).
// - `$thumbnail`: an optional `(source, info)` pair, split with `unzip`.
macro_rules! make_media_type {
    ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $info: ident, $thumbnail: ident) => {{
        // If a caption is set, it becomes the body (and formatted body) and the
        // file name is carried separately; otherwise the file name is used as
        // the body and no separate file name is set.
        let (body, formatted, filename) = match $caption {
            Some(TextMessageEventContent { body, formatted, .. }) => (body, formatted, Some($filename)),
            None => ($filename, None, None),
        };

        let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();

        match $content_type.type_() {
            mime::IMAGE => {
                let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let content = assign!(ImageMessageEventContent::new(body, $source), {
                    info: Some(Box::new(info)),
                    formatted,
                    filename
                });
                <$t>::Image(content)
            }

            mime::AUDIO => {
                let mut content = assign!(AudioMessageEventContent::new(body, $source), {
                    formatted,
                    filename
                });

                // The audio details block is only attached when both a duration
                // and a waveform are available.
                if let Some(AttachmentInfo::Audio(audio_info) | AttachmentInfo::Voice(audio_info)) = &$info &&
                    let Some(duration) = audio_info.duration && let Some(waveform_vec) = &audio_info.waveform {
                    // Each sample is clamped to [0, 1] and then scaled to the
                    // `UnstableAmplitude::MAX` range.
                    let waveform = waveform_vec
                        .iter()
                        .map(|v| ((*v).clamp(0.0, 1.0) * UnstableAmplitude::MAX as f32) as u16)
                        .map(Into::into)
                        .collect();
                    content.audio =
                        Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
                }

                // Voice attachments additionally get the voice content block.
                if matches!($info, Some(AttachmentInfo::Voice(_))) {
                    content.voice = Some(UnstableVoiceContentBlock::new());
                }

                // Unlike the other branches, no thumbnail is attached here.
                let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
                audio_info.mimetype = Some($content_type.as_ref().to_owned());
                let content = content.info(Box::new(audio_info));

                <$t>::Audio(content)
            }

            mime::VIDEO => {
                let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let content = assign!(VideoMessageEventContent::new(body, $source), {
                    info: Some(Box::new(info)),
                    formatted,
                    filename
                });
                <$t>::Video(content)
            }

            // Any other MIME type is sent as a generic file.
            _ => {
                let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let content = assign!(FileMessageEventContent::new(body, $source), {
                    info: Some(Box::new(info)),
                    formatted,
                    filename,
                });
                <$t>::File(content)
            }
        }
    }};
}
381
382impl Room {
383 pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
390 Self { inner: room, client }
391 }
392
393 #[doc(alias = "reject_invitation")]
399 #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
400 async fn leave_impl(&self) -> (Result<()>, &Room) {
401 let state = self.state();
402 if state == RoomState::Left {
403 return (
404 Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
405 "Joined or Invited",
406 state,
407 )))),
408 self,
409 );
410 }
411
412 let should_forget = matches!(self.state(), RoomState::Invited);
415
416 let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
417 let response = self.client.send(request).await;
418
419 if let Err(error) = response {
422 #[allow(clippy::collapsible_match)]
423 let ignore_error = if let Some(error) = error.client_api_error_kind() {
424 match error {
425 ErrorKind::Forbidden { .. } => true,
428 _ => false,
429 }
430 } else {
431 false
432 };
433
434 error!(?error, ignore_error, should_forget, "Failed to leave the room");
435
436 if !ignore_error {
437 return (Err(error.into()), self);
438 }
439 }
440
441 if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
442 return (Err(e.into()), self);
443 }
444
445 if should_forget {
446 trace!("Trying to forget the room");
447
448 if let Err(error) = self.forget().await {
449 error!(?error, "Failed to forget the room");
450 }
451 }
452
453 (Ok(()), self)
454 }
455
456 pub async fn leave(&self) -> Result<()> {
464 let mut rooms: Vec<Room> = vec![self.clone()];
465 let mut current_room = self;
466
467 while let Some(predecessor) = current_room.predecessor_room() {
468 let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
469
470 if let Some(predecessor_room) = maybe_predecessor_room {
471 rooms.push(predecessor_room.clone());
472 current_room = rooms.last().expect("Room just pushed so can't be empty");
473 } else {
474 warn!("Cannot find predecessor room");
475 break;
476 }
477 }
478
479 let batch_size = 5;
480
481 let rooms_futures: Vec<_> = rooms
482 .iter()
483 .filter_map(|room| match room.state() {
484 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
485 Some(room.leave_impl())
486 }
487 RoomState::Banned | RoomState::Left => None,
488 })
489 .collect();
490
491 let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
492
493 let mut maybe_this_room_failed_with: Option<Error> = None;
494
495 while let Some(result) = futures_stream.next().await {
496 if let (Err(e), room) = result {
497 if room.room_id() == self.room_id() {
498 maybe_this_room_failed_with = Some(e);
499 } else {
500 warn!("Failure while attempting to leave predecessor room: {e:?}");
501 }
502 }
503 }
504
505 maybe_this_room_failed_with.map_or(Ok(()), Err)
506 }
507
508 #[doc(alias = "accept_invitation")]
512 pub async fn join(&self) -> Result<()> {
513 let prev_room_state = self.inner.state();
514
515 if prev_room_state == RoomState::Joined {
516 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
517 "Invited or Left",
518 prev_room_state,
519 ))));
520 }
521
522 self.client.join_room_by_id(self.room_id()).await?;
523
524 Ok(())
525 }
526
527 pub fn client(&self) -> Client {
531 self.client.clone()
532 }
533
534 pub fn is_synced(&self) -> bool {
537 self.inner.is_state_fully_synced()
538 }
539
540 pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
570 let Some(url) = self.avatar_url() else { return Ok(None) };
571 let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
572 Ok(Some(self.client.media().get_media_content(&request, true).await?))
573 }
574
575 #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
604 pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
605 let room_id = self.inner.room_id();
606 let request = options.into_request(room_id);
607 let http_response = self.client.send(request).await?;
608
609 let push_ctx = self.push_context().await?;
610 let chunk = join_all(
611 http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
612 )
613 .await;
614
615 Ok(Messages {
616 start: http_response.start,
617 end: http_response.end,
618 chunk,
619 state: http_response.state,
620 })
621 }
622
623 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
633 where
634 Ev: SyncEvent + DeserializeOwned + Send + 'static,
635 H: EventHandler<Ev, Ctx>,
636 {
637 self.client.add_room_event_handler(self.room_id(), handler)
638 }
639
640 pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
645 self.client.subscribe_to_room_updates(self.room_id())
646 }
647
648 pub fn subscribe_to_typing_notifications(
654 &self,
655 ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
656 let (sender, receiver) = broadcast::channel(16);
657 let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
658 let own_user_id = self.own_user_id().to_owned();
659 move |event: SyncTypingEvent| async move {
660 let typing_user_ids = event
662 .content
663 .user_ids
664 .into_iter()
665 .filter(|user_id| *user_id != own_user_id)
666 .collect();
667 let _ = sender.send(typing_user_ids);
669 }
670 });
671 let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
672 (drop_guard, receiver)
673 }
674
675 #[cfg(feature = "e2e-encryption")]
698 pub async fn subscribe_to_identity_status_changes(
699 &self,
700 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
701 IdentityStatusChanges::create_stream(self.clone()).await
702 }
703
    /// Turns a raw timeline event into a [`TimelineEvent`], decrypting it when
    /// possible.
    ///
    /// With the `e2e-encryption` feature, an original (non-redacted)
    /// `m.room.encrypted` message-like event is first handed to
    /// `decrypt_event`; on success the decrypted event is returned as is.
    ///
    /// In every other case (including a failed decryption) the event is
    /// wrapped as plaintext, and its push actions are computed from
    /// `push_ctx` when one is provided.
    #[cfg(not(feature = "experimental-encrypted-state-events"))]
    #[allow(clippy::unused_async)]
    async fn try_decrypt_event(
        &self,
        event: Raw<AnyTimelineEvent>,
        push_ctx: Option<&PushContext>,
    ) -> TimelineEvent {
        // Only attempt decryption for events that deserialize as an original
        // encrypted message-like event.
        #[cfg(feature = "e2e-encryption")]
        if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
            SyncMessageLikeEvent::Original(_),
        ))) = event.deserialize_as::<AnySyncTimelineEvent>()
            && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
        {
            return event;
        }

        // Fallback: keep the event as received.
        let mut event = TimelineEvent::from_plaintext(event.cast());
        if let Some(push_ctx) = push_ctx {
            event.set_push_actions(push_ctx.for_event(event.raw()).await);
        }

        event
    }
731
732 #[cfg(feature = "experimental-encrypted-state-events")]
737 #[allow(clippy::unused_async)] async fn try_decrypt_event(
739 &self,
740 event: Raw<AnyTimelineEvent>,
741 push_ctx: Option<&PushContext>,
742 ) -> TimelineEvent {
743 match event.deserialize_as::<AnySyncTimelineEvent>() {
745 Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
746 SyncMessageLikeEvent::Original(_),
747 ))) => {
748 if let Ok(event) = self
749 .decrypt_event(
750 event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
751 push_ctx,
752 )
753 .await
754 {
755 return event;
756 }
757 }
758 Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
759 SyncStateEvent::Original(_),
760 ))) => {
761 if let Ok(event) = self
762 .decrypt_event(
763 event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
764 push_ctx,
765 )
766 .await
767 {
768 return event;
769 }
770 }
771 _ => {}
772 }
773
774 let mut event = TimelineEvent::from_plaintext(event.cast());
775 if let Some(push_ctx) = push_ctx {
776 event.set_push_actions(push_ctx.for_event(event.raw()).await);
777 }
778
779 event
780 }
781
782 pub async fn event(
787 &self,
788 event_id: &EventId,
789 request_config: Option<RequestConfig>,
790 ) -> Result<TimelineEvent> {
791 let request =
792 get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
793
794 let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
795 let push_ctx = self.push_context().await?;
796 let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
797
798 if let Ok((cache, _handles)) = self.event_cache().await {
800 cache.save_events([event.clone()]).await;
801 }
802
803 Ok(event)
804 }
805
806 pub async fn load_or_fetch_event(
813 &self,
814 event_id: &EventId,
815 request_config: Option<RequestConfig>,
816 ) -> Result<TimelineEvent> {
817 match self.event_cache().await {
818 Ok((event_cache, _drop_handles)) => {
819 if let Some(event) = event_cache.find_event(event_id).await? {
820 return Ok(event);
821 }
822 }
824 Err(err) => {
825 debug!("error when getting the event cache: {err}");
826 }
827 }
828 self.event(event_id, request_config).await
829 }
830
831 pub async fn event_with_context(
834 &self,
835 event_id: &EventId,
836 lazy_load_members: bool,
837 context_size: UInt,
838 request_config: Option<RequestConfig>,
839 ) -> Result<EventWithContextResponse> {
840 let mut request =
841 context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
842
843 request.limit = context_size;
844
845 if lazy_load_members {
846 request.filter.lazy_load_options =
847 LazyLoadOptions::Enabled { include_redundant_members: false };
848 }
849
850 let response = self.client.send(request).with_request_config(request_config).await?;
851
852 let push_ctx = self.push_context().await?;
853 let push_ctx = push_ctx.as_ref();
854 let target_event = if let Some(event) = response.event {
855 Some(self.try_decrypt_event(event, push_ctx).await)
856 } else {
857 None
858 };
859
860 let (events_before, events_after) = join!(
864 join_all(
865 response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
866 ),
867 join_all(
868 response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
869 ),
870 );
871
872 if let Ok((cache, _handles)) = self.event_cache().await {
874 let mut events_to_save: Vec<TimelineEvent> = Vec::new();
875 if let Some(event) = &target_event {
876 events_to_save.push(event.clone());
877 }
878
879 for event in &events_before {
880 events_to_save.push(event.clone());
881 }
882
883 for event in &events_after {
884 events_to_save.push(event.clone());
885 }
886
887 cache.save_events(events_to_save).await;
888 }
889
890 Ok(EventWithContextResponse {
891 event: target_event,
892 events_before,
893 events_after,
894 state: response.state,
895 prev_batch_token: response.start,
896 next_batch_token: response.end,
897 })
898 }
899
900 pub(crate) async fn request_members(&self) -> Result<()> {
901 self.client
902 .locks()
903 .members_request_deduplicated_handler
904 .run(self.room_id().to_owned(), async move {
905 let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
906 let response = self
907 .client
908 .send(request.clone())
909 .with_request_config(
910 RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
913 )
914 .await?;
915
916 Box::pin(self.client.base_client().receive_all_members(
918 self.room_id(),
919 &request,
920 &response,
921 ))
922 .await?;
923
924 Ok(())
925 })
926 .await
927 }
928
    /// Fetches the room's `m.room.encryption` state event from the homeserver
    /// and records the result, unless the encryption state is already known.
    ///
    /// The work is run through the client's deduplicating handler for
    /// encryption-state requests, keyed by room id.
    ///
    /// A `NotFound` response is not an error: it is recorded as the absence of
    /// an encryption event. Any other request error, a deserialization
    /// failure, or a store failure is returned.
    pub async fn request_encryption_state(&self) -> Result<()> {
        // Nothing to do once the state is known.
        if !self.inner.encryption_state().is_unknown() {
            return Ok(());
        }

        self.client
            .locks()
            .encryption_state_deduplicated_handler
            .run(self.room_id().to_owned(), async move {
                // The encryption state event has an empty state key.
                let request = get_state_event_for_key::v3::Request::new(
                    self.room_id().to_owned(),
                    StateEventType::RoomEncryption,
                    "".to_owned(),
                );
                let response = match self.client.send(request).await {
                    Ok(response) => Some(
                        response
                            .into_content()
                            .deserialize_as_unchecked::<RoomEncryptionEventContent>()?,
                    ),
                    // No such event on the server: record that there is none.
                    Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
                    Err(err) => return Err(err.into()),
                };

                // Hold the state store lock while the room info is updated and
                // persisted.
                let _state_store_lock = self.client.base_client().state_store_lock().lock().await;

                // Work on a copy of the room info, persist it, and only then
                // replace the in-memory one.
                let mut room_info = self.clone_info();
                room_info.mark_encryption_state_synced();
                room_info.set_encryption_event(response.clone());
                let mut changes = StateChanges::default();
                changes.add_room(room_info.clone());

                self.client.state_store().save_changes(&changes).await?;
                self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());

                Ok(())
            })
            .await
    }
975
976 pub fn encryption_state(&self) -> EncryptionState {
981 self.inner.encryption_state()
982 }
983
984 pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
990 self.request_encryption_state().await?;
991
992 Ok(self.encryption_state())
993 }
994
995 #[cfg(feature = "e2e-encryption")]
997 pub async fn crypto_context_info(&self) -> CryptoContextInfo {
998 let encryption = self.client.encryption();
999
1000 let this_device_is_verified = match encryption.get_own_device().await {
1001 Ok(Some(device)) => device.is_verified_with_cross_signing(),
1002
1003 _ => true,
1005 };
1006
1007 let backup_exists_on_server =
1008 encryption.backups().exists_on_server().await.unwrap_or(false);
1009
1010 CryptoContextInfo {
1011 device_creation_ts: encryption.device_creation_timestamp().await,
1012 this_device_is_verified,
1013 is_backup_configured: encryption.backups().state() == BackupState::Enabled,
1014 backup_exists_on_server,
1015 }
1016 }
1017
1018 fn are_events_visible(&self) -> bool {
1019 if let RoomState::Invited = self.inner.state() {
1020 return matches!(
1021 self.inner.history_visibility_or_default(),
1022 HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1023 );
1024 }
1025
1026 true
1027 }
1028
1029 pub async fn sync_members(&self) -> Result<()> {
1035 if !self.are_events_visible() {
1036 return Ok(());
1037 }
1038
1039 if !self.are_members_synced() { self.request_members().await } else { Ok(()) }
1040 }
1041
1042 pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1056 self.sync_members().await?;
1057 self.get_member_no_sync(user_id).await
1058 }
1059
1060 pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1074 Ok(self
1075 .inner
1076 .get_member(user_id)
1077 .await?
1078 .map(|member| RoomMember::new(self.client.clone(), member)))
1079 }
1080
1081 pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1090 self.sync_members().await?;
1091 self.members_no_sync(memberships).await
1092 }
1093
1094 pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1103 Ok(self
1104 .inner
1105 .members(memberships)
1106 .await?
1107 .into_iter()
1108 .map(|member| RoomMember::new(self.client.clone(), member))
1109 .collect())
1110 }
1111
1112 pub async fn set_own_member_display_name(
1117 &self,
1118 display_name: Option<String>,
1119 ) -> Result<send_state_event::v3::Response> {
1120 let user_id = self.own_user_id();
1121 let member_event =
1122 self.get_state_event_static_for_key::<RoomMemberEventContent, _>(user_id).await?;
1123
1124 let Some(RawSyncOrStrippedState::Sync(raw_event)) = member_event else {
1125 return Err(Error::InsufficientData);
1126 };
1127
1128 let event = raw_event.deserialize()?;
1129
1130 let mut content = match event {
1131 SyncStateEvent::Original(original_event) => original_event.content,
1132 SyncStateEvent::Redacted(redacted_event) => {
1133 RoomMemberEventContent::new(redacted_event.content.membership)
1134 }
1135 };
1136
1137 content.displayname = display_name;
1138 self.send_state_event_for_key(user_id, content).await
1139 }
1140
1141 pub async fn get_state_events(
1143 &self,
1144 event_type: StateEventType,
1145 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1146 self.client
1147 .state_store()
1148 .get_state_events(self.room_id(), event_type)
1149 .await
1150 .map_err(Into::into)
1151 }
1152
1153 pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1170 where
1171 C: StaticEventContent<IsPrefix = ruma::events::False>
1172 + StaticStateEventContent
1173 + RedactContent,
1174 C::Redacted: RedactedStateEventContent,
1175 {
1176 Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1177 }
1178
1179 pub async fn get_state_events_for_keys(
1182 &self,
1183 event_type: StateEventType,
1184 state_keys: &[&str],
1185 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1186 self.client
1187 .state_store()
1188 .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1189 .await
1190 .map_err(Into::into)
1191 }
1192
1193 pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1213 &self,
1214 state_keys: I,
1215 ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1216 where
1217 C: StaticEventContent<IsPrefix = ruma::events::False>
1218 + StaticStateEventContent
1219 + RedactContent,
1220 C::StateKey: Borrow<K>,
1221 C::Redacted: RedactedStateEventContent,
1222 K: AsRef<str> + Sized + Sync + 'a,
1223 I: IntoIterator<Item = &'a K> + Send,
1224 I::IntoIter: Send,
1225 {
1226 Ok(self
1227 .client
1228 .state_store()
1229 .get_state_events_for_keys_static(self.room_id(), state_keys)
1230 .await?)
1231 }
1232
1233 pub async fn get_state_event(
1235 &self,
1236 event_type: StateEventType,
1237 state_key: &str,
1238 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1239 self.client
1240 .state_store()
1241 .get_state_event(self.room_id(), event_type, state_key)
1242 .await
1243 .map_err(Into::into)
1244 }
1245
1246 pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1265 where
1266 C: StaticEventContent<IsPrefix = ruma::events::False>
1267 + StaticStateEventContent<StateKey = EmptyStateKey>
1268 + RedactContent,
1269 C::Redacted: RedactedStateEventContent,
1270 {
1271 self.get_state_event_static_for_key(&EmptyStateKey).await
1272 }
1273
1274 pub async fn get_state_event_static_for_key<C, K>(
1294 &self,
1295 state_key: &K,
1296 ) -> Result<Option<RawSyncOrStrippedState<C>>>
1297 where
1298 C: StaticEventContent<IsPrefix = ruma::events::False>
1299 + StaticStateEventContent
1300 + RedactContent,
1301 C::StateKey: Borrow<K>,
1302 C::Redacted: RedactedStateEventContent,
1303 K: AsRef<str> + ?Sized + Sync,
1304 {
1305 Ok(self
1306 .client
1307 .state_store()
1308 .get_state_event_static_for_key(self.room_id(), state_key)
1309 .await?)
1310 }
1311
    /// Returns a stream classifying each parent space declared by this room's
    /// `m.space.parent` state events.
    ///
    /// For every such event (redacted and undeserializable ones are skipped),
    /// the stream yields one [`ParentSpace`]:
    ///
    /// - `Unverifiable` when the parent room is not known to the client;
    /// - `Reciprocal` when the parent room has an original `m.space.child`
    ///   event pointing back at this room;
    /// - `WithPowerlevel` when there is no such child event but the sender of
    ///   the parent event is allowed to send `m.space.child` in the parent;
    /// - `Illegitimate` otherwise, including when the sender is not a member
    ///   of the parent room.
    ///
    /// Items are produced through a [`FuturesUnordered`], so their order is
    /// not tied to the order of the stored events.
    pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
        Ok(self
            .get_state_events_static::<SpaceParentEventContent>()
            .await?
            .into_iter()
            // Keep the parent room id (the state key) and the event sender.
            .filter_map(|parent_event| match parent_event.deserialize() {
                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
                    Some((e.state_key.to_owned(), e.sender))
                }
                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
                Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
                Err(e) => {
                    info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
                    None
                }
            })
            .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
                // Without the parent room nothing can be checked.
                let Some(parent_room) = self.client.get_room(&state_key) else {
                    return Ok(ParentSpace::Unverifiable(state_key));
                };
                // First check: does the parent point back at this room?
                if let Some(child_event) = parent_room
                    .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
                    .await?
                {
                    match child_event.deserialize() {
                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
                            return Ok(ParentSpace::Reciprocal(parent_room));
                        }
                        // Redacted or stripped child events do not count; fall
                        // through to the power-level check.
                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
                        Ok(SyncOrStrippedState::Stripped(_)) => {}
                        Err(e) => {
                            info!(
                                room_id = ?self.room_id(), parent_room_id = ?state_key,
                                "Could not deserialize m.space.child: {e}"
                            );
                        }
                    }
                }

                // Second check: could the sender have set the child event
                // themselves?
                let Some(member) = parent_room.get_member(&sender).await? else {
                    return Ok(ParentSpace::Illegitimate(parent_room));
                };

                if member.can_send_state(StateEventType::SpaceChild) {
                    Ok(ParentSpace::WithPowerlevel(parent_room))
                } else {
                    Ok(ParentSpace::Illegitimate(parent_room))
                }
            })
            .collect::<FuturesUnordered<_>>())
    }
1386
1387 pub async fn account_data(
1389 &self,
1390 data_type: RoomAccountDataEventType,
1391 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1392 self.client
1393 .state_store()
1394 .get_room_account_data_event(self.room_id(), data_type)
1395 .await
1396 .map_err(Into::into)
1397 }
1398
1399 pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1418 where
1419 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1420 {
1421 Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1422 }
1423
/// Checks whether every device of every user returned by the state store
/// for this room is verified.
///
/// The user ids are fetched with `RoomMemberships::empty()` as the
/// membership filter. Returns `Ok(false)` as soon as one unverified device
/// is found.
#[cfg(feature = "e2e-encryption")]
pub async fn contains_only_verified_devices(&self) -> Result<bool> {
    let room_user_ids =
        self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::empty()).await?;

    for user_id in room_user_ids {
        let user_devices = self.client.encryption().get_user_devices(&user_id).await?;

        if !user_devices.devices().all(|device| device.is_verified()) {
            return Ok(false);
        }
    }

    Ok(true)
}
1447
1448 pub async fn set_account_data<T>(
1463 &self,
1464 content: T,
1465 ) -> Result<set_room_account_data::v3::Response>
1466 where
1467 T: RoomAccountDataEventContent,
1468 {
1469 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1470
1471 let request = set_room_account_data::v3::Request::new(
1472 own_user.to_owned(),
1473 self.room_id().to_owned(),
1474 &content,
1475 )?;
1476
1477 Ok(self.client.send(request).await?)
1478 }
1479
1480 pub async fn set_account_data_raw(
1505 &self,
1506 event_type: RoomAccountDataEventType,
1507 content: Raw<AnyRoomAccountDataEventContent>,
1508 ) -> Result<set_room_account_data::v3::Response> {
1509 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1510
1511 let request = set_room_account_data::v3::Request::new_raw(
1512 own_user.to_owned(),
1513 self.room_id().to_owned(),
1514 event_type,
1515 content,
1516 );
1517
1518 Ok(self.client.send(request).await?)
1519 }
1520
1521 pub async fn set_tag(
1552 &self,
1553 tag: TagName,
1554 tag_info: TagInfo,
1555 ) -> Result<create_tag::v3::Response> {
1556 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1557 let request = create_tag::v3::Request::new(
1558 user_id.to_owned(),
1559 self.inner.room_id().to_owned(),
1560 tag.to_string(),
1561 tag_info,
1562 );
1563 Ok(self.client.send(request).await?)
1564 }
1565
1566 pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1573 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1574 let request = delete_tag::v3::Request::new(
1575 user_id.to_owned(),
1576 self.inner.room_id().to_owned(),
1577 tag.to_string(),
1578 );
1579 Ok(self.client.send(request).await?)
1580 }
1581
1582 pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1592 if is_favourite {
1593 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1594
1595 self.set_tag(TagName::Favorite, tag_info).await?;
1596
1597 if self.is_low_priority() {
1598 self.remove_tag(TagName::LowPriority).await?;
1599 }
1600 } else {
1601 self.remove_tag(TagName::Favorite).await?;
1602 }
1603 Ok(())
1604 }
1605
1606 pub async fn set_is_low_priority(
1616 &self,
1617 is_low_priority: bool,
1618 tag_order: Option<f64>,
1619 ) -> Result<()> {
1620 if is_low_priority {
1621 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1622
1623 self.set_tag(TagName::LowPriority, tag_info).await?;
1624
1625 if self.is_favourite() {
1626 self.remove_tag(TagName::Favorite).await?;
1627 }
1628 } else {
1629 self.remove_tag(TagName::LowPriority).await?;
1630 }
1631 Ok(())
1632 }
1633
1634 pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1643 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1644
1645 let mut content = self
1646 .client
1647 .account()
1648 .account_data::<DirectEventContent>()
1649 .await?
1650 .map(|c| c.deserialize())
1651 .transpose()?
1652 .unwrap_or_default();
1653
1654 let this_room_id = self.inner.room_id();
1655
1656 if is_direct {
1657 let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1658 room_members.retain(|member| member.user_id() != self.own_user_id());
1659
1660 for member in room_members {
1661 let entry = content.entry(member.user_id().into()).or_default();
1662 if !entry.iter().any(|room_id| room_id == this_room_id) {
1663 entry.push(this_room_id.to_owned());
1664 }
1665 }
1666 } else {
1667 for (_, list) in content.iter_mut() {
1668 list.retain(|room_id| *room_id != this_room_id);
1669 }
1670
1671 content.retain(|_, list| !list.is_empty());
1673 }
1674
1675 let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1676
1677 self.client.send(request).await?;
1678 Ok(())
1679 }
1680
/// Attempts to decrypt an encrypted room event with the Olm machine.
///
/// On success the result is a decrypted [`TimelineEvent`], with push
/// actions computed from `push_ctx` when one is supplied. When the event
/// cannot be decrypted, the backups module is asked to maybe download the
/// room key and an unable-to-decrypt [`TimelineEvent`] is returned instead.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, or the
/// error raised by the decryption attempt itself.
#[cfg(feature = "e2e-encryption")]
#[cfg(not(feature = "experimental-encrypted-state-events"))]
pub async fn decrypt_event(
    &self,
    event: &Raw<OriginalSyncRoomEncryptedEvent>,
    push_ctx: Option<&PushContext>,
) -> Result<TimelineEvent> {
    let olm_guard = self.client.olm_machine().await;
    let Some(machine) = olm_guard.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    let outcome = machine
        .try_decrypt_room_event(
            event.cast_ref(),
            self.inner.room_id(),
            self.client.decryption_settings(),
        )
        .await?;

    match outcome {
        RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
            self.client
                .encryption()
                .backups()
                .maybe_download_room_key(self.room_id().to_owned(), event.clone());
            Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
        }
        RoomEventDecryptionResult::Decrypted(decrypted) => {
            let push_actions = match push_ctx {
                Some(ctx) => Some(ctx.for_event(&decrypted.event).await),
                None => None,
            };
            Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
        }
    }
}
1723
/// Attempts to decrypt an encrypted event (any raw type castable to
/// `EncryptedEvent`) with the Olm machine.
///
/// On success the result is a decrypted [`TimelineEvent`], with push
/// actions computed from `push_ctx` when one is supplied. When the event
/// cannot be decrypted, the backups module is asked to maybe download the
/// room key and an unable-to-decrypt [`TimelineEvent`] is returned instead.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, or the
/// error raised by the decryption attempt itself.
#[cfg(feature = "experimental-encrypted-state-events")]
pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
    &self,
    event: &Raw<T>,
    push_ctx: Option<&PushContext>,
) -> Result<TimelineEvent> {
    let olm_guard = self.client.olm_machine().await;
    let Some(machine) = olm_guard.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    let outcome = machine
        .try_decrypt_room_event(
            event.cast_ref(),
            self.inner.room_id(),
            self.client.decryption_settings(),
        )
        .await?;

    match outcome {
        RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
            self.client
                .encryption()
                .backups()
                .maybe_download_room_key(self.room_id().to_owned(), event.clone());
            Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
        }
        RoomEventDecryptionResult::Decrypted(decrypted) => {
            let push_actions = match push_ctx {
                Some(ctx) => Some(ctx.for_event(&decrypted.event).await),
                None => None,
            };
            Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
        }
    }
}
1767
/// Looks up the encryption info of the session `session_id` used by
/// `sender` in this room.
///
/// Returns `None` when no Olm machine is available or when the lookup
/// fails; the underlying error is discarded.
#[cfg(feature = "e2e-encryption")]
pub async fn get_encryption_info(
    &self,
    session_id: &str,
    sender: &UserId,
) -> Option<Arc<EncryptionInfo>> {
    let olm_guard = self.client.olm_machine().await;
    let machine = olm_guard.as_ref()?;

    let lookup = machine.get_session_encryption_info(self.room_id(), session_id, sender).await;
    lookup.ok()
}
1790
/// Asks the Olm machine to discard the room key of this room.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, or the
/// error reported while discarding the key.
#[cfg(feature = "e2e-encryption")]
pub async fn discard_room_key(&self) -> Result<()> {
    let olm_guard = self.client.olm_machine().await;
    let Some(machine) = olm_guard.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    machine.discard_room_key(self.inner.room_id()).await?;
    Ok(())
}
1813
1814 #[instrument(skip_all)]
1822 pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1823 let request = assign!(
1824 ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1825 { reason: reason.map(ToOwned::to_owned) }
1826 );
1827 self.client.send(request).await?;
1828 Ok(())
1829 }
1830
1831 #[instrument(skip_all)]
1839 pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1840 let request = assign!(
1841 unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1842 { reason: reason.map(ToOwned::to_owned) }
1843 );
1844 self.client.send(request).await?;
1845 Ok(())
1846 }
1847
1848 #[instrument(skip_all)]
1857 pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1858 let request = assign!(
1859 kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1860 { reason: reason.map(ToOwned::to_owned) }
1861 );
1862 self.client.send(request).await?;
1863 Ok(())
1864 }
1865
1866 #[instrument(skip_all)]
1872 pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1873 #[cfg(feature = "e2e-encryption")]
1874 if self.client.inner.enable_share_history_on_invite {
1875 shared_room_history::share_room_history(self, user_id.to_owned()).await?;
1876 }
1877
1878 let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
1879 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1880 self.client.send(request).await?;
1881
1882 self.mark_members_missing();
1886
1887 Ok(())
1888 }
1889
1890 #[instrument(skip_all)]
1896 pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
1897 let recipient = InvitationRecipient::ThirdPartyId(invite_id);
1898 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1899 self.client.send(request).await?;
1900
1901 self.mark_members_missing();
1905
1906 Ok(())
1907 }
1908
1909 pub async fn typing_notice(&self, typing: bool) -> Result<()> {
1944 self.ensure_room_joined()?;
1945
1946 let send = if let Some(typing_time) =
1949 self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
1950 {
1951 if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
1952 typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
1956 } else {
1957 !typing
1959 }
1960 } else {
1961 typing
1964 };
1965
1966 if send {
1967 self.send_typing_notice(typing).await?;
1968 }
1969
1970 Ok(())
1971 }
1972
1973 #[instrument(name = "typing_notice", skip(self))]
1974 async fn send_typing_notice(&self, typing: bool) -> Result<()> {
1975 let typing = if typing {
1976 self.client
1977 .inner
1978 .typing_notice_times
1979 .write()
1980 .unwrap()
1981 .insert(self.room_id().to_owned(), Instant::now());
1982 Typing::Yes(TYPING_NOTICE_TIMEOUT)
1983 } else {
1984 self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
1985 Typing::No
1986 };
1987
1988 let request = create_typing_event::v3::Request::new(
1989 self.own_user_id().to_owned(),
1990 self.room_id().to_owned(),
1991 typing,
1992 );
1993
1994 self.client.send(request).await?;
1995
1996 Ok(())
1997 }
1998
1999 #[instrument(skip_all)]
2016 pub async fn send_single_receipt(
2017 &self,
2018 receipt_type: create_receipt::v3::ReceiptType,
2019 thread: ReceiptThread,
2020 event_id: OwnedEventId,
2021 ) -> Result<()> {
2022 let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
2025
2026 self.client
2027 .inner
2028 .locks
2029 .read_receipt_deduplicated_handler
2030 .run((request_key, event_id.clone()), async {
2031 let is_unthreaded = thread == ReceiptThread::Unthreaded;
2033
2034 let mut request = create_receipt::v3::Request::new(
2035 self.room_id().to_owned(),
2036 receipt_type,
2037 event_id,
2038 );
2039 request.thread = thread;
2040
2041 self.client.send(request).await?;
2042
2043 if is_unthreaded {
2044 self.set_unread_flag(false).await?;
2045 }
2046
2047 Ok(())
2048 })
2049 .await
2050 }
2051
2052 #[instrument(skip_all)]
2062 pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2063 if receipts.is_empty() {
2064 return Ok(());
2065 }
2066
2067 let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2068 let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2069 fully_read,
2070 read_receipt: public_read_receipt,
2071 private_read_receipt,
2072 });
2073
2074 self.client.send(request).await?;
2075
2076 self.set_unread_flag(false).await?;
2077
2078 Ok(())
2079 }
2080
2081 #[allow(unused_variables, unused_mut)]
2085 async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
2086 use ruma::{
2087 EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
2088 };
2089 const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
2090
2091 if !self.latest_encryption_state().await?.is_encrypted() {
2092 let mut content =
2093 RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
2094 #[cfg(feature = "experimental-encrypted-state-events")]
2095 if encrypted_state_events {
2096 content = content.with_encrypted_state();
2097 }
2098 self.send_state_event(content).await?;
2099
2100 let res = timeout(
2107 async {
2108 loop {
2109 self.client.inner.sync_beat.listen().await;
2111 let _state_store_lock =
2112 self.client.base_client().state_store_lock().lock().await;
2113
2114 if !self.inner.encryption_state().is_unknown() {
2115 break;
2116 }
2117 }
2118 },
2119 SYNC_WAIT_TIME,
2120 )
2121 .await;
2122
2123 let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
2124
2125 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2127 if res.is_ok() && self.inner.encryption_state().is_encrypted() {
2128 debug!("room successfully marked as encrypted");
2129 return Ok(());
2130 }
2131
2132 #[cfg(feature = "experimental-encrypted-state-events")]
2134 if res.is_ok() && {
2135 if encrypted_state_events {
2136 self.inner.encryption_state().is_state_encrypted()
2137 } else {
2138 self.inner.encryption_state().is_encrypted()
2139 }
2140 } {
2141 debug!("room successfully marked as encrypted");
2142 return Ok(());
2143 }
2144
2145 debug!("still not marked as encrypted, marking encryption state as missing");
2150
2151 let mut room_info = self.clone_info();
2152 room_info.mark_encryption_state_missing();
2153 let mut changes = StateChanges::default();
2154 changes.add_room(room_info.clone());
2155
2156 self.client.state_store().save_changes(&changes).await?;
2157 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
2158 }
2159
2160 Ok(())
2161 }
2162
2163 #[instrument(skip_all)]
2195 pub async fn enable_encryption(&self) -> Result<()> {
2196 self.enable_encryption_inner(false).await
2197 }
2198
/// Enables encryption in this room, including state event encryption.
///
/// Delegates to `enable_encryption_inner(true)`.
#[instrument(skip_all)]
#[cfg(feature = "experimental-encrypted-state-events")]
pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
    let encrypted_state_events = true;
    self.enable_encryption_inner(encrypted_state_events).await
}
2236
/// Shares the room key of this room ahead of sending.
///
/// The work runs through the client's group-session deduplication handler,
/// keyed by room id. It claims one-time keys for the `ACTIVE` members of
/// the room and then shares the room key. If sharing fails, the room key is
/// discarded (when an Olm machine is available) and the sharing error is
/// returned.
///
/// # Errors
///
/// Fails when the room is not joined, when the crypto store lock cannot be
/// taken, or when any of the steps above fails.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
async fn preshare_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    // Taking the guard may fail; it is consumed right away to record the
    // store generation on the current span.
    let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
    tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));

    let handler = &self.client.locks().group_session_deduplicated_handler;

    handler
        .run(self.room_id().to_owned(), async move {
            {
                let active_members = self
                    .client
                    .state_store()
                    .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
                    .await?;
                self.client.claim_one_time_keys(active_members.iter().map(Deref::deref)).await?;
            };

            match self.share_room_key().await {
                Ok(()) => Ok(()),
                Err(share_error) => {
                    // Discard the key when sharing it failed.
                    let machine = self.client.olm_machine().await;
                    if let Some(machine) = machine.as_ref() {
                        machine.discard_room_key(self.room_id()).await?;
                    }
                    Err(share_error)
                }
            }
        })
        .await
}
2284
/// Shares the room key of this room.
///
/// The base client produces the to-device requests; each is sent in turn
/// and then marked as sent with the response it received. The first
/// failure aborts the remaining requests.
///
/// # Errors
///
/// Fails when the room is not joined or when any request fails.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all)]
async fn share_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    let pending_requests = self.client.base_client().share_room_key(self.room_id()).await?;

    for to_device_request in pending_requests {
        let response = self.client.send_to_device(&to_device_request).await?;
        self.client.mark_request_as_sent(&to_device_request.txn_id, &response).await?;
    }

    Ok(())
}
2304
2305 #[instrument(skip_all)]
2314 pub async fn sync_up(&self) {
2315 while !self.is_synced() && self.state() == RoomState::Joined {
2316 let wait_for_beat = self.client.inner.sync_beat.listen();
2317 let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2319 }
2320 }
2321
2322 pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2393 SendMessageLikeEvent::new(self, content)
2394 }
2395
/// Queries device keys for `ACTIVE` room members that are either not
/// tracked by the Olm machine or tracked and marked dirty.
///
/// No request is sent when the resulting key query is empty.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, and
/// propagates store and request errors.
#[cfg(feature = "e2e-encryption")]
async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
    let olm = self.client.olm_machine().await;
    // Report a missing machine as an error instead of panicking, like the
    // other crypto entry points of this type do.
    let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;

    let members =
        self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;

    // Map each tracked user to its dirty flag.
    let tracked: HashMap<_, _> = olm
        .store()
        .load_tracked_users()
        .await?
        .into_iter()
        .map(|tracked| (tracked.user_id, tracked.dirty))
        .collect();

    // Keep members that are untracked, or tracked but dirty.
    let members_with_unknown_devices =
        members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));

    let (req_id, request) =
        olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));

    if !request.device_keys.is_empty() {
        self.client.keys_query(&req_id, request.device_keys).await?;
    }

    Ok(())
}
2428
2429 #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2473 pub fn send_raw<'a>(
2474 &'a self,
2475 event_type: &'a str,
2476 content: impl IntoRawMessageLikeEventContent,
2477 ) -> SendRawMessageLikeEvent<'a> {
2478 SendRawMessageLikeEvent::new(self, event_type, content)
2481 }
2482
2483 #[instrument(skip_all)]
2531 pub fn send_attachment<'a>(
2532 &'a self,
2533 filename: impl Into<String>,
2534 content_type: &'a Mime,
2535 data: Vec<u8>,
2536 config: AttachmentConfig,
2537 ) -> SendAttachment<'a> {
2538 SendAttachment::new(self, filename.into(), content_type, data, config)
2539 }
2540
2541 #[instrument(skip_all)]
2569 pub(super) async fn prepare_and_send_attachment<'a>(
2570 &'a self,
2571 filename: String,
2572 content_type: &'a Mime,
2573 data: Vec<u8>,
2574 mut config: AttachmentConfig,
2575 send_progress: SharedObservable<TransmissionProgress>,
2576 store_in_cache: bool,
2577 ) -> Result<send_message_event::v3::Response> {
2578 self.ensure_room_joined()?;
2579
2580 let txn_id = config.txn_id.take();
2581 let mentions = config.mentions.take();
2582
2583 let thumbnail = config.thumbnail.take();
2584
2585 let thumbnail_cache_info = if store_in_cache {
2587 thumbnail
2588 .as_ref()
2589 .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2590 } else {
2591 None
2592 };
2593
2594 #[cfg(feature = "e2e-encryption")]
2595 let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2596 self.client
2597 .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2598 .await?
2599 } else {
2600 self.client
2601 .media()
2602 .upload_plain_media_and_thumbnail(
2603 content_type,
2604 data.clone(),
2607 thumbnail,
2608 send_progress,
2609 )
2610 .await?
2611 };
2612
2613 #[cfg(not(feature = "e2e-encryption"))]
2614 let (media_source, thumbnail) = self
2615 .client
2616 .media()
2617 .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2618 .await?;
2619
2620 if store_in_cache {
2621 let media_store_lock_guard = self.client.media_store().lock().await?;
2622
2623 debug!("caching the media");
2627 let request =
2628 MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2629
2630 if let Err(err) = media_store_lock_guard
2631 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2632 .await
2633 {
2634 warn!("unable to cache the media after uploading it: {err}");
2635 }
2636
2637 if let Some(((data, height, width), source)) =
2638 thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2639 {
2640 debug!("caching the thumbnail");
2641
2642 let request = MediaRequestParameters {
2643 source: source.clone(),
2644 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2645 };
2646
2647 if let Err(err) = media_store_lock_guard
2648 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2649 .await
2650 {
2651 warn!("unable to cache the media after uploading it: {err}");
2652 }
2653 }
2654 }
2655
2656 let content = self
2657 .make_media_event(
2658 Room::make_attachment_type(
2659 content_type,
2660 filename,
2661 media_source,
2662 config.caption,
2663 config.info,
2664 thumbnail,
2665 ),
2666 mentions,
2667 config.reply,
2668 )
2669 .await?;
2670
2671 let mut fut = self.send(content);
2672 if let Some(txn_id) = txn_id {
2673 fut = fut.with_transaction_id(txn_id);
2674 }
2675
2676 fut.await.map(|result| result.response)
2677 }
2678
2679 #[allow(clippy::too_many_arguments)]
2682 pub(crate) fn make_attachment_type(
2683 content_type: &Mime,
2684 filename: String,
2685 source: MediaSource,
2686 caption: Option<TextMessageEventContent>,
2687 info: Option<AttachmentInfo>,
2688 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2689 ) -> MessageType {
2690 make_media_type!(MessageType, content_type, filename, source, caption, info, thumbnail)
2691 }
2692
2693 pub(crate) async fn make_media_event(
2696 &self,
2697 msg_type: MessageType,
2698 mentions: Option<Mentions>,
2699 reply: Option<Reply>,
2700 ) -> Result<RoomMessageEventContent> {
2701 let mut content = RoomMessageEventContent::new(msg_type);
2702 if let Some(mentions) = mentions {
2703 content = content.add_mentions(mentions);
2704 }
2705 if let Some(reply) = reply {
2706 content = self.make_reply_event(content.into(), reply).await?;
2709 }
2710 Ok(content)
2711 }
2712
/// Builds the [`GalleryItemType`] for an uploaded gallery item.
///
/// All arguments are forwarded to the `make_media_type!` macro, which
/// produces the value.
#[cfg(feature = "unstable-msc4274")]
#[allow(clippy::too_many_arguments)]
pub(crate) fn make_gallery_item_type(
    content_type: &Mime,
    filename: String,
    source: MediaSource,
    caption: Option<TextMessageEventContent>,
    info: Option<AttachmentInfo>,
    thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
) -> GalleryItemType {
    let item_type: GalleryItemType =
        make_media_type!(GalleryItemType, content_type, filename, source, caption, info, thumbnail);
    item_type
}
2727
2728 pub async fn update_power_levels(
2737 &self,
2738 updates: Vec<(&UserId, Int)>,
2739 ) -> Result<send_state_event::v3::Response> {
2740 let mut power_levels = self.power_levels().await?;
2741
2742 for (user_id, new_level) in updates {
2743 if new_level == power_levels.users_default {
2744 power_levels.users.remove(user_id);
2745 } else {
2746 power_levels.users.insert(user_id.to_owned(), new_level);
2747 }
2748 }
2749
2750 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2751 }
2752
2753 pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2758 let mut power_levels = self.power_levels().await?;
2759 power_levels.apply(changes)?;
2760 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2761 Ok(())
2762 }
2763
2764 pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2768 let creators = self.creators().unwrap_or_default();
2769 let rules = self.clone_info().room_version_rules_or_default();
2770
2771 let default_power_levels =
2772 RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2773 let changes = RoomPowerLevelChanges::from(default_power_levels);
2774 self.apply_power_level_changes(changes).await?;
2775 Ok(self.power_levels().await?)
2776 }
2777
2778 pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2783 let power_level = self.get_user_power_level(user_id).await?;
2784 Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2785 }
2786
2787 pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2792 let event = self.power_levels().await?;
2793 Ok(event.for_user(user_id))
2794 }
2795
2796 pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2799 let power_levels = self.power_levels().await.ok();
2800 let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2801 if let Some(power_levels) = power_levels {
2802 for (id, level) in power_levels.users.into_iter() {
2803 user_power_levels.insert(id, level.into());
2804 }
2805 }
2806 user_power_levels
2807 }
2808
2809 pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2811 self.send_state_event(RoomNameEventContent::new(name)).await
2812 }
2813
2814 pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2816 self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2817 }
2818
2819 pub async fn set_avatar_url(
2825 &self,
2826 url: &MxcUri,
2827 info: Option<avatar::ImageInfo>,
2828 ) -> Result<send_state_event::v3::Response> {
2829 self.ensure_room_joined()?;
2830
2831 let mut room_avatar_event = RoomAvatarEventContent::new();
2832 room_avatar_event.url = Some(url.to_owned());
2833 room_avatar_event.info = info.map(Box::new);
2834
2835 self.send_state_event(room_avatar_event).await
2836 }
2837
2838 pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2840 self.send_state_event(RoomAvatarEventContent::new()).await
2841 }
2842
2843 pub async fn upload_avatar(
2851 &self,
2852 mime: &Mime,
2853 data: Vec<u8>,
2854 info: Option<avatar::ImageInfo>,
2855 ) -> Result<send_state_event::v3::Response> {
2856 self.ensure_room_joined()?;
2857
2858 let upload_response = self.client.media().upload(mime, data, None).await?;
2859 let mut info = info.unwrap_or_default();
2860 info.blurhash = upload_response.blurhash;
2861 info.mimetype = Some(mime.to_string());
2862
2863 self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2864 }
2865
2866 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2910 #[instrument(skip_all)]
2911 pub async fn send_state_event(
2912 &self,
2913 content: impl StateEventContent<StateKey = EmptyStateKey>,
2914 ) -> Result<send_state_event::v3::Response> {
2915 self.send_state_event_for_key(&EmptyStateKey, content).await
2916 }
2917
/// Builds a [`SendStateEvent`] for a state event with an empty state key.
///
/// Delegates to [`Self::send_state_event_for_key`] with `EmptyStateKey`.
#[cfg(feature = "experimental-encrypted-state-events")]
#[instrument(skip_all)]
pub fn send_state_event<'a>(
    &'a self,
    content: impl StateEventContent<StateKey = EmptyStateKey>,
) -> SendStateEvent<'a> {
    let state_key = EmptyStateKey;
    self.send_state_event_for_key(&state_key, content)
}
2976
2977 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3018 pub async fn send_state_event_for_key<C, K>(
3019 &self,
3020 state_key: &K,
3021 content: C,
3022 ) -> Result<send_state_event::v3::Response>
3023 where
3024 C: StateEventContent,
3025 C::StateKey: Borrow<K>,
3026 K: AsRef<str> + ?Sized,
3027 {
3028 self.ensure_room_joined()?;
3029 let request =
3030 send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3031 let response = self.client.send(request).await?;
3032 Ok(response)
3033 }
3034
/// Builds a [`SendStateEvent`] for a state event with the given
/// `state_key` and `content`.
#[cfg(feature = "experimental-encrypted-state-events")]
pub fn send_state_event_for_key<'a, C, K>(
    &'a self,
    state_key: &K,
    content: C,
) -> SendStateEvent<'a>
where
    C: StateEventContent,
    C::StateKey: Borrow<K>,
    K: AsRef<str> + ?Sized,
{
    let room = self;
    SendStateEvent::new(room, state_key, content)
}
3096
3097 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3132 #[instrument(skip_all)]
3133 pub async fn send_state_event_raw(
3134 &self,
3135 event_type: &str,
3136 state_key: &str,
3137 content: impl IntoRawStateEventContent,
3138 ) -> Result<send_state_event::v3::Response> {
3139 self.ensure_room_joined()?;
3140
3141 let request = send_state_event::v3::Request::new_raw(
3142 self.room_id().to_owned(),
3143 event_type.into(),
3144 state_key.to_owned(),
3145 content.into_raw_state_event_content(),
3146 );
3147
3148 Ok(self.client.send(request).await?)
3149 }
3150
/// Builds a [`SendRawStateEvent`] for a raw state event of type
/// `event_type` with the given `state_key` and `content`.
#[cfg(feature = "experimental-encrypted-state-events")]
#[instrument(skip_all)]
pub fn send_state_event_raw<'a>(
    &'a self,
    event_type: &'a str,
    state_key: &'a str,
    content: impl IntoRawStateEventContent,
) -> SendRawStateEvent<'a> {
    let room = self;
    SendRawStateEvent::new(room, event_type, state_key, content)
}
3202
3203 #[instrument(skip_all)]
3238 pub async fn redact(
3239 &self,
3240 event_id: &EventId,
3241 reason: Option<&str>,
3242 txn_id: Option<OwnedTransactionId>,
3243 ) -> HttpResult<redact_event::v3::Response> {
3244 let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3245 let request = assign!(
3246 redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3247 { reason: reason.map(ToOwned::to_owned) }
3248 );
3249
3250 self.client.send(request).await
3251 }
3252
3253 pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
3262 let acl_ev = self
3263 .get_state_event_static::<RoomServerAclEventContent>()
3264 .await?
3265 .and_then(|ev| ev.deserialize().ok());
3266 let acl = acl_ev.as_ref().and_then(|ev| match ev {
3267 SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
3268 SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
3269 });
3270
3271 let members: Vec<_> = self
3275 .members_no_sync(RoomMemberships::JOIN)
3276 .await?
3277 .into_iter()
3278 .filter(|member| {
3279 let server = member.user_id().server_name();
3280 acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
3281 })
3282 .collect();
3283
3284 let max = members
3287 .iter()
3288 .max_by_key(|member| member.power_level())
3289 .filter(|max| max.power_level() >= int!(50))
3290 .map(|member| member.user_id().server_name());
3291
3292 let servers = members
3294 .iter()
3295 .map(|member| member.user_id().server_name())
3296 .filter(|server| max.filter(|max| max == server).is_none())
3297 .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
3298 *servers.entry(server).or_default() += 1;
3299 servers
3300 });
3301 let mut servers: Vec<_> = servers.into_iter().collect();
3302 servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
3303
3304 Ok(max
3305 .into_iter()
3306 .chain(servers.into_iter().map(|(name, _)| name))
3307 .take(3)
3308 .map(ToOwned::to_owned)
3309 .collect())
3310 }
3311
3312 pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3319 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3320 return Ok(alias.matrix_to_uri());
3321 }
3322
3323 let via = self.route().await?;
3324 Ok(self.room_id().matrix_to_uri_via(via))
3325 }
3326
3327 pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3338 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3339 return Ok(alias.matrix_uri(join));
3340 }
3341
3342 let via = self.route().await?;
3343 Ok(self.room_id().matrix_uri_via(via, join))
3344 }
3345
3346 pub async fn matrix_to_event_permalink(
3360 &self,
3361 event_id: impl Into<OwnedEventId>,
3362 ) -> Result<MatrixToUri> {
3363 let via = self.route().await?;
3366 Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3367 }
3368
3369 pub async fn matrix_event_permalink(
3383 &self,
3384 event_id: impl Into<OwnedEventId>,
3385 ) -> Result<MatrixUri> {
3386 let via = self.route().await?;
3389 Ok(self.room_id().matrix_event_uri_via(event_id, via))
3390 }
3391
3392 pub async fn load_user_receipt(
3405 &self,
3406 receipt_type: ReceiptType,
3407 thread: ReceiptThread,
3408 user_id: &UserId,
3409 ) -> Result<Option<(OwnedEventId, Receipt)>> {
3410 self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3411 }
3412
3413 pub async fn load_event_receipts(
3426 &self,
3427 receipt_type: ReceiptType,
3428 thread: ReceiptThread,
3429 event_id: &EventId,
3430 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3431 self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3432 }
3433
3434 pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3439 self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
3440 }
3441
3442 pub(crate) async fn push_condition_room_ctx_internal(
3449 &self,
3450 with_threads_subscriptions: bool,
3451 ) -> Result<Option<PushConditionRoomCtx>> {
3452 let room_id = self.room_id();
3453 let user_id = self.own_user_id();
3454 let room_info = self.clone_info();
3455 let member_count = room_info.active_members_count();
3456
3457 let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3458 member.name().to_owned()
3459 } else {
3460 return Ok(None);
3461 };
3462
3463 let power_levels = match self.power_levels().await {
3464 Ok(power_levels) => Some(power_levels.into()),
3465 Err(error) => {
3466 if matches!(room_info.state(), RoomState::Joined) {
3467 error!("Could not compute power levels for push conditions: {error}");
3470 }
3471 None
3472 }
3473 };
3474
3475 let mut ctx = assign!(PushConditionRoomCtx::new(
3476 room_id.to_owned(),
3477 UInt::new(member_count).unwrap_or(UInt::MAX),
3478 user_id.to_owned(),
3479 user_display_name,
3480 ),
3481 {
3482 power_levels,
3483 });
3484
3485 if with_threads_subscriptions {
3486 let this = self.clone();
3487 ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3488 let room = this.clone();
3489 Box::pin(async move {
3490 if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3491 maybe_sub.is_some()
3492 } else {
3493 false
3494 }
3495 })
3496 });
3497 }
3498
3499 Ok(Some(ctx))
3500 }
3501
3502 pub async fn push_context(&self) -> Result<Option<PushContext>> {
3505 self.push_context_internal(self.client.enabled_thread_subscriptions()).await
3506 }
3507
3508 #[instrument(skip(self))]
3512 pub(crate) async fn push_context_internal(
3513 &self,
3514 with_threads_subscriptions: bool,
3515 ) -> Result<Option<PushContext>> {
3516 let Some(push_condition_room_ctx) =
3517 self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3518 else {
3519 debug!("Could not aggregate push context");
3520 return Ok(None);
3521 };
3522 let push_rules = self.client().account().push_rules().await?;
3523 Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3524 }
3525
3526 pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3531 if let Some(ctx) = self.push_context().await? {
3532 Ok(Some(ctx.for_event(event).await))
3533 } else {
3534 Ok(None)
3535 }
3536 }
3537
3538 pub async fn invite_details(&self) -> Result<Invite> {
3541 let state = self.state();
3542
3543 if state != RoomState::Invited {
3544 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3545 }
3546
3547 let invitee = self
3548 .get_member_no_sync(self.own_user_id())
3549 .await?
3550 .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3551 let event = invitee.event();
3552 let inviter_id = event.sender();
3553 let inviter = self.get_member_no_sync(inviter_id).await?;
3554 Ok(Invite { invitee, inviter })
3555 }
3556
3557 pub async fn member_with_sender_info(
3565 &self,
3566 user_id: &UserId,
3567 ) -> Result<RoomMemberWithSenderInfo> {
3568 let Some(member) = self.get_member_no_sync(user_id).await? else {
3569 return Err(Error::InsufficientData);
3570 };
3571
3572 let sender_member =
3573 if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3574 Some(member)
3576 } else if self.are_members_synced() {
3577 None
3579 } else if self.sync_members().await.is_ok() {
3580 self.get_member_no_sync(member.event().sender()).await?
3582 } else {
3583 None
3584 };
3585
3586 Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3587 }
3588
3589 pub async fn forget(&self) -> Result<()> {
3595 let state = self.state();
3596 match state {
3597 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3598 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3599 "Left / Banned",
3600 state,
3601 ))));
3602 }
3603 RoomState::Left | RoomState::Banned => {}
3604 }
3605
3606 let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3607 let _response = self.client.send(request).await?;
3608
3609 if self.inner.direct_targets_length() != 0
3611 && let Err(e) = self.set_is_direct(false).await
3612 {
3613 warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3616 }
3617
3618 self.client.base_client().forget_room(self.inner.room_id()).await?;
3619
3620 Ok(())
3621 }
3622
3623 fn ensure_room_joined(&self) -> Result<()> {
3624 let state = self.state();
3625 if state == RoomState::Joined {
3626 Ok(())
3627 } else {
3628 Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3629 }
3630 }
3631
3632 pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3634 if !matches!(self.state(), RoomState::Joined) {
3635 return None;
3636 }
3637
3638 let notification_settings = self.client().notification_settings().await;
3639
3640 let notification_mode =
3642 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3643
3644 if notification_mode.is_some() {
3645 notification_mode
3646 } else if let Ok(is_encrypted) =
3647 self.latest_encryption_state().await.map(|state| state.is_encrypted())
3648 {
3649 let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3654 let default_mode = notification_settings
3655 .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3656 .await;
3657 Some(default_mode)
3658 } else {
3659 None
3660 }
3661 }
3662
3663 pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3674 if !matches!(self.state(), RoomState::Joined) {
3675 return None;
3676 }
3677
3678 let notification_settings = self.client().notification_settings().await;
3679
3680 let mode =
3682 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3683
3684 if let Some(mode) = mode {
3685 self.update_cached_user_defined_notification_mode(mode);
3686 }
3687
3688 mode
3689 }
3690
3691 pub async fn report_content(
3704 &self,
3705 event_id: OwnedEventId,
3706 score: Option<ReportedContentScore>,
3707 reason: Option<String>,
3708 ) -> Result<report_content::v3::Response> {
3709 let state = self.state();
3710 if state != RoomState::Joined {
3711 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3712 }
3713
3714 let request = report_content::v3::Request::new(
3715 self.inner.room_id().to_owned(),
3716 event_id,
3717 score.map(Into::into),
3718 reason,
3719 );
3720 Ok(self.client.send(request).await?)
3721 }
3722
3723 pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3734 let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3735
3736 Ok(self.client.send(request).await?)
3737 }
3738
3739 pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3745 if self.is_marked_unread() == unread {
3746 return Ok(());
3748 }
3749
3750 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3751
3752 let content = MarkedUnreadEventContent::new(unread);
3753
3754 let request = set_room_account_data::v3::Request::new(
3755 user_id.to_owned(),
3756 self.inner.room_id().to_owned(),
3757 &content,
3758 )?;
3759
3760 self.client.send(request).await?;
3761 Ok(())
3762 }
3763
3764 pub async fn event_cache(
3767 &self,
3768 ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3769 self.client.event_cache().for_room(self.room_id()).await
3770 }
3771
3772 pub(crate) async fn get_user_beacon_info(
3779 &self,
3780 user_id: &UserId,
3781 ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3782 let raw_event = self
3783 .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3784 .await?
3785 .ok_or(BeaconError::NotFound)?;
3786
3787 match raw_event.deserialize()? {
3788 SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3789 SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3790 SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3791 }
3792 }
3793
3794 pub async fn start_live_location_share(
3807 &self,
3808 duration_millis: u64,
3809 description: Option<String>,
3810 ) -> Result<send_state_event::v3::Response> {
3811 self.ensure_room_joined()?;
3812
3813 self.send_state_event_for_key(
3814 self.own_user_id(),
3815 BeaconInfoEventContent::new(
3816 description,
3817 Duration::from_millis(duration_millis),
3818 true,
3819 None,
3820 ),
3821 )
3822 .await
3823 }
3824
3825 pub async fn stop_live_location_share(
3832 &self,
3833 ) -> Result<send_state_event::v3::Response, BeaconError> {
3834 self.ensure_room_joined()?;
3835
3836 let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3837 beacon_info_event.content.stop();
3838 Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3839 }
3840
3841 pub async fn send_location_beacon(
3853 &self,
3854 geo_uri: String,
3855 ) -> Result<send_message_event::v3::Response, BeaconError> {
3856 self.ensure_room_joined()?;
3857
3858 let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3859
3860 if beacon_info_event.content.is_live() {
3861 let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3862 Ok(self.send(content).await?.response)
3863 } else {
3864 Err(BeaconError::NotLive)
3865 }
3866 }
3867
3868 pub async fn save_composer_draft(
3871 &self,
3872 draft: ComposerDraft,
3873 thread_root: Option<&EventId>,
3874 ) -> Result<()> {
3875 self.client
3876 .state_store()
3877 .set_kv_data(
3878 StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
3879 StateStoreDataValue::ComposerDraft(draft),
3880 )
3881 .await?;
3882 Ok(())
3883 }
3884
3885 pub async fn load_composer_draft(
3888 &self,
3889 thread_root: Option<&EventId>,
3890 ) -> Result<Option<ComposerDraft>> {
3891 let data = self
3892 .client
3893 .state_store()
3894 .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3895 .await?;
3896 Ok(data.and_then(|d| d.into_composer_draft()))
3897 }
3898
3899 pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
3902 self.client
3903 .state_store()
3904 .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3905 .await?;
3906 Ok(())
3907 }
3908
3909 pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
3912 let response = self
3913 .client
3914 .send(get_state_event_for_key::v3::Request::new(
3915 self.room_id().to_owned(),
3916 StateEventType::RoomPinnedEvents,
3917 "".to_owned(),
3918 ))
3919 .await;
3920
3921 match response {
3922 Ok(response) => Ok(Some(
3923 response
3924 .into_content()
3925 .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
3926 .pinned,
3927 )),
3928 Err(http_error) => match http_error.as_client_api_error() {
3929 Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
3930 _ => Err(http_error.into()),
3931 },
3932 }
3933 }
3934
3935 pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3943 ObservableLiveLocation::new(&self.client, self.room_id())
3944 }
3945
3946 pub async fn subscribe_to_knock_requests(
3960 &self,
3961 ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
3962 let this = Arc::new(self.clone());
3963
3964 let room_member_events_observer =
3965 self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
3966
3967 let current_seen_ids = self.get_seen_knock_request_ids().await?;
3968 let mut seen_request_ids_stream = self
3969 .seen_knock_request_ids_map
3970 .subscribe()
3971 .await
3972 .map(|values| values.unwrap_or_default());
3973
3974 let mut room_info_stream = self.subscribe_info();
3975
3976 let clear_seen_ids_handle = spawn({
3979 let this = self.clone();
3980 async move {
3981 let mut member_updates_stream = this.room_member_updates_sender.subscribe();
3982 while member_updates_stream.recv().await.is_ok() {
3983 if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
3985 warn!("Failed to remove seen knock requests: {err}")
3986 }
3987 }
3988 }
3989 });
3990
3991 let combined_stream = stream! {
3992 match this.get_current_join_requests(¤t_seen_ids).await {
3994 Ok(initial_requests) => yield initial_requests,
3995 Err(err) => warn!("Failed to get initial requests to join: {err}")
3996 }
3997
3998 let mut requests_stream = room_member_events_observer.subscribe();
3999 let mut seen_ids = current_seen_ids.clone();
4000
4001 loop {
4002 tokio::select! {
4005 Some((event, _)) = requests_stream.next() => {
4006 if let Some(event) = event.as_original() {
4007 let emit = if event.prev_content().is_some() {
4009 matches!(event.membership_change(),
4010 MembershipChange::Banned |
4011 MembershipChange::Knocked |
4012 MembershipChange::KnockAccepted |
4013 MembershipChange::KnockDenied |
4014 MembershipChange::KnockRetracted
4015 )
4016 } else {
4017 true
4020 };
4021
4022 if emit {
4023 match this.get_current_join_requests(&seen_ids).await {
4024 Ok(requests) => yield requests,
4025 Err(err) => {
4026 warn!("Failed to get updated knock requests on new member event: {err}")
4027 }
4028 }
4029 }
4030 }
4031 }
4032
4033 Some(new_seen_ids) = seen_request_ids_stream.next() => {
4034 seen_ids = new_seen_ids;
4036
4037 match this.get_current_join_requests(&seen_ids).await {
4040 Ok(requests) => yield requests,
4041 Err(err) => {
4042 warn!("Failed to get updated knock requests on seen ids changed: {err}")
4043 }
4044 }
4045 }
4046
4047 Some(room_info) = room_info_stream.next() => {
4048 if !room_info.are_members_synced() {
4051 match this.get_current_join_requests(&seen_ids).await {
4052 Ok(requests) => yield requests,
4053 Err(err) => {
4054 warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4055 }
4056 }
4057 }
4058 }
4059 else => break,
4061 }
4062 }
4063 };
4064
4065 Ok((combined_stream, clear_seen_ids_handle))
4066 }
4067
4068 async fn get_current_join_requests(
4069 &self,
4070 seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4071 ) -> Result<Vec<KnockRequest>> {
4072 Ok(self
4073 .members(RoomMemberships::KNOCK)
4074 .await?
4075 .into_iter()
4076 .filter_map(|member| {
4077 let event_id = member.event().event_id()?;
4078 Some(KnockRequest::new(
4079 self,
4080 event_id,
4081 member.event().timestamp(),
4082 KnockRequestMemberInfo::from_member(&member),
4083 seen_request_ids.contains_key(event_id),
4084 ))
4085 })
4086 .collect())
4087 }
4088
4089 pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4091 RoomPrivacySettings::new(&self.inner, &self.client)
4092 }
4093
4094 pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4102 let request = opts.into_request(self.room_id());
4103
4104 let response = self.client.send(request).await?;
4105
4106 let push_ctx = self.push_context().await?;
4107 let chunk = join_all(
4108 response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4109 )
4110 .await;
4111
4112 Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4113 }
4114
4115 pub async fn relations(
4129 &self,
4130 event_id: OwnedEventId,
4131 opts: RelationsOptions,
4132 ) -> Result<Relations> {
4133 let relations = opts.send(self, event_id).await;
4134
4135 if let Ok(Relations { chunk, .. }) = &relations
4137 && let Ok((cache, _handles)) = self.event_cache().await
4138 {
4139 cache.save_events(chunk.clone()).await;
4140 }
4141
4142 relations
4143 }
4144
4145 #[cfg(feature = "experimental-search")]
4148 pub async fn search(
4149 &self,
4150 query: &str,
4151 max_number_of_results: usize,
4152 pagination_offset: Option<usize>,
4153 ) -> Result<Vec<OwnedEventId>, IndexError> {
4154 let mut search_index_guard = self.client.search_index().lock().await;
4155 search_index_guard.search(query, max_number_of_results, pagination_offset, self.room_id())
4156 }
4157
4158 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4180 pub async fn subscribe_thread(
4181 &self,
4182 thread_root: OwnedEventId,
4183 automatic: Option<OwnedEventId>,
4184 ) -> Result<()> {
4185 let is_automatic = automatic.is_some();
4186
4187 match self
4188 .client
4189 .send(subscribe_thread::unstable::Request::new(
4190 self.room_id().to_owned(),
4191 thread_root.clone(),
4192 automatic,
4193 ))
4194 .await
4195 {
4196 Ok(_response) => {
4197 trace!("Server acknowledged the thread subscription; saving in db");
4198
4199 self.client
4201 .state_store()
4202 .upsert_thread_subscriptions(vec![(
4203 self.room_id(),
4204 &thread_root,
4205 StoredThreadSubscription {
4206 status: ThreadSubscriptionStatus::Subscribed {
4207 automatic: is_automatic,
4208 },
4209 bump_stamp: None,
4210 },
4211 )])
4212 .await?;
4213
4214 Ok(())
4215 }
4216
4217 Err(err) => {
4218 if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4219 trace!("Thread subscription skipped: {err}");
4224 Ok(())
4225 } else {
4226 Err(err.into())
4228 }
4229 }
4230 }
4231 }
4232
4233 pub async fn subscribe_thread_if_needed(
4239 &self,
4240 thread_root: &EventId,
4241 automatic: Option<OwnedEventId>,
4242 ) -> Result<()> {
4243 if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4244 if !prev_sub.automatic || automatic.is_some() {
4247 return Ok(());
4250 }
4251 }
4252 self.subscribe_thread(thread_root.to_owned(), automatic).await
4253 }
4254
4255 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4267 pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4268 self.client
4269 .send(unsubscribe_thread::unstable::Request::new(
4270 self.room_id().to_owned(),
4271 thread_root.clone(),
4272 ))
4273 .await?;
4274
4275 trace!("Server acknowledged the thread subscription removal; removed it from db too");
4276
4277 self.client
4279 .state_store()
4280 .upsert_thread_subscriptions(vec![(
4281 self.room_id(),
4282 &thread_root,
4283 StoredThreadSubscription {
4284 status: ThreadSubscriptionStatus::Unsubscribed,
4285 bump_stamp: None,
4286 },
4287 )])
4288 .await?;
4289
4290 Ok(())
4291 }
4292
4293 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4310 pub async fn fetch_thread_subscription(
4311 &self,
4312 thread_root: OwnedEventId,
4313 ) -> Result<Option<ThreadSubscription>> {
4314 let result = self
4315 .client
4316 .send(get_thread_subscription::unstable::Request::new(
4317 self.room_id().to_owned(),
4318 thread_root.clone(),
4319 ))
4320 .await;
4321
4322 let subscription = match result {
4323 Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4324 Err(http_error) => match http_error.as_client_api_error() {
4325 Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4326 _ => return Err(http_error.into()),
4327 },
4328 };
4329
4330 if let Some(sub) = &subscription {
4332 self.client
4333 .state_store()
4334 .upsert_thread_subscriptions(vec![(
4335 self.room_id(),
4336 &thread_root,
4337 StoredThreadSubscription {
4338 status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4339 bump_stamp: None,
4340 },
4341 )])
4342 .await?;
4343 } else {
4344 self.client
4346 .state_store()
4347 .remove_thread_subscription(self.room_id(), &thread_root)
4348 .await?;
4349 }
4350
4351 Ok(subscription)
4352 }
4353
4354 pub async fn load_or_fetch_thread_subscription(
4361 &self,
4362 thread_root: &EventId,
4363 ) -> Result<Option<ThreadSubscription>> {
4364 if self.client.thread_subscription_catchup().is_outdated() {
4366 return self.fetch_thread_subscription(thread_root.to_owned()).await;
4367 }
4368
4369 Ok(self
4371 .client
4372 .state_store()
4373 .load_thread_subscription(self.room_id(), thread_root)
4374 .await
4375 .map(|maybe_sub| {
4376 maybe_sub.and_then(|stored| match stored.status {
4377 ThreadSubscriptionStatus::Unsubscribed => None,
4378 ThreadSubscriptionStatus::Subscribed { automatic } => {
4379 Some(ThreadSubscription { automatic })
4380 }
4381 })
4382 })?)
4383 }
4384}
4385
#[cfg(feature = "e2e-encryption")]
impl RoomIdentityProvider for Room {
    /// Whether `user_id` has a member entry in this room.
    ///
    /// A failed lookup is reported as "not a member".
    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
        Box::pin(async { matches!(self.get_member(user_id).await, Ok(Some(_))) })
    }

    /// The identities of all joined and invited members that have one.
    ///
    /// If the member list cannot be loaded, an empty list is returned.
    /// Identities are looked up one member at a time.
    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
        Box::pin(async {
            let members = self
                .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
                .await
                .unwrap_or_default();

            let mut identities: Vec<UserIdentity> = Vec::new();
            for member in members {
                let identity = self.user_identity(member.user_id()).await;
                identities.extend(identity);
            }
            identities
        })
    }

    /// The underlying identity of `user_id`, if known.
    ///
    /// A failed lookup is reported as `None`.
    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
        Box::pin(async {
            let identity = self.client.encryption().get_user_identity(user_id).await;
            identity.ok().flatten().map(|identity| identity.underlying_identity())
        })
    }
}
4420
/// A reference to a room made of a [`WeakClient`] and a room ID.
///
/// The actual [`Room`] is resolved on demand with [`WeakRoom::get`], which
/// returns `None` when the client or the room is no longer available.
#[derive(Clone, Debug)]
pub(crate) struct WeakRoom {
    /// Weak handle to the client the room belongs to.
    client: WeakClient,
    /// ID of the referenced room.
    room_id: OwnedRoomId,
}
4428
4429impl WeakRoom {
4430 pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4432 Self { client, room_id }
4433 }
4434
4435 pub fn get(&self) -> Option<Room> {
4437 self.client.get().and_then(|client| client.get_room(&self.room_id))
4438 }
4439
4440 pub fn room_id(&self) -> &RoomId {
4442 &self.room_id
4443 }
4444}
4445
/// Details of the invite to a room, as returned by `Room::invite_details`.
#[derive(Debug, Clone)]
pub struct Invite {
    /// The member entry of the invited user (our own user).
    pub invitee: RoomMember,
    /// The member entry of the user who sent the invitee's membership event,
    /// if it is known.
    pub inviter: Option<RoomMember>,
}
4454
/// Errors that can occur while gathering the details of an invite.
#[derive(Error, Debug)]
enum InvitationError {
    /// The membership event of the invited user could not be found.
    #[error("No membership event found")]
    EventMissing,
}
4460
/// A set of receipts, built with the builder-style methods of this type.
///
/// All fields default to `None`.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct Receipts {
    /// Event to use for the fully-read marker, if any.
    pub fully_read: Option<OwnedEventId>,
    /// Event to use for the public read receipt, if any.
    pub public_read_receipt: Option<OwnedEventId>,
    /// Event to use for the private read receipt, if any.
    pub private_read_receipt: Option<OwnedEventId>,
}
4472
4473impl Receipts {
4474 pub fn new() -> Self {
4476 Self::default()
4477 }
4478
4479 pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4488 self.fully_read = event_id.into();
4489 self
4490 }
4491
4492 pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4498 self.public_read_receipt = event_id.into();
4499 self
4500 }
4501
4502 pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4506 self.private_read_receipt = event_id.into();
4507 self
4508 }
4509
4510 pub fn is_empty(&self) -> bool {
4512 self.fully_read.is_none()
4513 && self.public_read_receipt.is_none()
4514 && self.private_read_receipt.is_none()
4515 }
4516}
4517
/// A parent space of a room, classified by how the relationship could be
/// established.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant names only; confirm them against the code that builds these
/// values.
#[derive(Debug)]
pub enum ParentSpace {
    /// Presumably: the room and the space both declare the relationship.
    Reciprocal(Room),
    /// Presumably: the relationship is accepted based on power levels in the
    /// parent space.
    WithPowerlevel(Room),
    /// Presumably: the relationship is claimed by the room but could not be
    /// validated against the parent space.
    Illegitimate(Room),
    /// Presumably: the parent space is not known locally, so only its room ID
    /// is available and the relationship cannot be checked.
    Unverifiable(OwnedRoomId),
}
4537
/// The score attached to a content report.
///
/// This is a newtype around an `i8`; its constructors and `TryFrom`
/// implementations only accept values between `ReportedContentScore::MIN`
/// and `ReportedContentScore::MAX`, inclusive.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReportedContentScore(i8);
4543
4544impl ReportedContentScore {
4545 pub const MIN: Self = Self(-100);
4549
4550 pub const MAX: Self = Self(0);
4554
4555 pub fn new(value: i8) -> Option<Self> {
4564 value.try_into().ok()
4565 }
4566
4567 pub fn new_saturating(value: i8) -> Self {
4573 if value > Self::MAX {
4574 Self::MAX
4575 } else if value < Self::MIN {
4576 Self::MIN
4577 } else {
4578 Self(value)
4579 }
4580 }
4581
4582 pub fn value(&self) -> i8 {
4584 self.0
4585 }
4586}
4587
4588impl PartialEq<i8> for ReportedContentScore {
4589 fn eq(&self, other: &i8) -> bool {
4590 self.0.eq(other)
4591 }
4592}
4593
4594impl PartialEq<ReportedContentScore> for i8 {
4595 fn eq(&self, other: &ReportedContentScore) -> bool {
4596 self.eq(&other.0)
4597 }
4598}
4599
4600impl PartialOrd<i8> for ReportedContentScore {
4601 fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
4602 self.0.partial_cmp(other)
4603 }
4604}
4605
4606impl PartialOrd<ReportedContentScore> for i8 {
4607 fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
4608 self.partial_cmp(&other.0)
4609 }
4610}
4611
4612impl From<ReportedContentScore> for Int {
4613 fn from(value: ReportedContentScore) -> Self {
4614 value.0.into()
4615 }
4616}
4617
4618impl TryFrom<i8> for ReportedContentScore {
4619 type Error = TryFromReportedContentScoreError;
4620
4621 fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
4622 if value > Self::MAX || value < Self::MIN {
4623 Err(TryFromReportedContentScoreError(()))
4624 } else {
4625 Ok(Self(value))
4626 }
4627 }
4628}
4629
4630impl TryFrom<i16> for ReportedContentScore {
4631 type Error = TryFromReportedContentScoreError;
4632
4633 fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
4634 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4635 value.try_into()
4636 }
4637}
4638
4639impl TryFrom<i32> for ReportedContentScore {
4640 type Error = TryFromReportedContentScoreError;
4641
4642 fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
4643 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4644 value.try_into()
4645 }
4646}
4647
4648impl TryFrom<i64> for ReportedContentScore {
4649 type Error = TryFromReportedContentScoreError;
4650
4651 fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
4652 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4653 value.try_into()
4654 }
4655}
4656
4657impl TryFrom<Int> for ReportedContentScore {
4658 type Error = TryFromReportedContentScoreError;
4659
4660 fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
4661 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4662 value.try_into()
4663 }
4664}
4665
/// Something that can provide a [`TimelineEvent`] given its event ID.
trait EventSource {
    /// Returns the event identified by `event_id`, or an error if it could
    /// not be obtained.
    fn get_event(
        &self,
        event_id: &EventId,
    ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
}
4672
4673impl EventSource for &Room {
4674 async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4675 self.load_or_fetch_event(event_id, None).await
4676 }
4677}
4678
/// The error returned when a value cannot be converted into a
/// [`ReportedContentScore`] because it is out of range.
///
/// The private unit field prevents construction outside of this module.
#[derive(Debug, Clone, Error)]
#[error("out of range conversion attempted")]
pub struct TryFromReportedContentScoreError(());
4684
/// A room member together with the member entry of the user who sent its
/// membership event, as returned by `Room::member_with_sender_info`.
#[derive(Debug)]
pub struct RoomMemberWithSenderInfo {
    /// The requested room member.
    pub room_member: RoomMember,
    /// The member entry of the sender of `room_member`'s membership event, if
    /// it could be found.
    pub sender_info: Option<RoomMember>,
}
4695
4696#[cfg(all(test, not(target_family = "wasm")))]
4697mod tests {
4698 use std::collections::BTreeMap;
4699
4700 use matrix_sdk_base::{ComposerDraft, DraftAttachment, store::ComposerDraftType};
4701 use matrix_sdk_test::{
4702 JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
4703 event_factory::EventFactory, test_json,
4704 };
4705 use ruma::{
4706 RoomVersionId, event_id,
4707 events::{relation::RelationType, room::member::MembershipState},
4708 int, owned_event_id, room_id, user_id,
4709 };
4710 use wiremock::{
4711 Mock, MockServer, ResponseTemplate,
4712 matchers::{header, method, path_regex},
4713 };
4714
4715 use super::ReportedContentScore;
4716 use crate::{
4717 Client,
4718 config::RequestConfig,
4719 room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4720 test_utils::{
4721 client::mock_matrix_session,
4722 logged_in_client,
4723 mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4724 },
4725 };
4726
    /// Checks that a room event can still be encrypted after another client,
    /// sharing the same SQLite store, has taken the cross-process store lock
    /// in between.
    #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
    #[async_test]
    async fn test_cache_invalidation_while_encrypt() {
        use matrix_sdk_base::store::RoomLoadSettings;
        use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};

        let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
        let session = mock_matrix_session();

        // First client, backed by the SQLite store at `sqlite_path`.
        let client = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&sqlite_path, None)
            .build()
            .await
            .unwrap();
        client
            .matrix_auth()
            .restore_session(session.clone(), RoomLoadSettings::default())
            .await
            .unwrap();

        client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

        // Feed a sync response to the base client so that an encrypted room
        // exists locally.
        let server = MockServer::start().await;
        {
            Mock::given(method("GET"))
                .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
                .and(header("authorization", "Bearer 1234"))
                .respond_with(
                    ResponseTemplate::new(200)
                        .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
                )
                .mount(&server)
                .await;
            let response = SyncResponseBuilder::default()
                .add_joined_room(
                    JoinedRoomBuilder::default()
                        .add_state_event(StateTestEvent::Member)
                        .add_state_event(StateTestEvent::PowerLevels)
                        .add_state_event(StateTestEvent::Encryption),
                )
                .build_sync_response();
            client.base_client().receive_sync_response(response).await.unwrap();
        }

        let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");

        // Step 1: preshare the room key with the first client.
        room.preshare_room_key().await.unwrap();

        // Step 2: a second client on the same store takes the cross-process
        // lock, then releases it when the scope ends.
        {
            let client = Client::builder()
                .homeserver_url("http://localhost:1234")
                .request_config(RequestConfig::new().disable_retry())
                .sqlite_store(&sqlite_path, None)
                .build()
                .await
                .unwrap();
            client
                .matrix_auth()
                .restore_session(session.clone(), RoomLoadSettings::default())
                .await
                .unwrap();
            client
                .encryption()
                .enable_cross_process_store_lock("client2".to_owned())
                .await
                .unwrap();

            let guard = client.encryption().spin_lock_store(None).await.unwrap();
            assert!(guard.is_some());
        }

        // Step 3: the first client takes the lock back.
        let guard = client.encryption().spin_lock_store(None).await.unwrap();
        assert!(guard.is_some());

        // Step 4: encrypting an event with the first client must still work.
        let olm = client.olm_machine().await;
        let olm = olm.as_ref().expect("Olm machine wasn't started");

        let _encrypted_content = olm
            .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
            .await
            .unwrap();
    }
4819
/// Exercises every way of building a `ReportedContentScore`: the checked
/// constructor, the saturating constructor and the `TryFrom` conversions.
///
/// The inputs used here show `0`, `-50` and `-100` being accepted, while `10`
/// and `-110` are rejected (or clamped to `MAX` / `MIN` when saturating).
#[test]
fn reported_content_score() {
    // Checked constructor: accepted values round-trip, the others give `None`.
    for accepted in [0, -50, -100] {
        let score = ReportedContentScore::new(accepted).unwrap();
        assert_eq!(score.value(), accepted);
    }
    for rejected in [10, -110] {
        assert_eq!(ReportedContentScore::new(rejected), None);
    }

    // Saturating constructor: accepted values are kept as they are...
    for accepted in [0, -50, -100] {
        let score = ReportedContentScore::new_saturating(accepted);
        assert_eq!(score.value(), accepted);
    }
    // ... and out-of-range values are clamped to the nearest bound.
    assert_eq!(ReportedContentScore::new_saturating(10), ReportedContentScore::MAX);
    assert_eq!(ReportedContentScore::new_saturating(-110), ReportedContentScore::MIN);

    // Fallible conversion from `i16`.
    for (input, expected) in [(0i16, 0), (-100i16, -100)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for rejected in [10i16, -110i16] {
        ReportedContentScore::try_from(rejected).unwrap_err();
    }

    // Fallible conversion from `i32`.
    for (input, expected) in [(0i32, 0), (-100i32, -100)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for rejected in [10i32, -110i32] {
        ReportedContentScore::try_from(rejected).unwrap_err();
    }

    // Fallible conversion from `i64`.
    for (input, expected) in [(0i64, 0), (-100i64, -100)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for rejected in [10i64, -110i64] {
        ReportedContentScore::try_from(rejected).unwrap_err();
    }

    // Fallible conversion from the integer type produced by `int!`.
    for (input, expected) in [(int!(0), 0), (int!(-100), -100)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for rejected in [int!(10), int!(-110)] {
        ReportedContentScore::try_from(rejected).unwrap_err();
    }
}
4875
4876 #[async_test]
4877 async fn test_composer_draft() {
4878 use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4879
4880 let client = logged_in_client(None).await;
4881
4882 let response = SyncResponseBuilder::default()
4883 .add_joined_room(JoinedRoomBuilder::default())
4884 .build_sync_response();
4885 client.base_client().receive_sync_response(response).await.unwrap();
4886 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4887
4888 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4889
4890 let draft = ComposerDraft {
4893 plain_text: "Hello, world!".to_owned(),
4894 html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4895 draft_type: ComposerDraftType::NewMessage,
4896 attachments: vec![DraftAttachment {
4897 filename: "cat.txt".to_owned(),
4898 content: matrix_sdk_base::DraftAttachmentContent::File {
4899 data: b"meow".to_vec(),
4900 mimetype: Some("text/plain".to_owned()),
4901 size: Some(5),
4902 },
4903 }],
4904 };
4905
4906 room.save_composer_draft(draft.clone(), None).await.unwrap();
4907
4908 let thread_root = owned_event_id!("$thread_root:b.c");
4909 let thread_draft = ComposerDraft {
4910 plain_text: "Hello, thread!".to_owned(),
4911 html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4912 draft_type: ComposerDraftType::NewMessage,
4913 attachments: vec![DraftAttachment {
4914 filename: "dog.txt".to_owned(),
4915 content: matrix_sdk_base::DraftAttachmentContent::File {
4916 data: b"wuv".to_vec(),
4917 mimetype: Some("text/plain".to_owned()),
4918 size: Some(4),
4919 },
4920 }],
4921 };
4922
4923 room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4924
4925 assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4927
4928 assert_eq!(
4930 room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4931 Some(thread_draft.clone())
4932 );
4933
4934 room.clear_composer_draft(None).await.unwrap();
4936 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4937
4938 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4940
4941 room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4943 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4944 }
4945
4946 #[async_test]
4947 async fn test_mark_join_requests_as_seen() {
4948 let server = MatrixMockServer::new().await;
4949 let client = server.client_builder().build().await;
4950 let event_id = event_id!("$a:b.c");
4951 let room_id = room_id!("!a:b.c");
4952 let user_id = user_id!("@alice:b.c");
4953
4954 let f = EventFactory::new().room(room_id);
4955 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4956 f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into(),
4957 ]);
4958 let room = server.sync_room(&client, joined_room_builder).await;
4959
4960 let seen_ids =
4962 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4963 assert!(seen_ids.is_empty());
4964
4965 room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4967 .await
4968 .expect("Couldn't mark join request as seen");
4969
4970 let seen_ids =
4972 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4973 assert_eq!(seen_ids.len(), 1);
4974 assert_eq!(
4975 seen_ids.into_iter().next().expect("No next value"),
4976 (event_id.to_owned(), user_id.to_owned())
4977 )
4978 }
4979
4980 #[async_test]
4981 async fn test_own_room_membership_with_no_own_member_event() {
4982 let server = MatrixMockServer::new().await;
4983 let client = server.client_builder().build().await;
4984 let room_id = room_id!("!a:b.c");
4985
4986 let room = server.sync_joined_room(&client, room_id).await;
4987
4988 let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
4991 assert!(error.is_some());
4992 }
4993
4994 #[async_test]
4995 async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
4996 let server = MatrixMockServer::new().await;
4997 let client = server.client_builder().build().await;
4998 let room_id = room_id!("!a:b.c");
4999 let user_id = user_id!("@example:localhost");
5000
5001 let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
5002 let joined_room_builder =
5003 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5004 let room = server.sync_room(&client, joined_room_builder).await;
5005
5006 let ret = room
5008 .member_with_sender_info(client.user_id().unwrap())
5009 .await
5010 .expect("Room member info should be available");
5011
5012 assert_eq!(ret.room_member.event().user_id(), user_id);
5014
5015 assert!(ret.sender_info.is_none());
5017 }
5018
5019 #[async_test]
5020 async fn test_own_room_membership_with_own_member_event_and_own_sender() {
5021 let server = MatrixMockServer::new().await;
5022 let client = server.client_builder().build().await;
5023 let room_id = room_id!("!a:b.c");
5024 let user_id = user_id!("@example:localhost");
5025
5026 let f = EventFactory::new().room(room_id).sender(user_id);
5027 let joined_room_builder =
5028 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5029 let room = server.sync_room(&client, joined_room_builder).await;
5030
5031 let ret = room
5033 .member_with_sender_info(client.user_id().unwrap())
5034 .await
5035 .expect("Room member info should be available");
5036
5037 assert_eq!(ret.room_member.event().user_id(), user_id);
5039
5040 assert!(ret.sender_info.is_some());
5042 assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
5043 }
5044
5045 #[async_test]
5046 async fn test_own_room_membership_with_own_member_event_and_known_sender() {
5047 let server = MatrixMockServer::new().await;
5048 let client = server.client_builder().build().await;
5049 let room_id = room_id!("!a:b.c");
5050 let user_id = user_id!("@example:localhost");
5051 let sender_id = user_id!("@alice:b.c");
5052
5053 let f = EventFactory::new().room(room_id).sender(sender_id);
5054 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5055 f.member(user_id).into(),
5056 f.member(sender_id).into(),
5058 ]);
5059 let room = server.sync_room(&client, joined_room_builder).await;
5060
5061 let ret = room
5063 .member_with_sender_info(client.user_id().unwrap())
5064 .await
5065 .expect("Room member info should be available");
5066
5067 assert_eq!(ret.room_member.event().user_id(), user_id);
5069
5070 assert!(ret.sender_info.is_some());
5072 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5073 }
5074
5075 #[async_test]
5076 async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5077 let server = MatrixMockServer::new().await;
5078 let client = server.client_builder().build().await;
5079 let room_id = room_id!("!a:b.c");
5080 let user_id = user_id!("@example:localhost");
5081 let sender_id = user_id!("@alice:b.c");
5082
5083 let f = EventFactory::new().room(room_id).sender(sender_id);
5084 let joined_room_builder =
5085 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5086 let room = server.sync_room(&client, joined_room_builder).await;
5087
5088 server
5090 .mock_get_members()
5091 .ok(vec![f.member(sender_id).into_raw()])
5092 .mock_once()
5093 .mount()
5094 .await;
5095
5096 let ret = room
5098 .member_with_sender_info(client.user_id().unwrap())
5099 .await
5100 .expect("Room member info should be available");
5101
5102 assert_eq!(ret.room_member.event().user_id(), user_id);
5104
5105 assert!(ret.sender_info.is_some());
5107 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5108 }
5109
5110 #[async_test]
5111 async fn test_list_threads() {
5112 let server = MatrixMockServer::new().await;
5113 let client = server.client_builder().build().await;
5114
5115 let room_id = room_id!("!a:b.c");
5116 let sender_id = user_id!("@alice:b.c");
5117 let f = EventFactory::new().room(room_id).sender(sender_id);
5118
5119 let eid1 = event_id!("$1");
5120 let eid2 = event_id!("$2");
5121 let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5122 let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5123
5124 server
5125 .mock_room_threads()
5126 .ok(batch1.clone(), Some("prev_batch".to_owned()))
5127 .mock_once()
5128 .mount()
5129 .await;
5130 server
5131 .mock_room_threads()
5132 .match_from("prev_batch")
5133 .ok(batch2, None)
5134 .mock_once()
5135 .mount()
5136 .await;
5137
5138 let room = server.sync_joined_room(&client, room_id).await;
5139 let result =
5140 room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5141 assert_eq!(result.chunk.len(), 1);
5142 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5143 assert!(result.prev_batch_token.is_some());
5144
5145 let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5146 let result = room.list_threads(opts).await.expect("Failed to list threads");
5147 assert_eq!(result.chunk.len(), 1);
5148 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5149 assert!(result.prev_batch_token.is_none());
5150 }
5151
5152 #[async_test]
5153 async fn test_relations() {
5154 let server = MatrixMockServer::new().await;
5155 let client = server.client_builder().build().await;
5156
5157 let room_id = room_id!("!a:b.c");
5158 let sender_id = user_id!("@alice:b.c");
5159 let f = EventFactory::new().room(room_id).sender(sender_id);
5160
5161 let target_event_id = owned_event_id!("$target");
5162 let eid1 = event_id!("$1");
5163 let eid2 = event_id!("$2");
5164 let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5165 let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5166
5167 server
5168 .mock_room_relations()
5169 .match_target_event(target_event_id.clone())
5170 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5171 .mock_once()
5172 .mount()
5173 .await;
5174
5175 server
5176 .mock_room_relations()
5177 .match_target_event(target_event_id.clone())
5178 .match_from("next_batch")
5179 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5180 .mock_once()
5181 .mount()
5182 .await;
5183
5184 let room = server.sync_joined_room(&client, room_id).await;
5185
5186 let mut opts = RelationsOptions {
5188 include_relations: IncludeRelations::AllRelations,
5189 ..Default::default()
5190 };
5191 let result = room
5192 .relations(target_event_id.clone(), opts.clone())
5193 .await
5194 .expect("Failed to list relations the first time");
5195 assert_eq!(result.chunk.len(), 1);
5196 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5197 assert!(result.prev_batch_token.is_none());
5198 assert!(result.next_batch_token.is_some());
5199 assert!(result.recursion_depth.is_none());
5200
5201 opts.from = result.next_batch_token;
5202 let result = room
5203 .relations(target_event_id, opts)
5204 .await
5205 .expect("Failed to list relations the second time");
5206 assert_eq!(result.chunk.len(), 1);
5207 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5208 assert!(result.prev_batch_token.is_none());
5209 assert!(result.next_batch_token.is_none());
5210 assert!(result.recursion_depth.is_none());
5211 }
5212
5213 #[async_test]
5214 async fn test_relations_with_reltype() {
5215 let server = MatrixMockServer::new().await;
5216 let client = server.client_builder().build().await;
5217
5218 let room_id = room_id!("!a:b.c");
5219 let sender_id = user_id!("@alice:b.c");
5220 let f = EventFactory::new().room(room_id).sender(sender_id);
5221
5222 let target_event_id = owned_event_id!("$target");
5223 let eid1 = event_id!("$1");
5224 let eid2 = event_id!("$2");
5225 let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5226 let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5227
5228 server
5229 .mock_room_relations()
5230 .match_target_event(target_event_id.clone())
5231 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5232 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5233 .mock_once()
5234 .mount()
5235 .await;
5236
5237 server
5238 .mock_room_relations()
5239 .match_target_event(target_event_id.clone())
5240 .match_from("next_batch")
5241 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5242 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5243 .mock_once()
5244 .mount()
5245 .await;
5246
5247 let room = server.sync_joined_room(&client, room_id).await;
5248
5249 let mut opts = RelationsOptions {
5251 include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5252 ..Default::default()
5253 };
5254 let result = room
5255 .relations(target_event_id.clone(), opts.clone())
5256 .await
5257 .expect("Failed to list relations the first time");
5258 assert_eq!(result.chunk.len(), 1);
5259 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5260 assert!(result.prev_batch_token.is_none());
5261 assert!(result.next_batch_token.is_some());
5262 assert!(result.recursion_depth.is_none());
5263
5264 opts.from = result.next_batch_token;
5265 let result = room
5266 .relations(target_event_id, opts)
5267 .await
5268 .expect("Failed to list relations the second time");
5269 assert_eq!(result.chunk.len(), 1);
5270 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5271 assert!(result.prev_batch_token.is_none());
5272 assert!(result.next_batch_token.is_none());
5273 assert!(result.recursion_depth.is_none());
5274 }
5275
5276 #[async_test]
5277 async fn test_power_levels_computation() {
5278 let server = MatrixMockServer::new().await;
5279 let client = server.client_builder().build().await;
5280
5281 let room_id = room_id!("!a:b.c");
5282 let sender_id = client.user_id().expect("No session id");
5283 let f = EventFactory::new().room(room_id).sender(sender_id);
5284 let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5285
5286 let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into();
5288 let power_levels_event = f.power_levels(&mut user_map).state_key("").into();
5289 let room_member_event = f.member(sender_id).into();
5290
5291 let room = server
5293 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event]))
5294 .await;
5295 let ctx = room
5296 .push_condition_room_ctx()
5297 .await
5298 .expect("Failed to get push condition context")
5299 .expect("Could not get push condition context");
5300
5301 assert!(ctx.power_levels.is_none());
5303
5304 let room = server
5306 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event]))
5307 .await;
5308 let ctx = room
5309 .push_condition_room_ctx()
5310 .await
5311 .expect("Failed to get push condition context")
5312 .expect("Could not get push condition context");
5313
5314 assert!(ctx.power_levels.is_none());
5316
5317 let room = server
5319 .sync_room(
5320 &client,
5321 JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5322 )
5323 .await;
5324 let ctx = room
5325 .push_condition_room_ctx()
5326 .await
5327 .expect("Failed to get push condition context")
5328 .expect("Could not get push condition context");
5329
5330 assert!(ctx.power_levels.is_some());
5332 }
5333}