1use std::{
18 borrow::Borrow,
19 collections::{BTreeMap, HashMap},
20 future::Future,
21 ops::Deref,
22 sync::Arc,
23 time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30 StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{
39 IdentityStatusChange, RoomIdentityProvider, UserIdentity, types::events::CryptoContextInfo,
40};
41pub use matrix_sdk_base::store::StoredThreadSubscription;
42use matrix_sdk_base::{
43 ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
44 StateChanges, StateStoreDataKey, StateStoreDataValue,
45 deserialized_responses::{
46 RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
47 },
48 media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
49 store::{StateStoreExt, ThreadSubscriptionStatus},
50};
51#[cfg(feature = "e2e-encryption")]
52use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
53#[cfg(feature = "e2e-encryption")]
54use matrix_sdk_common::BoxFuture;
55use matrix_sdk_common::{
56 deserialized_responses::TimelineEvent,
57 executor::{JoinHandle, spawn},
58 timeout::timeout,
59};
60#[cfg(feature = "experimental-search")]
61use matrix_sdk_search::error::IndexError;
62#[cfg(feature = "experimental-search")]
63#[cfg(doc)]
64use matrix_sdk_search::index::RoomIndex;
65use mime::Mime;
66use reply::Reply;
67#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
68use ruma::events::AnySyncMessageLikeEvent;
69#[cfg(feature = "experimental-encrypted-state-events")]
70use ruma::events::AnySyncStateEvent;
71#[cfg(feature = "unstable-msc4274")]
72use ruma::events::room::message::GalleryItemType;
73#[cfg(feature = "e2e-encryption")]
74use ruma::events::{
75 AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
76};
77use ruma::{
78 EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
79 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
80 api::client::{
81 config::{set_global_account_data, set_room_account_data},
82 context,
83 error::ErrorKind,
84 filter::LazyLoadOptions,
85 membership::{
86 Invite3pid, ban_user, forget_room, get_member_events,
87 invite_user::{self, v3::InvitationRecipient},
88 kick_user, leave_room, unban_user,
89 },
90 message::send_message_event,
91 read_marker::set_read_marker,
92 receipt::create_receipt,
93 redact::redact_event,
94 room::{get_room_event, report_content, report_room},
95 state::{get_state_event_for_key, send_state_event},
96 tag::{create_tag, delete_tag},
97 threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
98 typing::create_typing_event::{self, v3::Typing},
99 },
100 assign,
101 events::{
102 AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
103 Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
104 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
105 RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
106 StaticStateEventContent, SyncStateEvent,
107 beacon::BeaconEventContent,
108 beacon_info::BeaconInfoEventContent,
109 direct::DirectEventContent,
110 marked_unread::MarkedUnreadEventContent,
111 receipt::{Receipt, ReceiptThread, ReceiptType},
112 room::{
113 ImageInfo, MediaSource, ThumbnailInfo,
114 avatar::{self, RoomAvatarEventContent},
115 encryption::RoomEncryptionEventContent,
116 history_visibility::HistoryVisibility,
117 member::{MembershipChange, SyncRoomMemberEvent},
118 message::{
119 AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
120 ImageMessageEventContent, MessageType, RoomMessageEventContent,
121 TextMessageEventContent, UnstableAmplitude, UnstableAudioDetailsContentBlock,
122 UnstableVoiceContentBlock, VideoInfo, VideoMessageEventContent,
123 },
124 name::RoomNameEventContent,
125 pinned_events::RoomPinnedEventsEventContent,
126 power_levels::{
127 RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
128 },
129 server_acl::RoomServerAclEventContent,
130 topic::RoomTopicEventContent,
131 },
132 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
133 tag::{TagInfo, TagName},
134 typing::SyncTypingEvent,
135 },
136 int,
137 push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
138 serde::Raw,
139 time::Instant,
140};
141#[cfg(feature = "experimental-encrypted-state-events")]
142use ruma::{
143 events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
144 serde::JsonCastable,
145};
146use serde::de::DeserializeOwned;
147use thiserror::Error;
148use tokio::{join, sync::broadcast};
149use tracing::{debug, error, info, instrument, trace, warn};
150
151use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
152pub use self::{
153 member::{RoomMember, RoomMemberRole},
154 messages::{
155 EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
156 Relations, RelationsOptions, ThreadRoots,
157 },
158};
159#[cfg(feature = "e2e-encryption")]
160use crate::encryption::backups::BackupState;
161#[cfg(doc)]
162use crate::event_cache::EventCache;
163#[cfg(feature = "experimental-encrypted-state-events")]
164use crate::room::futures::{SendRawStateEvent, SendStateEvent};
165use crate::{
166 BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
167 attachment::{AttachmentConfig, AttachmentInfo},
168 client::WeakClient,
169 config::RequestConfig,
170 error::{BeaconError, WrongRoomState},
171 event_cache::{self, EventCacheDropHandles, RoomEventCache},
172 event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
173 live_location_share::ObservableLiveLocation,
174 media::{MediaFormat, MediaRequestParameters},
175 notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
176 room::{
177 knock_requests::{KnockRequest, KnockRequestMemberInfo},
178 power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
179 privacy_settings::RoomPrivacySettings,
180 },
181 sync::RoomUpdate,
182 utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
183};
184
185pub mod edit;
186pub mod futures;
187pub mod identity_status_changes;
188pub mod knock_requests;
190mod member;
191mod messages;
192pub mod power_levels;
193pub mod reply;
194
195pub mod calls;
196
197pub mod privacy_settings;
199
200#[cfg(feature = "e2e-encryption")]
201pub(crate) mod shared_room_history;
202
/// A handle to a single room, pairing the room's base state with the
/// [`Client`] used to perform requests on its behalf.
///
/// `Room` dereferences to [`BaseRoom`], so the base room's accessors can be
/// called directly on a `Room`.
#[derive(Debug, Clone)]
pub struct Room {
    /// The wrapped base room; also the `Deref` target of `Room`.
    inner: BaseRoom,
    /// The client through which requests for this room are sent.
    pub(crate) client: Client,
}
210
impl Deref for Room {
    type Target = BaseRoom;

    /// Borrows the wrapped [`BaseRoom`].
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
218
// Durations related to typing notices; their use sites are outside this part of the file.
// NOTE(review): presumably the first is the lifetime requested for a typing notice and the
// second the minimum delay before one is re-sent — confirm against the typing-notice sender.
const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
221
/// Describes a subscription to a thread.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ThreadSubscription {
    /// Whether the subscription is flagged as automatic.
    ///
    /// NOTE(review): presumably "created by a client on the user's behalf rather than by an
    /// explicit user action" — confirm against the thread subscription API.
    pub automatic: bool,
}
229
/// Everything needed to evaluate push rules against events: the room
/// condition context and the rule set to apply.
#[derive(Debug)]
pub struct PushContext {
    /// The room context handed to the rule set when evaluating an event.
    push_condition_room_ctx: PushConditionRoomCtx,

    /// The push rules that are evaluated against events.
    push_rules: Ruleset,
}
240
241impl PushContext {
242 pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
244 Self { push_condition_room_ctx, push_rules }
245 }
246
247 pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
249 self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
250 }
251
252 #[doc(hidden)]
255 #[instrument(skip_all)]
256 pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
257 let rules = self
258 .push_rules
259 .iter()
260 .filter_map(|r| {
261 if !r.enabled() {
262 return None;
263 }
264
265 let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
266
267 let conditions = match r {
268 AnyPushRuleRef::Override(r) => {
269 format!("{:?}", r.conditions)
270 }
271 AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
272 AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
273 AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
274 AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
275 _ => "<unknown push rule kind>".to_owned(),
276 };
277
278 Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
279 })
280 .collect::<Vec<_>>()
281 .join("\n");
282 trace!("rules:\n\n{rules}\n\n");
283
284 let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
285
286 if let Some(found) = found {
287 trace!("rule {} matched", found.rule_id());
288 found.actions().to_owned()
289 } else {
290 trace!("no match");
291 Vec::new()
292 }
293 }
294}
295
// Builds a media message content value of enum type `$t` (`Image`, `Audio`, `Video` or
// `File` variant), chosen from the top-level type of the MIME `$content_type`.
//
// Arguments (all identifiers naming values in scope at the call site):
// - `$content_type`: the MIME type of the media,
// - `$filename`: the file name of the media,
// - `$source`: the media source handed to the content constructor,
// - `$caption`: an optional `TextMessageEventContent` used as caption,
// - `$info`: optional attachment info converted into the per-kind info struct,
// - `$thumbnail`: an optional `(source, info)` pair for the thumbnail.
macro_rules! make_media_type {
    ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $info: ident, $thumbnail: ident) => {{
        // With a caption, the caption text becomes the body and the file name is stored in
        // `filename`; without one, the file name itself is used as the body.
        let (body, formatted, filename) = match $caption {
            Some(TextMessageEventContent { body, formatted, .. }) => (body, formatted, Some($filename)),
            None => ($filename, None, None),
        };

        // Split the optional thumbnail pair into two independent options.
        let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();

        match $content_type.type_() {
            mime::IMAGE => {
                let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let content = assign!(ImageMessageEventContent::new(body, $source), {
                    info: Some(Box::new(info)),
                    formatted,
                    filename
                });
                <$t>::Image(content)
            }

            mime::AUDIO => {
                let mut content = assign!(AudioMessageEventContent::new(body, $source), {
                    formatted,
                    filename
                });

                // The audio details block is only attached when both a duration and a
                // waveform are available.
                if let Some(AttachmentInfo::Audio(audio_info) | AttachmentInfo::Voice(audio_info)) = &$info &&
                    let Some(duration) = audio_info.duration && let Some(waveform_vec) = &audio_info.waveform {
                    // Clamp each sample to [0, 1] and scale it to the amplitude range.
                    let waveform = waveform_vec
                        .iter()
                        .map(|v| ((*v).clamp(0.0, 1.0) * UnstableAmplitude::MAX as f32) as u16)
                        .map(Into::into)
                        .collect();
                    content.audio =
                        Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
                }

                // Voice attachments additionally get the voice marker block.
                if matches!($info, Some(AttachmentInfo::Voice(_))) {
                    content.voice = Some(UnstableVoiceContentBlock::new());
                }

                // `$info` is consumed here, so the borrowing checks above must come first.
                let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
                audio_info.mimetype = Some($content_type.as_ref().to_owned());
                let content = content.info(Box::new(audio_info));

                <$t>::Audio(content)
            }

            mime::VIDEO => {
                let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let content = assign!(VideoMessageEventContent::new(body, $source), {
                    info: Some(Box::new(info)),
                    formatted,
                    filename
                });
                <$t>::Video(content)
            }

            // Any other MIME type is sent as a generic file.
            _ => {
                let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let content = assign!(FileMessageEventContent::new(body, $source), {
                    info: Some(Box::new(info)),
                    formatted,
                    filename,
                });
                <$t>::File(content)
            }
        }
    }};
}
381
382impl Room {
/// Creates a `Room` from a base room and the client it belongs to.
pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
    Self { inner: room, client }
}
392
393 #[doc(alias = "reject_invitation")]
399 #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
400 async fn leave_impl(&self) -> (Result<()>, &Room) {
401 let state = self.state();
402 if state == RoomState::Left {
403 return (
404 Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
405 "Joined or Invited",
406 state,
407 )))),
408 self,
409 );
410 }
411
412 let should_forget = matches!(self.state(), RoomState::Invited);
415
416 let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
417 let response = self.client.send(request).await;
418
419 if let Err(error) = response {
422 #[allow(clippy::collapsible_match)]
423 let ignore_error = if let Some(error) = error.client_api_error_kind() {
424 match error {
425 ErrorKind::Forbidden { .. } => true,
428 _ => false,
429 }
430 } else {
431 false
432 };
433
434 error!(?error, ignore_error, should_forget, "Failed to leave the room");
435
436 if !ignore_error {
437 return (Err(error.into()), self);
438 }
439 }
440
441 if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
442 return (Err(e.into()), self);
443 }
444
445 if should_forget {
446 trace!("Trying to forget the room");
447
448 if let Err(error) = self.forget().await {
449 error!(?error, "Failed to forget the room");
450 }
451 }
452
453 (Ok(()), self)
454 }
455
456 pub async fn leave(&self) -> Result<()> {
464 let mut rooms: Vec<Room> = vec![self.clone()];
465 let mut current_room = self;
466
467 while let Some(predecessor) = current_room.predecessor_room() {
468 let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
469
470 if let Some(predecessor_room) = maybe_predecessor_room {
471 rooms.push(predecessor_room.clone());
472 current_room = rooms.last().expect("Room just pushed so can't be empty");
473 } else {
474 warn!("Cannot find predecessor room");
475 break;
476 }
477 }
478
479 let batch_size = 5;
480
481 let rooms_futures: Vec<_> = rooms
482 .iter()
483 .filter_map(|room| match room.state() {
484 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
485 Some(room.leave_impl())
486 }
487 RoomState::Banned | RoomState::Left => None,
488 })
489 .collect();
490
491 let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
492
493 let mut maybe_this_room_failed_with: Option<Error> = None;
494
495 while let Some(result) = futures_stream.next().await {
496 if let (Err(e), room) = result {
497 if room.room_id() == self.room_id() {
498 maybe_this_room_failed_with = Some(e);
499 } else {
500 warn!("Failure while attempting to leave predecessor room: {e:?}");
501 }
502 }
503 }
504
505 maybe_this_room_failed_with.map_or(Ok(()), Err)
506 }
507
508 #[doc(alias = "accept_invitation")]
512 pub async fn join(&self) -> Result<()> {
513 let prev_room_state = self.inner.state();
514
515 if prev_room_state == RoomState::Joined {
516 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
517 "Invited or Left",
518 prev_room_state,
519 ))));
520 }
521
522 self.client.join_room_by_id(self.room_id()).await?;
523
524 Ok(())
525 }
526
/// Returns a clone of the [`Client`] this room is attached to.
pub fn client(&self) -> Client {
    self.client.clone()
}
533
/// Returns whether the base room reports its state as fully synced.
pub fn is_synced(&self) -> bool {
    self.inner.is_state_fully_synced()
}
539
540 pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
570 let Some(url) = self.avatar_url() else { return Ok(None) };
571 let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
572 Ok(Some(self.client.media().get_media_content(&request, true).await?))
573 }
574
575 #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
604 pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
605 let room_id = self.inner.room_id();
606 let request = options.into_request(room_id);
607 let http_response = self.client.send(request).await?;
608
609 let push_ctx = self.push_context().await?;
610 let chunk = join_all(
611 http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
612 )
613 .await;
614
615 Ok(Messages {
616 start: http_response.start,
617 end: http_response.end,
618 chunk,
619 state: http_response.state,
620 })
621 }
622
/// Registers `handler` with the client as an event handler scoped to this
/// room's ID, and returns the handle of the registration.
pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
where
    Ev: SyncEvent + DeserializeOwned + Send + 'static,
    H: EventHandler<Ev, Ctx>,
{
    self.client.add_room_event_handler(self.room_id(), handler)
}
639
/// Subscribes to the [`RoomUpdate`]s the client broadcasts for this room.
pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
    self.client.subscribe_to_room_updates(self.room_id())
}
647
648 pub fn subscribe_to_typing_notifications(
654 &self,
655 ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
656 let (sender, receiver) = broadcast::channel(16);
657 let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
658 let own_user_id = self.own_user_id().to_owned();
659 move |event: SyncTypingEvent| async move {
660 let typing_user_ids = event
662 .content
663 .user_ids
664 .into_iter()
665 .filter(|user_id| *user_id != own_user_id)
666 .collect();
667 let _ = sender.send(typing_user_ids);
669 }
670 });
671 let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
672 (drop_guard, receiver)
673 }
674
/// Creates a stream of batches of [`IdentityStatusChange`]s for this room, by
/// delegating to [`IdentityStatusChanges::create_stream`] with a clone of
/// this room.
#[cfg(feature = "e2e-encryption")]
pub async fn subscribe_to_identity_status_changes(
    &self,
) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
    IdentityStatusChanges::create_stream(self.clone()).await
}
703
/// Turns a raw timeline event into a [`TimelineEvent`], decrypting it when
/// possible.
///
/// With the `e2e-encryption` feature, an event that deserializes as an
/// original encrypted message-like event is handed to `decrypt_event`, and
/// the decrypted event is returned on success. In every other case the event
/// is kept as plaintext, and its push actions are computed when `push_ctx`
/// is provided.
#[cfg(not(feature = "experimental-encrypted-state-events"))]
// NOTE(review): the reason for allowing `clippy::unused_async` is not visible here — confirm
// it is still needed.
#[allow(clippy::unused_async)]
async fn try_decrypt_event(
    &self,
    event: Raw<AnyTimelineEvent>,
    push_ctx: Option<&PushContext>,
) -> TimelineEvent {
    #[cfg(feature = "e2e-encryption")]
    if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
        SyncMessageLikeEvent::Original(_),
    ))) = event.deserialize_as::<AnySyncTimelineEvent>()
        && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
    {
        return event;
    }

    // Not encrypted, or decryption failed: keep the event as it was received.
    let mut event = TimelineEvent::from_plaintext(event.cast());
    if let Some(push_ctx) = push_ctx {
        event.set_push_actions(push_ctx.for_event(event.raw()).await);
    }

    event
}
731
/// Turns a raw timeline event into a [`TimelineEvent`], decrypting it when
/// possible.
///
/// Variant used with the `experimental-encrypted-state-events` feature: both
/// original encrypted message-like events and original encrypted state
/// events are handed to `decrypt_event`. When the event is of neither kind,
/// or decryption fails, it is kept as plaintext and its push actions are
/// computed when `push_ctx` is provided.
#[cfg(feature = "experimental-encrypted-state-events")]
// NOTE(review): the reason for allowing `clippy::unused_async` is not visible here — confirm
// it is still needed.
#[allow(clippy::unused_async)]
async fn try_decrypt_event(
    &self,
    event: Raw<AnyTimelineEvent>,
    push_ctx: Option<&PushContext>,
) -> TimelineEvent {
    match event.deserialize_as::<AnySyncTimelineEvent>() {
        // Encrypted message-like event.
        Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
            SyncMessageLikeEvent::Original(_),
        ))) => {
            if let Ok(event) = self
                .decrypt_event(
                    event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
                    push_ctx,
                )
                .await
            {
                return event;
            }
        }
        // Encrypted state event.
        Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
            SyncStateEvent::Original(_),
        ))) => {
            if let Ok(event) = self
                .decrypt_event(
                    event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
                    push_ctx,
                )
                .await
            {
                return event;
            }
        }
        _ => {}
    }

    // Not encrypted, or decryption failed: keep the event as it was received.
    let mut event = TimelineEvent::from_plaintext(event.cast());
    if let Some(push_ctx) = push_ctx {
        event.set_push_actions(push_ctx.for_event(event.raw()).await);
    }

    event
}
781
782 pub async fn event(
787 &self,
788 event_id: &EventId,
789 request_config: Option<RequestConfig>,
790 ) -> Result<TimelineEvent> {
791 let request =
792 get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
793
794 let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
795 let push_ctx = self.push_context().await?;
796 let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
797
798 if let Ok((cache, _handles)) = self.event_cache().await {
800 cache.save_events([event.clone()]).await;
801 }
802
803 Ok(event)
804 }
805
806 pub async fn load_or_fetch_event(
813 &self,
814 event_id: &EventId,
815 request_config: Option<RequestConfig>,
816 ) -> Result<TimelineEvent> {
817 match self.event_cache().await {
818 Ok((event_cache, _drop_handles)) => {
819 if let Some(event) = event_cache.find_event(event_id).await? {
820 return Ok(event);
821 }
822 }
824 Err(err) => {
825 debug!("error when getting the event cache: {err}");
826 }
827 }
828 self.event(event_id, request_config).await
829 }
830
831 pub async fn event_with_context(
834 &self,
835 event_id: &EventId,
836 lazy_load_members: bool,
837 context_size: UInt,
838 request_config: Option<RequestConfig>,
839 ) -> Result<EventWithContextResponse> {
840 let mut request =
841 context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
842
843 request.limit = context_size;
844
845 if lazy_load_members {
846 request.filter.lazy_load_options =
847 LazyLoadOptions::Enabled { include_redundant_members: false };
848 }
849
850 let response = self.client.send(request).with_request_config(request_config).await?;
851
852 let push_ctx = self.push_context().await?;
853 let push_ctx = push_ctx.as_ref();
854 let target_event = if let Some(event) = response.event {
855 Some(self.try_decrypt_event(event, push_ctx).await)
856 } else {
857 None
858 };
859
860 let (events_before, events_after) = join!(
864 join_all(
865 response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
866 ),
867 join_all(
868 response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
869 ),
870 );
871
872 if let Ok((cache, _handles)) = self.event_cache().await {
874 let mut events_to_save: Vec<TimelineEvent> = Vec::new();
875 if let Some(event) = &target_event {
876 events_to_save.push(event.clone());
877 }
878
879 for event in &events_before {
880 events_to_save.push(event.clone());
881 }
882
883 for event in &events_after {
884 events_to_save.push(event.clone());
885 }
886
887 cache.save_events(events_to_save).await;
888 }
889
890 Ok(EventWithContextResponse {
891 event: target_event,
892 events_before,
893 events_after,
894 state: response.state,
895 prev_batch_token: response.start,
896 next_batch_token: response.end,
897 })
898 }
899
/// Requests this room's member events from the server and hands the
/// response to the base client.
///
/// The work runs through the client's `members_request_deduplicated_handler`,
/// keyed by this room's ID.
/// NOTE(review): presumably this coalesces concurrent calls for the same
/// room into a single request — confirm against the handler's implementation.
pub(crate) async fn request_members(&self) -> Result<()> {
    self.client
        .locks()
        .members_request_deduplicated_handler
        .run(self.room_id().to_owned(), async move {
            let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
            let response = self
                .client
                .send(request.clone())
                .with_request_config(
                    // This request gets a 60 second timeout and up to 3 retries.
                    RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
                )
                .await?;

            // NOTE(review): the future is boxed before being awaited; presumably to keep the
            // enclosing future small — confirm.
            Box::pin(self.client.base_client().receive_all_members(
                self.room_id(),
                &request,
                &response,
            ))
            .await?;

            Ok(())
        })
        .await
}
928
/// Fetches the room's encryption state from the server when it is not yet
/// known locally.
///
/// Returns immediately when the local encryption state is not unknown.
/// Otherwise the room's encryption state event is requested, a `NotFound`
/// answer being treated as "no encryption event", and the result is
/// persisted in the state store and applied to the in-memory room info.
///
/// The work runs through the client's `encryption_state_deduplicated_handler`,
/// keyed by this room's ID.
/// NOTE(review): presumably this coalesces concurrent calls for the same
/// room — confirm against the handler's implementation.
pub async fn request_encryption_state(&self) -> Result<()> {
    if !self.inner.encryption_state().is_unknown() {
        return Ok(());
    }

    self.client
        .locks()
        .encryption_state_deduplicated_handler
        .run(self.room_id().to_owned(), async move {
            // Request the encryption state event, which uses the empty state key.
            let request = get_state_event_for_key::v3::Request::new(
                self.room_id().to_owned(),
                StateEventType::RoomEncryption,
                "".to_owned(),
            );
            let response = match self.client.send(request).await {
                Ok(response) => Some(
                    response
                        .into_content()
                        .deserialize_as_unchecked::<RoomEncryptionEventContent>()?,
                ),
                // The server has no such event for this room.
                Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
                Err(err) => return Err(err.into()),
            };

            // Hold the state store lock while the room info is read, changed and saved.
            let _state_store_lock = self.client.base_client().state_store_lock().lock().await;

            // Work on a copy of the room info, marked as synced even when no event was found.
            let mut room_info = self.clone_info();
            room_info.mark_encryption_state_synced();
            room_info.set_encryption_event(response.clone());
            let mut changes = StateChanges::default();
            changes.add_room(room_info.clone());

            // Persist first, then publish the new info in memory.
            self.client.state_store().save_changes(&changes).await?;
            self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());

            Ok(())
        })
        .await
}
975
/// Returns the encryption state currently known for this room, as reported
/// by the base room. No request is made.
pub fn encryption_state(&self) -> EncryptionState {
    self.inner.encryption_state()
}
983
/// Runs [`Self::request_encryption_state`] and then returns the resulting
/// encryption state.
pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
    self.request_encryption_state().await?;

    Ok(self.encryption_state())
}
994
/// Gathers crypto-related information about the current device and the key
/// backup into a [`CryptoContextInfo`].
///
/// The device counts as verified when the own device cannot be retrieved. A
/// failed check for a backup on the server counts as "no backup".
#[cfg(feature = "e2e-encryption")]
pub async fn crypto_context_info(&self) -> CryptoContextInfo {
    let encryption = self.client.encryption();

    let own_device = encryption.get_own_device().await;
    let this_device_is_verified = if let Ok(Some(device)) = own_device {
        device.is_verified_with_cross_signing()
    } else {
        true
    };

    let backup_exists_on_server =
        encryption.backups().exists_on_server().await.unwrap_or(false);

    let device_creation_ts = encryption.device_creation_timestamp().await;
    let is_backup_configured = encryption.backups().state() == BackupState::Enabled;

    CryptoContextInfo {
        device_creation_ts,
        this_device_is_verified,
        is_backup_configured,
        backup_exists_on_server,
    }
}
1017
1018 fn are_events_visible(&self) -> bool {
1019 if let RoomState::Invited = self.inner.state() {
1020 return matches!(
1021 self.inner.history_visibility_or_default(),
1022 HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1023 );
1024 }
1025
1026 true
1027 }
1028
1029 pub async fn sync_members(&self) -> Result<()> {
1035 if !self.are_events_visible() {
1036 return Ok(());
1037 }
1038
1039 if !self.are_members_synced() { self.request_members().await } else { Ok(()) }
1040 }
1041
/// Returns the member with the given user ID, after making sure the members
/// are synced through [`Self::sync_members`].
///
/// See [`Self::get_member_no_sync`] for the variant that skips the sync.
pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
    self.sync_members().await?;
    self.get_member_no_sync(user_id).await
}
1059
1060 pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1074 Ok(self
1075 .inner
1076 .get_member(user_id)
1077 .await?
1078 .map(|member| RoomMember::new(self.client.clone(), member)))
1079 }
1080
/// Returns the members matching `memberships`, after making sure the members
/// are synced through [`Self::sync_members`].
///
/// See [`Self::members_no_sync`] for the variant that skips the sync.
pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
    self.sync_members().await?;
    self.members_no_sync(memberships).await
}
1093
1094 pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1103 Ok(self
1104 .inner
1105 .members(memberships)
1106 .await?
1107 .into_iter()
1108 .map(|member| RoomMember::new(self.client.clone(), member))
1109 .collect())
1110 }
1111
/// Returns all state events of the given type that the client's state store
/// holds for this room.
pub async fn get_state_events(
    &self,
    event_type: StateEventType,
) -> Result<Vec<RawAnySyncOrStrippedState>> {
    self.client
        .state_store()
        .get_state_events(self.room_id(), event_type)
        .await
        .map_err(Into::into)
}
1123
/// Returns all state events with content type `C` that the client's state
/// store holds for this room.
///
/// The event type is determined statically by `C`.
pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
where
    C: StaticEventContent<IsPrefix = ruma::events::False>
        + StaticStateEventContent
        + RedactContent,
    C::Redacted: RedactedStateEventContent,
{
    Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
}
1149
/// Returns the state events of the given type and with the given state keys
/// that the client's state store holds for this room.
pub async fn get_state_events_for_keys(
    &self,
    event_type: StateEventType,
    state_keys: &[&str],
) -> Result<Vec<RawAnySyncOrStrippedState>> {
    self.client
        .state_store()
        .get_state_events_for_keys(self.room_id(), event_type, state_keys)
        .await
        .map_err(Into::into)
}
/// Returns the state events with content type `C` and the given state keys
/// that the client's state store holds for this room.
///
/// The event type is determined statically by `C`; `state_keys` yields
/// references to keys that `C::StateKey` can be borrowed as.
pub async fn get_state_events_for_keys_static<'a, C, K, I>(
    &self,
    state_keys: I,
) -> Result<Vec<RawSyncOrStrippedState<C>>>
where
    C: StaticEventContent<IsPrefix = ruma::events::False>
        + StaticStateEventContent
        + RedactContent,
    C::StateKey: Borrow<K>,
    C::Redacted: RedactedStateEventContent,
    K: AsRef<str> + Sized + Sync + 'a,
    I: IntoIterator<Item = &'a K> + Send,
    I::IntoIter: Send,
{
    Ok(self
        .client
        .state_store()
        .get_state_events_for_keys_static(self.room_id(), state_keys)
        .await?)
}
1203
/// Returns the state event of the given type and state key that the
/// client's state store holds for this room, if any.
pub async fn get_state_event(
    &self,
    event_type: StateEventType,
    state_key: &str,
) -> Result<Option<RawAnySyncOrStrippedState>> {
    self.client
        .state_store()
        .get_state_event(self.room_id(), event_type, state_key)
        .await
        .map_err(Into::into)
}
1216
/// Returns the state event with content type `C` for this room, if any.
///
/// Only available for content types whose state key is [`EmptyStateKey`];
/// delegates to [`Self::get_state_event_static_for_key`] with that key.
pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
where
    C: StaticEventContent<IsPrefix = ruma::events::False>
        + StaticStateEventContent<StateKey = EmptyStateKey>
        + RedactContent,
    C::Redacted: RedactedStateEventContent,
{
    self.get_state_event_static_for_key(&EmptyStateKey).await
}
1244
/// Returns the state event with content type `C` and the given state key
/// that the client's state store holds for this room, if any.
///
/// The event type is determined statically by `C`; `state_key` is a
/// reference to a key that `C::StateKey` can be borrowed as.
pub async fn get_state_event_static_for_key<C, K>(
    &self,
    state_key: &K,
) -> Result<Option<RawSyncOrStrippedState<C>>>
where
    C: StaticEventContent<IsPrefix = ruma::events::False>
        + StaticStateEventContent
        + RedactContent,
    C::StateKey: Borrow<K>,
    C::Redacted: RedactedStateEventContent,
    K: AsRef<str> + ?Sized + Sync,
{
    Ok(self
        .client
        .state_store()
        .get_state_event_static_for_key(self.room_id(), state_key)
        .await?)
}
1282
/// Returns a stream classifying every parent space declared by this room's
/// `m.space.parent` state events.
///
/// Each declared parent yields one [`ParentSpace`]:
/// - `Unverifiable` when the client has no room for the parent's ID,
/// - `Reciprocal` when the parent holds an original (non-redacted, non-stripped)
///   `m.space.child` event pointing back at this room,
/// - `WithPowerlevel` when the sender of the parent event is a member of the parent
///   and may send `m.space.child` state events there,
/// - `Illegitimate` otherwise.
///
/// Parent events that are redacted or fail to deserialize are skipped. The
/// per-parent checks are collected into a `FuturesUnordered`, so the items
/// are not guaranteed to come out in the order of the state events.
pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
    Ok(self
        .get_state_events_static::<SpaceParentEventContent>()
        .await?
        .into_iter()
        // Extract (parent room ID, sender) from every usable parent event.
        .filter_map(|parent_event| match parent_event.deserialize() {
            Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
                Some((e.state_key.to_owned(), e.sender))
            }
            Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
            Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
            Err(e) => {
                info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
                None
            }
        })
        .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
            // Without the parent room, nothing about the relationship can be checked.
            let Some(parent_room) = self.client.get_room(&state_key) else {
                return Ok(ParentSpace::Unverifiable(state_key));
            };
            // Look for a child event in the parent that points back at this room.
            if let Some(child_event) = parent_room
                .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
                .await?
            {
                match child_event.deserialize() {
                    Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
                        return Ok(ParentSpace::Reciprocal(parent_room));
                    }
                    // Redacted or stripped child events do not establish reciprocity.
                    Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
                    Ok(SyncOrStrippedState::Stripped(_)) => {}
                    Err(e) => {
                        info!(
                            room_id = ?self.room_id(), parent_room_id = ?state_key,
                            "Could not deserialize m.space.child: {e}"
                        );
                    }
                }
            }

            // No reciprocal link: fall back to checking the sender of the parent event.
            let Some(member) = parent_room.get_member(&sender).await? else {
                return Ok(ParentSpace::Illegitimate(parent_room));
            };

            if member.can_send_state(StateEventType::SpaceChild) {
                Ok(ParentSpace::WithPowerlevel(parent_room))
            } else {
                Ok(ParentSpace::Illegitimate(parent_room))
            }
        })
        .collect::<FuturesUnordered<_>>())
}
1357
/// Returns the room account data event of the given type that the client's
/// state store holds for this room, if any.
pub async fn account_data(
    &self,
    data_type: RoomAccountDataEventType,
) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
    self.client
        .state_store()
        .get_room_account_data_event(self.room_id(), data_type)
        .await
        .map_err(Into::into)
}
1369
/// Returns the room account data event with content type `C` for this room,
/// if any.
///
/// Looks the event up through [`Self::account_data`] using `C::TYPE`, then
/// casts the raw event to the typed form without checking it.
pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
where
    C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
{
    Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
}
1394
/// Returns `true` when no device of any user returned by the state store for
/// this room is unverified.
///
/// The user IDs are queried with an empty membership filter. The check stops
/// at the first user that has an unverified device.
#[cfg(feature = "e2e-encryption")]
pub async fn contains_only_verified_devices(&self) -> Result<bool> {
    let state_store = self.client.state_store();
    let user_ids = state_store.get_user_ids(self.room_id(), RoomMemberships::empty()).await?;

    for user_id in user_ids {
        let user_devices = self.client.encryption().get_user_devices(&user_id).await?;

        if user_devices.devices().any(|device| !device.is_verified()) {
            return Ok(false);
        }
    }

    Ok(true)
}
1418
1419 pub async fn set_account_data<T>(
1434 &self,
1435 content: T,
1436 ) -> Result<set_room_account_data::v3::Response>
1437 where
1438 T: RoomAccountDataEventContent,
1439 {
1440 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1441
1442 let request = set_room_account_data::v3::Request::new(
1443 own_user.to_owned(),
1444 self.room_id().to_owned(),
1445 &content,
1446 )?;
1447
1448 Ok(self.client.send(request).await?)
1449 }
1450
1451 pub async fn set_account_data_raw(
1476 &self,
1477 event_type: RoomAccountDataEventType,
1478 content: Raw<AnyRoomAccountDataEventContent>,
1479 ) -> Result<set_room_account_data::v3::Response> {
1480 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1481
1482 let request = set_room_account_data::v3::Request::new_raw(
1483 own_user.to_owned(),
1484 self.room_id().to_owned(),
1485 event_type,
1486 content,
1487 );
1488
1489 Ok(self.client.send(request).await?)
1490 }
1491
1492 pub async fn set_tag(
1523 &self,
1524 tag: TagName,
1525 tag_info: TagInfo,
1526 ) -> Result<create_tag::v3::Response> {
1527 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1528 let request = create_tag::v3::Request::new(
1529 user_id.to_owned(),
1530 self.inner.room_id().to_owned(),
1531 tag.to_string(),
1532 tag_info,
1533 );
1534 Ok(self.client.send(request).await?)
1535 }
1536
1537 pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1544 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1545 let request = delete_tag::v3::Request::new(
1546 user_id.to_owned(),
1547 self.inner.room_id().to_owned(),
1548 tag.to_string(),
1549 );
1550 Ok(self.client.send(request).await?)
1551 }
1552
1553 pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1563 if is_favourite {
1564 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1565
1566 self.set_tag(TagName::Favorite, tag_info).await?;
1567
1568 if self.is_low_priority() {
1569 self.remove_tag(TagName::LowPriority).await?;
1570 }
1571 } else {
1572 self.remove_tag(TagName::Favorite).await?;
1573 }
1574 Ok(())
1575 }
1576
1577 pub async fn set_is_low_priority(
1587 &self,
1588 is_low_priority: bool,
1589 tag_order: Option<f64>,
1590 ) -> Result<()> {
1591 if is_low_priority {
1592 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1593
1594 self.set_tag(TagName::LowPriority, tag_info).await?;
1595
1596 if self.is_favourite() {
1597 self.remove_tag(TagName::Favorite).await?;
1598 }
1599 } else {
1600 self.remove_tag(TagName::LowPriority).await?;
1601 }
1602 Ok(())
1603 }
1604
1605 pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1614 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1615
1616 let mut content = self
1617 .client
1618 .account()
1619 .account_data::<DirectEventContent>()
1620 .await?
1621 .map(|c| c.deserialize())
1622 .transpose()?
1623 .unwrap_or_default();
1624
1625 let this_room_id = self.inner.room_id();
1626
1627 if is_direct {
1628 let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1629 room_members.retain(|member| member.user_id() != self.own_user_id());
1630
1631 for member in room_members {
1632 let entry = content.entry(member.user_id().into()).or_default();
1633 if !entry.iter().any(|room_id| room_id == this_room_id) {
1634 entry.push(this_room_id.to_owned());
1635 }
1636 }
1637 } else {
1638 for (_, list) in content.iter_mut() {
1639 list.retain(|room_id| *room_id != this_room_id);
1640 }
1641
1642 content.retain(|_, list| !list.is_empty());
1644 }
1645
1646 let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1647
1648 self.client.send(request).await?;
1649 Ok(())
1650 }
1651
/// Tries to decrypt an encrypted room event with the client's Olm machine.
///
/// On success the decrypted event is returned, together with the push
/// actions computed from `push_ctx` when one is supplied. When the event
/// cannot be decrypted, a room key download from backup is requested for it
/// and an unable-to-decrypt timeline event is returned instead.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available.
/// Errors from the decryption attempt itself are propagated.
#[cfg(feature = "e2e-encryption")]
#[cfg(not(feature = "experimental-encrypted-state-events"))]
pub async fn decrypt_event(
    &self,
    event: &Raw<OriginalSyncRoomEncryptedEvent>,
    push_ctx: Option<&PushContext>,
) -> Result<TimelineEvent> {
    let olm_guard = self.client.olm_machine().await;
    let Some(machine) = olm_guard.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    let outcome = machine
        .try_decrypt_room_event(
            event.cast_ref(),
            self.inner.room_id(),
            self.client.decryption_settings(),
        )
        .await?;

    match outcome {
        RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
            self.client
                .encryption()
                .backups()
                .maybe_download_room_key(self.room_id().to_owned(), event.clone());
            Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
        }
        RoomEventDecryptionResult::Decrypted(decrypted) => {
            let push_actions = match push_ctx {
                Some(ctx) => Some(ctx.for_event(&decrypted.event).await),
                None => None,
            };
            Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
        }
    }
}
1694
/// Tries to decrypt an encrypted event with the client's Olm machine; any
/// raw event type castable to `EncryptedEvent` is accepted.
///
/// On success the decrypted event is returned, together with the push
/// actions computed from `push_ctx` when one is supplied. When the event
/// cannot be decrypted, a room key download from backup is requested for it
/// and an unable-to-decrypt timeline event is returned instead.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available.
/// Errors from the decryption attempt itself are propagated.
#[cfg(feature = "experimental-encrypted-state-events")]
pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
    &self,
    event: &Raw<T>,
    push_ctx: Option<&PushContext>,
) -> Result<TimelineEvent> {
    let olm_guard = self.client.olm_machine().await;
    let Some(machine) = olm_guard.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    let outcome = machine
        .try_decrypt_room_event(
            event.cast_ref(),
            self.inner.room_id(),
            self.client.decryption_settings(),
        )
        .await?;

    match outcome {
        RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
            self.client
                .encryption()
                .backups()
                .maybe_download_room_key(self.room_id().to_owned(), event.clone());
            Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
        }
        RoomEventDecryptionResult::Decrypted(decrypted) => {
            let push_actions = match push_ctx {
                Some(ctx) => Some(ctx.for_event(&decrypted.event).await),
                None => None,
            };
            Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
        }
    }
}
1738
/// Looks up the encryption info of the given session and sender in this
/// room.
///
/// Returns `None` when no Olm machine is available or when the lookup
/// fails; the lookup error is discarded.
#[cfg(feature = "e2e-encryption")]
pub async fn get_encryption_info(
    &self,
    session_id: &str,
    sender: &UserId,
) -> Option<Arc<EncryptionInfo>> {
    let olm_guard = self.client.olm_machine().await;
    let machine = olm_guard.as_ref()?;

    let lookup = machine.get_session_encryption_info(self.room_id(), session_id, sender).await;
    lookup.ok()
}
1761
/// Asks the Olm machine to discard the room key of this room.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available.
/// Errors from the Olm machine are propagated.
#[cfg(feature = "e2e-encryption")]
pub async fn discard_room_key(&self) -> Result<()> {
    let olm_guard = self.client.olm_machine().await;
    let Some(machine) = olm_guard.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    machine.discard_room_key(self.inner.room_id()).await?;
    Ok(())
}
1784
1785 #[instrument(skip_all)]
1793 pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1794 let request = assign!(
1795 ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1796 { reason: reason.map(ToOwned::to_owned) }
1797 );
1798 self.client.send(request).await?;
1799 Ok(())
1800 }
1801
1802 #[instrument(skip_all)]
1810 pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1811 let request = assign!(
1812 unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1813 { reason: reason.map(ToOwned::to_owned) }
1814 );
1815 self.client.send(request).await?;
1816 Ok(())
1817 }
1818
1819 #[instrument(skip_all)]
1828 pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1829 let request = assign!(
1830 kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1831 { reason: reason.map(ToOwned::to_owned) }
1832 );
1833 self.client.send(request).await?;
1834 Ok(())
1835 }
1836
1837 #[instrument(skip_all)]
1843 pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1844 #[cfg(feature = "e2e-encryption")]
1845 if self.client.inner.enable_share_history_on_invite {
1846 shared_room_history::share_room_history(self, user_id.to_owned()).await?;
1847 }
1848
1849 let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
1850 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1851 self.client.send(request).await?;
1852
1853 self.mark_members_missing();
1857
1858 Ok(())
1859 }
1860
1861 #[instrument(skip_all)]
1867 pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
1868 let recipient = InvitationRecipient::ThirdPartyId(invite_id);
1869 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1870 self.client.send(request).await?;
1871
1872 self.mark_members_missing();
1876
1877 Ok(())
1878 }
1879
1880 pub async fn typing_notice(&self, typing: bool) -> Result<()> {
1915 self.ensure_room_joined()?;
1916
1917 let send = if let Some(typing_time) =
1920 self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
1921 {
1922 if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
1923 typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
1927 } else {
1928 !typing
1930 }
1931 } else {
1932 typing
1935 };
1936
1937 if send {
1938 self.send_typing_notice(typing).await?;
1939 }
1940
1941 Ok(())
1942 }
1943
1944 #[instrument(name = "typing_notice", skip(self))]
1945 async fn send_typing_notice(&self, typing: bool) -> Result<()> {
1946 let typing = if typing {
1947 self.client
1948 .inner
1949 .typing_notice_times
1950 .write()
1951 .unwrap()
1952 .insert(self.room_id().to_owned(), Instant::now());
1953 Typing::Yes(TYPING_NOTICE_TIMEOUT)
1954 } else {
1955 self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
1956 Typing::No
1957 };
1958
1959 let request = create_typing_event::v3::Request::new(
1960 self.own_user_id().to_owned(),
1961 self.room_id().to_owned(),
1962 typing,
1963 );
1964
1965 self.client.send(request).await?;
1966
1967 Ok(())
1968 }
1969
1970 #[instrument(skip_all)]
1987 pub async fn send_single_receipt(
1988 &self,
1989 receipt_type: create_receipt::v3::ReceiptType,
1990 thread: ReceiptThread,
1991 event_id: OwnedEventId,
1992 ) -> Result<()> {
1993 let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
1996
1997 self.client
1998 .inner
1999 .locks
2000 .read_receipt_deduplicated_handler
2001 .run((request_key, event_id.clone()), async {
2002 let is_unthreaded = thread == ReceiptThread::Unthreaded;
2004
2005 let mut request = create_receipt::v3::Request::new(
2006 self.room_id().to_owned(),
2007 receipt_type,
2008 event_id,
2009 );
2010 request.thread = thread;
2011
2012 self.client.send(request).await?;
2013
2014 if is_unthreaded {
2015 self.set_unread_flag(false).await?;
2016 }
2017
2018 Ok(())
2019 })
2020 .await
2021 }
2022
2023 #[instrument(skip_all)]
2033 pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2034 if receipts.is_empty() {
2035 return Ok(());
2036 }
2037
2038 let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2039 let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2040 fully_read,
2041 read_receipt: public_read_receipt,
2042 private_read_receipt,
2043 });
2044
2045 self.client.send(request).await?;
2046
2047 self.set_unread_flag(false).await?;
2048
2049 Ok(())
2050 }
2051
/// Shared implementation for enabling encryption in this room.
///
/// Does nothing when the latest encryption state already reports the room
/// as encrypted. Otherwise it sends an `m.room.encryption` state event with
/// the Megolm algorithm (with encrypted state enabled when
/// `encrypted_state_events` is true and the
/// `experimental-encrypted-state-events` feature is active), then waits up
/// to `SYNC_WAIT_TIME` for a sync to make the encryption state known.
///
/// If the room is still not seen as encrypted after that wait, the
/// encryption state is marked as missing in the stored room info.
// The parameter and `mut` binding are only used with the experimental
// feature enabled, hence the `allow`.
#[allow(unused_variables, unused_mut)]
async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
    use ruma::{
        EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
    };
    // Upper bound on how long to wait for a sync to report the encryption state.
    const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);

    if !self.latest_encryption_state().await?.is_encrypted() {
        let mut content =
            RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
        #[cfg(feature = "experimental-encrypted-state-events")]
        if encrypted_state_events {
            content = content.with_encrypted_state();
        }
        self.send_state_event(content).await?;

        // Spin on the sync beat until the encryption state is no longer
        // unknown, or until the timeout elapses.
        let res = timeout(
            async {
                loop {
                    // Wait for the next sync beat, then inspect the state
                    // while holding the state store lock.
                    self.client.inner.sync_beat.listen().await;
                    let _state_store_lock =
                        self.client.base_client().state_store_lock().lock().await;

                    if !self.inner.encryption_state().is_unknown() {
                        break;
                    }
                }
            },
            SYNC_WAIT_TIME,
        )
        .await;

        // Held for the rest of this block, covering both the final check and
        // the room info update below.
        let _state_store_lock = self.client.base_client().state_store_lock().lock().await;

        #[cfg(not(feature = "experimental-encrypted-state-events"))]
        if res.is_ok() && self.inner.encryption_state().is_encrypted() {
            debug!("room successfully marked as encrypted");
            return Ok(());
        }

        // With the experimental feature, success additionally requires the
        // state-encrypted flag when it was requested.
        #[cfg(feature = "experimental-encrypted-state-events")]
        if res.is_ok() && {
            if encrypted_state_events {
                self.inner.encryption_state().is_state_encrypted()
            } else {
                self.inner.encryption_state().is_encrypted()
            }
        } {
            debug!("room successfully marked as encrypted");
            return Ok(());
        }

        debug!("still not marked as encrypted, marking encryption state as missing");

        // Persist a room info whose encryption state is flagged as missing,
        // then publish it on the room.
        let mut room_info = self.clone_info();
        room_info.mark_encryption_state_missing();
        let mut changes = StateChanges::default();
        changes.add_room(room_info.clone());

        self.client.state_store().save_changes(&changes).await?;
        self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
    }

    Ok(())
}
2133
/// Enables encryption in this room.
///
/// Delegates to `enable_encryption_inner` without requesting encrypted
/// state events.
#[instrument(skip_all)]
pub async fn enable_encryption(&self) -> Result<()> {
    self.enable_encryption_inner(false).await
}
2169
/// Enables encryption in this room, including encryption of state events.
///
/// Delegates to `enable_encryption_inner`, requesting encrypted state
/// events.
#[instrument(skip_all)]
#[cfg(feature = "experimental-encrypted-state-events")]
pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
    self.enable_encryption_inner(true).await
}
2207
/// Claims one-time keys for the active members of this room and shares the
/// room key with them.
///
/// The work runs through the client's `group_session_deduplicated_handler`,
/// keyed by room ID. If sharing the room key fails, the room key is
/// discarded before the error is returned.
///
/// # Errors
///
/// Fails when `ensure_room_joined` fails, when the store lock cannot be
/// taken, or when claiming keys, sharing the key or discarding it fails.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
async fn preshare_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    // Take the store lock; the guard is consumed by the `record` call below,
    // which logs the store generation when a guard was returned.
    let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
    tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));

    self.client
        .locks()
        .group_session_deduplicated_handler
        .run(self.room_id().to_owned(), async move {
            {
                let members = self
                    .client
                    .state_store()
                    .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
                    .await?;
                self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
            };

            let response = self.share_room_key().await;

            // On failure, discard the room key before reporting the error.
            if let Err(r) = response {
                let machine = self.client.olm_machine().await;
                if let Some(machine) = machine.as_ref() {
                    machine.discard_room_key(self.room_id()).await?;
                }
                return Err(r);
            }

            Ok(())
        })
        .await
}
2255
/// Sends the to-device requests that share this room's key.
///
/// The requests come from the base client's `share_room_key`. Each one is
/// sent in turn and marked as sent once its response arrives; the first
/// failure aborts the loop.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all)]
async fn share_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    let to_device_requests = self.client.base_client().share_room_key(self.room_id()).await?;

    for to_device_request in to_device_requests {
        let response = self.client.send_to_device(&to_device_request).await?;
        self.client.mark_request_as_sent(&to_device_request.txn_id, &response).await?;
    }

    Ok(())
}
2275
2276 #[instrument(skip_all)]
2285 pub async fn sync_up(&self) {
2286 while !self.is_synced() && self.state() == RoomState::Joined {
2287 let wait_for_beat = self.client.inner.sync_beat.listen();
2288 let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2290 }
2291 }
2292
/// Creates a [`SendMessageLikeEvent`] for sending the given message-like
/// content to this room.
///
/// This only constructs the value; nothing is sent by this call itself.
pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
    SendMessageLikeEvent::new(self, content)
}
2366
/// Sends a keys query for the active members of this room whose devices are
/// not known to be up to date.
///
/// A member is included when it is absent from the Olm store's tracked
/// users, or when it is tracked but flagged as dirty. No request is sent
/// when the resulting query contains no device keys.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available.
/// Errors from the state store, the Olm store and the keys query are
/// propagated.
#[cfg(feature = "e2e-encryption")]
async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
    let olm = self.client.olm_machine().await;
    // Report a missing Olm machine as an error instead of panicking, like
    // the other encryption methods of this type do.
    let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;

    let members =
        self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;

    // Map each tracked user to its dirty flag.
    let tracked: HashMap<_, _> = olm
        .store()
        .load_tracked_users()
        .await?
        .into_iter()
        .map(|tracked| (tracked.user_id, tracked.dirty))
        .collect();

    // Keep members that are either untracked or tracked-but-dirty.
    let members_with_unknown_devices =
        members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));

    let (req_id, request) =
        olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));

    if !request.device_keys.is_empty() {
        self.client.keys_query(&req_id, request.device_keys).await?;
    }

    Ok(())
}
2399
/// Creates a [`SendRawMessageLikeEvent`] for sending raw message-like
/// content of the given event type to this room.
///
/// This only constructs the value; nothing is sent by this call itself.
#[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
pub fn send_raw<'a>(
    &'a self,
    event_type: &'a str,
    content: impl IntoRawMessageLikeEventContent,
) -> SendRawMessageLikeEvent<'a> {
    SendRawMessageLikeEvent::new(self, event_type, content)
}
2453
/// Creates a [`SendAttachment`] for sending a file with the given filename,
/// MIME type, bytes and configuration to this room.
///
/// This only constructs the value; nothing is uploaded or sent by this call
/// itself.
#[instrument(skip_all)]
pub fn send_attachment<'a>(
    &'a self,
    filename: impl Into<String>,
    content_type: &'a Mime,
    data: Vec<u8>,
    config: AttachmentConfig,
) -> SendAttachment<'a> {
    SendAttachment::new(self, filename.into(), content_type, data, config)
}
2511
2512 #[instrument(skip_all)]
2540 pub(super) async fn prepare_and_send_attachment<'a>(
2541 &'a self,
2542 filename: String,
2543 content_type: &'a Mime,
2544 data: Vec<u8>,
2545 mut config: AttachmentConfig,
2546 send_progress: SharedObservable<TransmissionProgress>,
2547 store_in_cache: bool,
2548 ) -> Result<send_message_event::v3::Response> {
2549 self.ensure_room_joined()?;
2550
2551 let txn_id = config.txn_id.take();
2552 let mentions = config.mentions.take();
2553
2554 let thumbnail = config.thumbnail.take();
2555
2556 let thumbnail_cache_info = if store_in_cache {
2558 thumbnail
2559 .as_ref()
2560 .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2561 } else {
2562 None
2563 };
2564
2565 #[cfg(feature = "e2e-encryption")]
2566 let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2567 self.client
2568 .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2569 .await?
2570 } else {
2571 self.client
2572 .media()
2573 .upload_plain_media_and_thumbnail(
2574 content_type,
2575 data.clone(),
2578 thumbnail,
2579 send_progress,
2580 )
2581 .await?
2582 };
2583
2584 #[cfg(not(feature = "e2e-encryption"))]
2585 let (media_source, thumbnail) = self
2586 .client
2587 .media()
2588 .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2589 .await?;
2590
2591 if store_in_cache {
2592 let media_store_lock_guard = self.client.media_store().lock().await?;
2593
2594 debug!("caching the media");
2598 let request =
2599 MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2600
2601 if let Err(err) = media_store_lock_guard
2602 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2603 .await
2604 {
2605 warn!("unable to cache the media after uploading it: {err}");
2606 }
2607
2608 if let Some(((data, height, width), source)) =
2609 thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2610 {
2611 debug!("caching the thumbnail");
2612
2613 let request = MediaRequestParameters {
2614 source: source.clone(),
2615 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2616 };
2617
2618 if let Err(err) = media_store_lock_guard
2619 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2620 .await
2621 {
2622 warn!("unable to cache the media after uploading it: {err}");
2623 }
2624 }
2625 }
2626
2627 let content = self
2628 .make_media_event(
2629 Room::make_attachment_type(
2630 content_type,
2631 filename,
2632 media_source,
2633 config.caption,
2634 config.info,
2635 thumbnail,
2636 ),
2637 mentions,
2638 config.reply,
2639 )
2640 .await?;
2641
2642 let mut fut = self.send(content);
2643 if let Some(txn_id) = txn_id {
2644 fut = fut.with_transaction_id(txn_id);
2645 }
2646
2647 fut.await.map(|result| result.response)
2648 }
2649
/// Builds the [`MessageType`] for an attachment from its MIME type,
/// filename, media source, optional caption, optional info and optional
/// thumbnail, by expanding the `make_media_type!` macro.
#[allow(clippy::too_many_arguments)]
pub(crate) fn make_attachment_type(
    content_type: &Mime,
    filename: String,
    source: MediaSource,
    caption: Option<TextMessageEventContent>,
    info: Option<AttachmentInfo>,
    thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
) -> MessageType {
    make_media_type!(MessageType, content_type, filename, source, caption, info, thumbnail)
}
2663
2664 pub(crate) async fn make_media_event(
2667 &self,
2668 msg_type: MessageType,
2669 mentions: Option<Mentions>,
2670 reply: Option<Reply>,
2671 ) -> Result<RoomMessageEventContent> {
2672 let mut content = RoomMessageEventContent::new(msg_type);
2673 if let Some(mentions) = mentions {
2674 content = content.add_mentions(mentions);
2675 }
2676 if let Some(reply) = reply {
2677 content = self.make_reply_event(content.into(), reply).await?;
2680 }
2681 Ok(content)
2682 }
2683
/// Builds the [`GalleryItemType`] for a gallery item from its MIME type,
/// filename, media source, optional caption, optional info and optional
/// thumbnail, by expanding the `make_media_type!` macro.
#[cfg(feature = "unstable-msc4274")]
#[allow(clippy::too_many_arguments)]
pub(crate) fn make_gallery_item_type(
    content_type: &Mime,
    filename: String,
    source: MediaSource,
    caption: Option<TextMessageEventContent>,
    info: Option<AttachmentInfo>,
    thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
) -> GalleryItemType {
    make_media_type!(GalleryItemType, content_type, filename, source, caption, info, thumbnail)
}
2698
2699 pub async fn update_power_levels(
2708 &self,
2709 updates: Vec<(&UserId, Int)>,
2710 ) -> Result<send_state_event::v3::Response> {
2711 let mut power_levels = self.power_levels().await?;
2712
2713 for (user_id, new_level) in updates {
2714 if new_level == power_levels.users_default {
2715 power_levels.users.remove(user_id);
2716 } else {
2717 power_levels.users.insert(user_id.to_owned(), new_level);
2718 }
2719 }
2720
2721 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2722 }
2723
2724 pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2729 let mut power_levels = self.power_levels().await?;
2730 power_levels.apply(changes)?;
2731 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2732 Ok(())
2733 }
2734
2735 pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2739 let creators = self.creators().unwrap_or_default();
2740 let rules = self.clone_info().room_version_rules_or_default();
2741
2742 let default_power_levels =
2743 RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2744 let changes = RoomPowerLevelChanges::from(default_power_levels);
2745 self.apply_power_level_changes(changes).await?;
2746 Ok(self.power_levels().await?)
2747 }
2748
2749 pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2754 let power_level = self.get_user_power_level(user_id).await?;
2755 Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2756 }
2757
2758 pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2763 let event = self.power_levels().await?;
2764 Ok(event.for_user(user_id))
2765 }
2766
2767 pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2770 let power_levels = self.power_levels().await.ok();
2771 let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2772 if let Some(power_levels) = power_levels {
2773 for (id, level) in power_levels.users.into_iter() {
2774 user_power_levels.insert(id, level.into());
2775 }
2776 }
2777 user_power_levels
2778 }
2779
2780 pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2782 self.send_state_event(RoomNameEventContent::new(name)).await
2783 }
2784
2785 pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2787 self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2788 }
2789
2790 pub async fn set_avatar_url(
2796 &self,
2797 url: &MxcUri,
2798 info: Option<avatar::ImageInfo>,
2799 ) -> Result<send_state_event::v3::Response> {
2800 self.ensure_room_joined()?;
2801
2802 let mut room_avatar_event = RoomAvatarEventContent::new();
2803 room_avatar_event.url = Some(url.to_owned());
2804 room_avatar_event.info = info.map(Box::new);
2805
2806 self.send_state_event(room_avatar_event).await
2807 }
2808
2809 pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2811 self.send_state_event(RoomAvatarEventContent::new()).await
2812 }
2813
2814 pub async fn upload_avatar(
2822 &self,
2823 mime: &Mime,
2824 data: Vec<u8>,
2825 info: Option<avatar::ImageInfo>,
2826 ) -> Result<send_state_event::v3::Response> {
2827 self.ensure_room_joined()?;
2828
2829 let upload_response = self.client.media().upload(mime, data, None).await?;
2830 let mut info = info.unwrap_or_default();
2831 info.blurhash = upload_response.blurhash;
2832 info.mimetype = Some(mime.to_string());
2833
2834 self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2835 }
2836
/// Sends a state event whose state key is empty.
///
/// Delegates to `send_state_event_for_key` with `EmptyStateKey`.
#[cfg(not(feature = "experimental-encrypted-state-events"))]
#[instrument(skip_all)]
pub async fn send_state_event(
    &self,
    content: impl StateEventContent<StateKey = EmptyStateKey>,
) -> Result<send_state_event::v3::Response> {
    self.send_state_event_for_key(&EmptyStateKey, content).await
}
2888
/// Creates a [`SendStateEvent`] for a state event whose state key is empty.
///
/// Delegates to `send_state_event_for_key` with `EmptyStateKey`.
#[cfg(feature = "experimental-encrypted-state-events")]
#[instrument(skip_all)]
pub fn send_state_event<'a>(
    &'a self,
    content: impl StateEventContent<StateKey = EmptyStateKey>,
) -> SendStateEvent<'a> {
    self.send_state_event_for_key(&EmptyStateKey, content)
}
2947
2948 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2989 pub async fn send_state_event_for_key<C, K>(
2990 &self,
2991 state_key: &K,
2992 content: C,
2993 ) -> Result<send_state_event::v3::Response>
2994 where
2995 C: StateEventContent,
2996 C::StateKey: Borrow<K>,
2997 K: AsRef<str> + ?Sized,
2998 {
2999 self.ensure_room_joined()?;
3000 let request =
3001 send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3002 let response = self.client.send(request).await?;
3003 Ok(response)
3004 }
3005
/// Creates a [`SendStateEvent`] for a state event with the given state key
/// and content.
///
/// This only constructs the value; nothing is sent by this call itself.
#[cfg(feature = "experimental-encrypted-state-events")]
pub fn send_state_event_for_key<'a, C, K>(
    &'a self,
    state_key: &K,
    content: C,
) -> SendStateEvent<'a>
where
    C: StateEventContent,
    C::StateKey: Borrow<K>,
    K: AsRef<str> + ?Sized,
{
    SendStateEvent::new(self, state_key, content)
}
3067
3068 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3103 #[instrument(skip_all)]
3104 pub async fn send_state_event_raw(
3105 &self,
3106 event_type: &str,
3107 state_key: &str,
3108 content: impl IntoRawStateEventContent,
3109 ) -> Result<send_state_event::v3::Response> {
3110 self.ensure_room_joined()?;
3111
3112 let request = send_state_event::v3::Request::new_raw(
3113 self.room_id().to_owned(),
3114 event_type.into(),
3115 state_key.to_owned(),
3116 content.into_raw_state_event_content(),
3117 );
3118
3119 Ok(self.client.send(request).await?)
3120 }
3121
/// Creates a [`SendRawStateEvent`] for a state event built from an event
/// type string, a state key and raw content.
///
/// This only constructs the value; nothing is sent by this call itself.
#[cfg(feature = "experimental-encrypted-state-events")]
#[instrument(skip_all)]
pub fn send_state_event_raw<'a>(
    &'a self,
    event_type: &'a str,
    state_key: &'a str,
    content: impl IntoRawStateEventContent,
) -> SendRawStateEvent<'a> {
    SendRawStateEvent::new(self, event_type, state_key, content)
}
3173
3174 #[instrument(skip_all)]
3209 pub async fn redact(
3210 &self,
3211 event_id: &EventId,
3212 reason: Option<&str>,
3213 txn_id: Option<OwnedTransactionId>,
3214 ) -> HttpResult<redact_event::v3::Response> {
3215 let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3216 let request = assign!(
3217 redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3218 { reason: reason.map(ToOwned::to_owned) }
3219 );
3220
3221 self.client.send(request).await
3222 }
3223
/// Computes up to three server names that can be used to route to this
/// room.
///
/// Candidates are the servers of joined members, excluding servers denied
/// by the room's server ACL (when one can be read) and servers whose name
/// is an IP literal. The server of the member with the highest power level
/// comes first, provided that level is at least 50; the remaining servers
/// follow, ordered by descending member count.
pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
    // An ACL event that is missing or fails to deserialize is treated as
    // "no ACL".
    let acl_ev = self
        .get_state_event_static::<RoomServerAclEventContent>()
        .await?
        .and_then(|ev| ev.deserialize().ok());
    let acl = acl_ev.as_ref().and_then(|ev| match ev {
        SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
        SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
    });

    // Keep joined members whose server is allowed by the ACL (if any) and
    // is not an IP literal.
    let members: Vec<_> = self
        .members_no_sync(RoomMemberships::JOIN)
        .await?
        .into_iter()
        .filter(|member| {
            let server = member.user_id().server_name();
            acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
        })
        .collect();

    // Server of the highest-powered member, only if that level is >= 50.
    let max = members
        .iter()
        .max_by_key(|member| member.power_level())
        .filter(|max| max.power_level() >= int!(50))
        .map(|member| member.user_id().server_name());

    // Count members per server, leaving out the server already chosen above.
    let servers = members
        .iter()
        .map(|member| member.user_id().server_name())
        .filter(|server| max.filter(|max| max == server).is_none())
        .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
            *servers.entry(server).or_default() += 1;
            servers
        });
    // Most populated servers first.
    let mut servers: Vec<_> = servers.into_iter().collect();
    servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));

    Ok(max
        .into_iter()
        .chain(servers.into_iter().map(|(name, _)| name))
        .take(3)
        .map(ToOwned::to_owned)
        .collect())
}
3282
3283 pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3290 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3291 return Ok(alias.matrix_to_uri());
3292 }
3293
3294 let via = self.route().await?;
3295 Ok(self.room_id().matrix_to_uri_via(via))
3296 }
3297
3298 pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3309 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3310 return Ok(alias.matrix_uri(join));
3311 }
3312
3313 let via = self.route().await?;
3314 Ok(self.room_id().matrix_uri_via(via, join))
3315 }
3316
3317 pub async fn matrix_to_event_permalink(
3331 &self,
3332 event_id: impl Into<OwnedEventId>,
3333 ) -> Result<MatrixToUri> {
3334 let via = self.route().await?;
3337 Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3338 }
3339
3340 pub async fn matrix_event_permalink(
3354 &self,
3355 event_id: impl Into<OwnedEventId>,
3356 ) -> Result<MatrixUri> {
3357 let via = self.route().await?;
3360 Ok(self.room_id().matrix_event_uri_via(event_id, via))
3361 }
3362
3363 pub async fn load_user_receipt(
3376 &self,
3377 receipt_type: ReceiptType,
3378 thread: ReceiptThread,
3379 user_id: &UserId,
3380 ) -> Result<Option<(OwnedEventId, Receipt)>> {
3381 self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3382 }
3383
3384 pub async fn load_event_receipts(
3397 &self,
3398 receipt_type: ReceiptType,
3399 thread: ReceiptThread,
3400 event_id: &EventId,
3401 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3402 self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3403 }
3404
3405 pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3410 self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
3411 }
3412
3413 pub(crate) async fn push_condition_room_ctx_internal(
3420 &self,
3421 with_threads_subscriptions: bool,
3422 ) -> Result<Option<PushConditionRoomCtx>> {
3423 let room_id = self.room_id();
3424 let user_id = self.own_user_id();
3425 let room_info = self.clone_info();
3426 let member_count = room_info.active_members_count();
3427
3428 let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3429 member.name().to_owned()
3430 } else {
3431 return Ok(None);
3432 };
3433
3434 let power_levels = match self.power_levels().await {
3435 Ok(power_levels) => Some(power_levels.into()),
3436 Err(error) => {
3437 if matches!(room_info.state(), RoomState::Joined) {
3438 error!("Could not compute power levels for push conditions: {error}");
3441 }
3442 None
3443 }
3444 };
3445
3446 let mut ctx = assign!(PushConditionRoomCtx::new(
3447 room_id.to_owned(),
3448 UInt::new(member_count).unwrap_or(UInt::MAX),
3449 user_id.to_owned(),
3450 user_display_name,
3451 ),
3452 {
3453 power_levels,
3454 });
3455
3456 if with_threads_subscriptions {
3457 let this = self.clone();
3458 ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3459 let room = this.clone();
3460 Box::pin(async move {
3461 if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3462 maybe_sub.is_some()
3463 } else {
3464 false
3465 }
3466 })
3467 });
3468 }
3469
3470 Ok(Some(ctx))
3471 }
3472
3473 pub async fn push_context(&self) -> Result<Option<PushContext>> {
3476 self.push_context_internal(self.client.enabled_thread_subscriptions()).await
3477 }
3478
3479 #[instrument(skip(self))]
3483 pub(crate) async fn push_context_internal(
3484 &self,
3485 with_threads_subscriptions: bool,
3486 ) -> Result<Option<PushContext>> {
3487 let Some(push_condition_room_ctx) =
3488 self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3489 else {
3490 debug!("Could not aggregate push context");
3491 return Ok(None);
3492 };
3493 let push_rules = self.client().account().push_rules().await?;
3494 Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3495 }
3496
3497 pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3502 if let Some(ctx) = self.push_context().await? {
3503 Ok(Some(ctx.for_event(event).await))
3504 } else {
3505 Ok(None)
3506 }
3507 }
3508
3509 pub async fn invite_details(&self) -> Result<Invite> {
3512 let state = self.state();
3513
3514 if state != RoomState::Invited {
3515 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3516 }
3517
3518 let invitee = self
3519 .get_member_no_sync(self.own_user_id())
3520 .await?
3521 .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3522 let event = invitee.event();
3523 let inviter_id = event.sender();
3524 let inviter = self.get_member_no_sync(inviter_id).await?;
3525 Ok(Invite { invitee, inviter })
3526 }
3527
3528 pub async fn member_with_sender_info(
3536 &self,
3537 user_id: &UserId,
3538 ) -> Result<RoomMemberWithSenderInfo> {
3539 let Some(member) = self.get_member_no_sync(user_id).await? else {
3540 return Err(Error::InsufficientData);
3541 };
3542
3543 let sender_member =
3544 if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3545 Some(member)
3547 } else if self.are_members_synced() {
3548 None
3550 } else if self.sync_members().await.is_ok() {
3551 self.get_member_no_sync(member.event().sender()).await?
3553 } else {
3554 None
3555 };
3556
3557 Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3558 }
3559
3560 pub async fn forget(&self) -> Result<()> {
3566 let state = self.state();
3567 match state {
3568 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3569 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3570 "Left / Banned",
3571 state,
3572 ))));
3573 }
3574 RoomState::Left | RoomState::Banned => {}
3575 }
3576
3577 let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3578 let _response = self.client.send(request).await?;
3579
3580 if self.inner.direct_targets_length() != 0
3582 && let Err(e) = self.set_is_direct(false).await
3583 {
3584 warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3587 }
3588
3589 self.client.base_client().forget_room(self.inner.room_id()).await?;
3590
3591 Ok(())
3592 }
3593
3594 fn ensure_room_joined(&self) -> Result<()> {
3595 let state = self.state();
3596 if state == RoomState::Joined {
3597 Ok(())
3598 } else {
3599 Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3600 }
3601 }
3602
3603 pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3605 if !matches!(self.state(), RoomState::Joined) {
3606 return None;
3607 }
3608
3609 let notification_settings = self.client().notification_settings().await;
3610
3611 let notification_mode =
3613 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3614
3615 if notification_mode.is_some() {
3616 notification_mode
3617 } else if let Ok(is_encrypted) =
3618 self.latest_encryption_state().await.map(|state| state.is_encrypted())
3619 {
3620 let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3625 let default_mode = notification_settings
3626 .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3627 .await;
3628 Some(default_mode)
3629 } else {
3630 None
3631 }
3632 }
3633
3634 pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3645 if !matches!(self.state(), RoomState::Joined) {
3646 return None;
3647 }
3648
3649 let notification_settings = self.client().notification_settings().await;
3650
3651 let mode =
3653 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3654
3655 if let Some(mode) = mode {
3656 self.update_cached_user_defined_notification_mode(mode);
3657 }
3658
3659 mode
3660 }
3661
3662 pub async fn report_content(
3675 &self,
3676 event_id: OwnedEventId,
3677 score: Option<ReportedContentScore>,
3678 reason: Option<String>,
3679 ) -> Result<report_content::v3::Response> {
3680 let state = self.state();
3681 if state != RoomState::Joined {
3682 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3683 }
3684
3685 let request = report_content::v3::Request::new(
3686 self.inner.room_id().to_owned(),
3687 event_id,
3688 score.map(Into::into),
3689 reason,
3690 );
3691 Ok(self.client.send(request).await?)
3692 }
3693
3694 pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3705 let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3706
3707 Ok(self.client.send(request).await?)
3708 }
3709
3710 pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3716 if self.is_marked_unread() == unread {
3717 return Ok(());
3719 }
3720
3721 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3722
3723 let content = MarkedUnreadEventContent::new(unread);
3724
3725 let request = set_room_account_data::v3::Request::new(
3726 user_id.to_owned(),
3727 self.inner.room_id().to_owned(),
3728 &content,
3729 )?;
3730
3731 self.client.send(request).await?;
3732 Ok(())
3733 }
3734
3735 pub async fn event_cache(
3738 &self,
3739 ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3740 self.client.event_cache().for_room(self.room_id()).await
3741 }
3742
3743 pub(crate) async fn get_user_beacon_info(
3750 &self,
3751 user_id: &UserId,
3752 ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3753 let raw_event = self
3754 .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3755 .await?
3756 .ok_or(BeaconError::NotFound)?;
3757
3758 match raw_event.deserialize()? {
3759 SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3760 SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3761 SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3762 }
3763 }
3764
3765 pub async fn start_live_location_share(
3778 &self,
3779 duration_millis: u64,
3780 description: Option<String>,
3781 ) -> Result<send_state_event::v3::Response> {
3782 self.ensure_room_joined()?;
3783
3784 self.send_state_event_for_key(
3785 self.own_user_id(),
3786 BeaconInfoEventContent::new(
3787 description,
3788 Duration::from_millis(duration_millis),
3789 true,
3790 None,
3791 ),
3792 )
3793 .await
3794 }
3795
3796 pub async fn stop_live_location_share(
3803 &self,
3804 ) -> Result<send_state_event::v3::Response, BeaconError> {
3805 self.ensure_room_joined()?;
3806
3807 let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3808 beacon_info_event.content.stop();
3809 Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3810 }
3811
3812 pub async fn send_location_beacon(
3824 &self,
3825 geo_uri: String,
3826 ) -> Result<send_message_event::v3::Response, BeaconError> {
3827 self.ensure_room_joined()?;
3828
3829 let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3830
3831 if beacon_info_event.content.is_live() {
3832 let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3833 Ok(self.send(content).await?.response)
3834 } else {
3835 Err(BeaconError::NotLive)
3836 }
3837 }
3838
3839 pub async fn save_composer_draft(
3842 &self,
3843 draft: ComposerDraft,
3844 thread_root: Option<&EventId>,
3845 ) -> Result<()> {
3846 self.client
3847 .state_store()
3848 .set_kv_data(
3849 StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
3850 StateStoreDataValue::ComposerDraft(draft),
3851 )
3852 .await?;
3853 Ok(())
3854 }
3855
3856 pub async fn load_composer_draft(
3859 &self,
3860 thread_root: Option<&EventId>,
3861 ) -> Result<Option<ComposerDraft>> {
3862 let data = self
3863 .client
3864 .state_store()
3865 .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3866 .await?;
3867 Ok(data.and_then(|d| d.into_composer_draft()))
3868 }
3869
3870 pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
3873 self.client
3874 .state_store()
3875 .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3876 .await?;
3877 Ok(())
3878 }
3879
3880 pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
3883 let response = self
3884 .client
3885 .send(get_state_event_for_key::v3::Request::new(
3886 self.room_id().to_owned(),
3887 StateEventType::RoomPinnedEvents,
3888 "".to_owned(),
3889 ))
3890 .await;
3891
3892 match response {
3893 Ok(response) => Ok(Some(
3894 response
3895 .into_content()
3896 .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
3897 .pinned,
3898 )),
3899 Err(http_error) => match http_error.as_client_api_error() {
3900 Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
3901 _ => Err(http_error.into()),
3902 },
3903 }
3904 }
3905
3906 pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3914 ObservableLiveLocation::new(&self.client, self.room_id())
3915 }
3916
3917 pub async fn subscribe_to_knock_requests(
3931 &self,
3932 ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
3933 let this = Arc::new(self.clone());
3934
3935 let room_member_events_observer =
3936 self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
3937
3938 let current_seen_ids = self.get_seen_knock_request_ids().await?;
3939 let mut seen_request_ids_stream = self
3940 .seen_knock_request_ids_map
3941 .subscribe()
3942 .await
3943 .map(|values| values.unwrap_or_default());
3944
3945 let mut room_info_stream = self.subscribe_info();
3946
3947 let clear_seen_ids_handle = spawn({
3950 let this = self.clone();
3951 async move {
3952 let mut member_updates_stream = this.room_member_updates_sender.subscribe();
3953 while member_updates_stream.recv().await.is_ok() {
3954 if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
3956 warn!("Failed to remove seen knock requests: {err}")
3957 }
3958 }
3959 }
3960 });
3961
3962 let combined_stream = stream! {
3963 match this.get_current_join_requests(¤t_seen_ids).await {
3965 Ok(initial_requests) => yield initial_requests,
3966 Err(err) => warn!("Failed to get initial requests to join: {err}")
3967 }
3968
3969 let mut requests_stream = room_member_events_observer.subscribe();
3970 let mut seen_ids = current_seen_ids.clone();
3971
3972 loop {
3973 tokio::select! {
3976 Some((event, _)) = requests_stream.next() => {
3977 if let Some(event) = event.as_original() {
3978 let emit = if event.prev_content().is_some() {
3980 matches!(event.membership_change(),
3981 MembershipChange::Banned |
3982 MembershipChange::Knocked |
3983 MembershipChange::KnockAccepted |
3984 MembershipChange::KnockDenied |
3985 MembershipChange::KnockRetracted
3986 )
3987 } else {
3988 true
3991 };
3992
3993 if emit {
3994 match this.get_current_join_requests(&seen_ids).await {
3995 Ok(requests) => yield requests,
3996 Err(err) => {
3997 warn!("Failed to get updated knock requests on new member event: {err}")
3998 }
3999 }
4000 }
4001 }
4002 }
4003
4004 Some(new_seen_ids) = seen_request_ids_stream.next() => {
4005 seen_ids = new_seen_ids;
4007
4008 match this.get_current_join_requests(&seen_ids).await {
4011 Ok(requests) => yield requests,
4012 Err(err) => {
4013 warn!("Failed to get updated knock requests on seen ids changed: {err}")
4014 }
4015 }
4016 }
4017
4018 Some(room_info) = room_info_stream.next() => {
4019 if !room_info.are_members_synced() {
4022 match this.get_current_join_requests(&seen_ids).await {
4023 Ok(requests) => yield requests,
4024 Err(err) => {
4025 warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4026 }
4027 }
4028 }
4029 }
4030 else => break,
4032 }
4033 }
4034 };
4035
4036 Ok((combined_stream, clear_seen_ids_handle))
4037 }
4038
4039 async fn get_current_join_requests(
4040 &self,
4041 seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4042 ) -> Result<Vec<KnockRequest>> {
4043 Ok(self
4044 .members(RoomMemberships::KNOCK)
4045 .await?
4046 .into_iter()
4047 .filter_map(|member| {
4048 let event_id = member.event().event_id()?;
4049 Some(KnockRequest::new(
4050 self,
4051 event_id,
4052 member.event().timestamp(),
4053 KnockRequestMemberInfo::from_member(&member),
4054 seen_request_ids.contains_key(event_id),
4055 ))
4056 })
4057 .collect())
4058 }
4059
4060 pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4062 RoomPrivacySettings::new(&self.inner, &self.client)
4063 }
4064
4065 pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4073 let request = opts.into_request(self.room_id());
4074
4075 let response = self.client.send(request).await?;
4076
4077 let push_ctx = self.push_context().await?;
4078 let chunk = join_all(
4079 response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4080 )
4081 .await;
4082
4083 Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4084 }
4085
4086 pub async fn relations(
4100 &self,
4101 event_id: OwnedEventId,
4102 opts: RelationsOptions,
4103 ) -> Result<Relations> {
4104 let relations = opts.send(self, event_id).await;
4105
4106 if let Ok(Relations { chunk, .. }) = &relations
4108 && let Ok((cache, _handles)) = self.event_cache().await
4109 {
4110 cache.save_events(chunk.clone()).await;
4111 }
4112
4113 relations
4114 }
4115
4116 #[cfg(feature = "experimental-search")]
4119 pub async fn search(
4120 &self,
4121 query: &str,
4122 max_number_of_results: usize,
4123 pagination_offset: Option<usize>,
4124 ) -> Result<Vec<OwnedEventId>, IndexError> {
4125 let mut search_index_guard = self.client.search_index().lock().await;
4126 search_index_guard.search(query, max_number_of_results, pagination_offset, self.room_id())
4127 }
4128
4129 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4151 pub async fn subscribe_thread(
4152 &self,
4153 thread_root: OwnedEventId,
4154 automatic: Option<OwnedEventId>,
4155 ) -> Result<()> {
4156 let is_automatic = automatic.is_some();
4157
4158 match self
4159 .client
4160 .send(subscribe_thread::unstable::Request::new(
4161 self.room_id().to_owned(),
4162 thread_root.clone(),
4163 automatic,
4164 ))
4165 .await
4166 {
4167 Ok(_response) => {
4168 trace!("Server acknowledged the thread subscription; saving in db");
4169
4170 self.client
4172 .state_store()
4173 .upsert_thread_subscription(
4174 self.room_id(),
4175 &thread_root,
4176 StoredThreadSubscription {
4177 status: ThreadSubscriptionStatus::Subscribed {
4178 automatic: is_automatic,
4179 },
4180 bump_stamp: None,
4181 },
4182 )
4183 .await?;
4184
4185 Ok(())
4186 }
4187
4188 Err(err) => {
4189 if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4190 trace!("Thread subscription skipped: {err}");
4195 Ok(())
4196 } else {
4197 Err(err.into())
4199 }
4200 }
4201 }
4202 }
4203
4204 pub async fn subscribe_thread_if_needed(
4210 &self,
4211 thread_root: &EventId,
4212 automatic: Option<OwnedEventId>,
4213 ) -> Result<()> {
4214 if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4215 if !prev_sub.automatic || automatic.is_some() {
4218 return Ok(());
4221 }
4222 }
4223 self.subscribe_thread(thread_root.to_owned(), automatic).await
4224 }
4225
4226 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4238 pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4239 self.client
4240 .send(unsubscribe_thread::unstable::Request::new(
4241 self.room_id().to_owned(),
4242 thread_root.clone(),
4243 ))
4244 .await?;
4245
4246 trace!("Server acknowledged the thread subscription removal; removed it from db too");
4247
4248 self.client
4250 .state_store()
4251 .upsert_thread_subscription(
4252 self.room_id(),
4253 &thread_root,
4254 StoredThreadSubscription {
4255 status: ThreadSubscriptionStatus::Unsubscribed,
4256 bump_stamp: None,
4257 },
4258 )
4259 .await?;
4260
4261 Ok(())
4262 }
4263
4264 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4281 pub async fn fetch_thread_subscription(
4282 &self,
4283 thread_root: OwnedEventId,
4284 ) -> Result<Option<ThreadSubscription>> {
4285 let result = self
4286 .client
4287 .send(get_thread_subscription::unstable::Request::new(
4288 self.room_id().to_owned(),
4289 thread_root.clone(),
4290 ))
4291 .await;
4292
4293 let subscription = match result {
4294 Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4295 Err(http_error) => match http_error.as_client_api_error() {
4296 Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4297 _ => return Err(http_error.into()),
4298 },
4299 };
4300
4301 if let Some(sub) = &subscription {
4303 self.client
4304 .state_store()
4305 .upsert_thread_subscription(
4306 self.room_id(),
4307 &thread_root,
4308 StoredThreadSubscription {
4309 status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4310 bump_stamp: None,
4311 },
4312 )
4313 .await?;
4314 } else {
4315 self.client
4317 .state_store()
4318 .remove_thread_subscription(self.room_id(), &thread_root)
4319 .await?;
4320 }
4321
4322 Ok(subscription)
4323 }
4324
4325 pub async fn load_or_fetch_thread_subscription(
4332 &self,
4333 thread_root: &EventId,
4334 ) -> Result<Option<ThreadSubscription>> {
4335 if self.client.thread_subscription_catchup().is_outdated() {
4337 return self.fetch_thread_subscription(thread_root.to_owned()).await;
4338 }
4339
4340 Ok(self
4342 .client
4343 .state_store()
4344 .load_thread_subscription(self.room_id(), thread_root)
4345 .await
4346 .map(|maybe_sub| {
4347 maybe_sub.and_then(|stored| match stored.status {
4348 ThreadSubscriptionStatus::Unsubscribed => None,
4349 ThreadSubscriptionStatus::Subscribed { automatic } => {
4350 Some(ThreadSubscription { automatic })
4351 }
4352 })
4353 })?)
4354 }
4355}
4356
#[cfg(feature = "e2e-encryption")]
impl RoomIdentityProvider for Room {
    /// Whether `user_id` is a member of this room. Any error while getting
    /// the member is treated as "not a member".
    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
        Box::pin(async { matches!(self.get_member(user_id).await, Ok(Some(_))) })
    }

    /// The identities of the joined and invited members of this room.
    ///
    /// Members without a known identity are left out, and a failure to get
    /// the members yields an empty list.
    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
        Box::pin(async {
            let members = self
                .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
                .await
                .unwrap_or_default();

            let mut identities: Vec<UserIdentity> = Vec::new();
            for member in members {
                // `extend` with an `Option` pushes the identity only if present.
                identities.extend(self.user_identity(member.user_id()).await);
            }
            identities
        })
    }

    /// The identity of `user_id`, if any. Any error while getting the
    /// identity is treated as "no identity".
    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
        Box::pin(async {
            let identity = self.client.encryption().get_user_identity(user_id).await;
            identity.ok().flatten().map(|u| u.underlying_identity())
        })
    }
}
4391
4392#[derive(Clone, Debug)]
4395pub(crate) struct WeakRoom {
4396 client: WeakClient,
4397 room_id: OwnedRoomId,
4398}
4399
4400impl WeakRoom {
4401 pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4403 Self { client, room_id }
4404 }
4405
4406 pub fn get(&self) -> Option<Room> {
4408 self.client.get().and_then(|client| client.get_room(&self.room_id))
4409 }
4410
4411 pub fn room_id(&self) -> &RoomId {
4413 &self.room_id
4414 }
4415}
4416
4417#[derive(Debug, Clone)]
4419pub struct Invite {
4420 pub invitee: RoomMember,
4422 pub inviter: Option<RoomMember>,
4424}
4425
4426#[derive(Error, Debug)]
4427enum InvitationError {
4428 #[error("No membership event found")]
4429 EventMissing,
4430}
4431
4432#[derive(Debug, Clone, Default)]
4434#[non_exhaustive]
4435pub struct Receipts {
4436 pub fully_read: Option<OwnedEventId>,
4438 pub public_read_receipt: Option<OwnedEventId>,
4440 pub private_read_receipt: Option<OwnedEventId>,
4442}
4443
4444impl Receipts {
4445 pub fn new() -> Self {
4447 Self::default()
4448 }
4449
4450 pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4459 self.fully_read = event_id.into();
4460 self
4461 }
4462
4463 pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4469 self.public_read_receipt = event_id.into();
4470 self
4471 }
4472
4473 pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4477 self.private_read_receipt = event_id.into();
4478 self
4479 }
4480
4481 pub fn is_empty(&self) -> bool {
4483 self.fully_read.is_none()
4484 && self.public_read_receipt.is_none()
4485 && self.private_read_receipt.is_none()
4486 }
4487}
4488
4489#[derive(Debug)]
4492pub enum ParentSpace {
4493 Reciprocal(Room),
4496 WithPowerlevel(Room),
4501 Illegitimate(Room),
4504 Unverifiable(OwnedRoomId),
4507}
4508
/// The score attached to a content report: an `i8` wrapper whose valid
/// values range from `-100` (`MIN`) to `0` (`MAX`), both included.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReportedContentScore(i8);
4514
4515impl ReportedContentScore {
4516 pub const MIN: Self = Self(-100);
4520
4521 pub const MAX: Self = Self(0);
4525
4526 pub fn new(value: i8) -> Option<Self> {
4535 value.try_into().ok()
4536 }
4537
4538 pub fn new_saturating(value: i8) -> Self {
4544 if value > Self::MAX {
4545 Self::MAX
4546 } else if value < Self::MIN {
4547 Self::MIN
4548 } else {
4549 Self(value)
4550 }
4551 }
4552
4553 pub fn value(&self) -> i8 {
4555 self.0
4556 }
4557}
4558
4559impl PartialEq<i8> for ReportedContentScore {
4560 fn eq(&self, other: &i8) -> bool {
4561 self.0.eq(other)
4562 }
4563}
4564
4565impl PartialEq<ReportedContentScore> for i8 {
4566 fn eq(&self, other: &ReportedContentScore) -> bool {
4567 self.eq(&other.0)
4568 }
4569}
4570
4571impl PartialOrd<i8> for ReportedContentScore {
4572 fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
4573 self.0.partial_cmp(other)
4574 }
4575}
4576
4577impl PartialOrd<ReportedContentScore> for i8 {
4578 fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
4579 self.partial_cmp(&other.0)
4580 }
4581}
4582
4583impl From<ReportedContentScore> for Int {
4584 fn from(value: ReportedContentScore) -> Self {
4585 value.0.into()
4586 }
4587}
4588
4589impl TryFrom<i8> for ReportedContentScore {
4590 type Error = TryFromReportedContentScoreError;
4591
4592 fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
4593 if value > Self::MAX || value < Self::MIN {
4594 Err(TryFromReportedContentScoreError(()))
4595 } else {
4596 Ok(Self(value))
4597 }
4598 }
4599}
4600
4601impl TryFrom<i16> for ReportedContentScore {
4602 type Error = TryFromReportedContentScoreError;
4603
4604 fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
4605 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4606 value.try_into()
4607 }
4608}
4609
4610impl TryFrom<i32> for ReportedContentScore {
4611 type Error = TryFromReportedContentScoreError;
4612
4613 fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
4614 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4615 value.try_into()
4616 }
4617}
4618
4619impl TryFrom<i64> for ReportedContentScore {
4620 type Error = TryFromReportedContentScoreError;
4621
4622 fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
4623 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4624 value.try_into()
4625 }
4626}
4627
4628impl TryFrom<Int> for ReportedContentScore {
4629 type Error = TryFromReportedContentScoreError;
4630
4631 fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
4632 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4633 value.try_into()
4634 }
4635}
4636
4637trait EventSource {
4638 fn get_event(
4639 &self,
4640 event_id: &EventId,
4641 ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
4642}
4643
4644impl EventSource for &Room {
4645 async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4646 self.load_or_fetch_event(event_id, None).await
4647 }
4648}
4649
4650#[derive(Debug, Clone, Error)]
4653#[error("out of range conversion attempted")]
4654pub struct TryFromReportedContentScoreError(());
4655
4656#[derive(Debug)]
4659pub struct RoomMemberWithSenderInfo {
4660 pub room_member: RoomMember,
4662 pub sender_info: Option<RoomMember>,
4665}
4666
4667#[cfg(all(test, not(target_family = "wasm")))]
4668mod tests {
4669 use std::collections::BTreeMap;
4670
4671 use matrix_sdk_base::{ComposerDraft, DraftAttachment, store::ComposerDraftType};
4672 use matrix_sdk_test::{
4673 JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
4674 event_factory::EventFactory, test_json,
4675 };
4676 use ruma::{
4677 RoomVersionId, event_id,
4678 events::{relation::RelationType, room::member::MembershipState},
4679 int, owned_event_id, room_id, user_id,
4680 };
4681 use wiremock::{
4682 Mock, MockServer, ResponseTemplate,
4683 matchers::{header, method, path_regex},
4684 };
4685
4686 use super::ReportedContentScore;
4687 use crate::{
4688 Client,
4689 config::RequestConfig,
4690 room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4691 test_utils::{
4692 client::mock_matrix_session,
4693 logged_in_client,
4694 mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4695 },
4696 };
4697
4698 #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
4699 #[async_test]
4700 async fn test_cache_invalidation_while_encrypt() {
4701 use matrix_sdk_base::store::RoomLoadSettings;
4702 use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};
4703
4704 let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
4705 let session = mock_matrix_session();
4706
4707 let client = Client::builder()
4708 .homeserver_url("http://localhost:1234")
4709 .request_config(RequestConfig::new().disable_retry())
4710 .sqlite_store(&sqlite_path, None)
4711 .build()
4712 .await
4713 .unwrap();
4714 client
4715 .matrix_auth()
4716 .restore_session(session.clone(), RoomLoadSettings::default())
4717 .await
4718 .unwrap();
4719
4720 client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
4721
4722 let server = MockServer::start().await;
4724 {
4725 Mock::given(method("GET"))
4726 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
4727 .and(header("authorization", "Bearer 1234"))
4728 .respond_with(
4729 ResponseTemplate::new(200)
4730 .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
4731 )
4732 .mount(&server)
4733 .await;
4734 let response = SyncResponseBuilder::default()
4735 .add_joined_room(
4736 JoinedRoomBuilder::default()
4737 .add_state_event(StateTestEvent::Member)
4738 .add_state_event(StateTestEvent::PowerLevels)
4739 .add_state_event(StateTestEvent::Encryption),
4740 )
4741 .build_sync_response();
4742 client.base_client().receive_sync_response(response).await.unwrap();
4743 }
4744
4745 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4746
4747 room.preshare_room_key().await.unwrap();
4749
4750 {
4753 let client = Client::builder()
4754 .homeserver_url("http://localhost:1234")
4755 .request_config(RequestConfig::new().disable_retry())
4756 .sqlite_store(&sqlite_path, None)
4757 .build()
4758 .await
4759 .unwrap();
4760 client
4761 .matrix_auth()
4762 .restore_session(session.clone(), RoomLoadSettings::default())
4763 .await
4764 .unwrap();
4765 client
4766 .encryption()
4767 .enable_cross_process_store_lock("client2".to_owned())
4768 .await
4769 .unwrap();
4770
4771 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4772 assert!(guard.is_some());
4773 }
4774
4775 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4777 assert!(guard.is_some());
4778
4779 let olm = client.olm_machine().await;
4781 let olm = olm.as_ref().expect("Olm machine wasn't started");
4782
4783 let _encrypted_content = olm
4786 .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
4787 .await
4788 .unwrap();
4789 }
4790
4791 #[test]
4792 fn reported_content_score() {
4793 let score = ReportedContentScore::new(0).unwrap();
4795 assert_eq!(score.value(), 0);
4796 let score = ReportedContentScore::new(-50).unwrap();
4797 assert_eq!(score.value(), -50);
4798 let score = ReportedContentScore::new(-100).unwrap();
4799 assert_eq!(score.value(), -100);
4800 assert_eq!(ReportedContentScore::new(10), None);
4801 assert_eq!(ReportedContentScore::new(-110), None);
4802
4803 let score = ReportedContentScore::new_saturating(0);
4804 assert_eq!(score.value(), 0);
4805 let score = ReportedContentScore::new_saturating(-50);
4806 assert_eq!(score.value(), -50);
4807 let score = ReportedContentScore::new_saturating(-100);
4808 assert_eq!(score.value(), -100);
4809 let score = ReportedContentScore::new_saturating(10);
4810 assert_eq!(score, ReportedContentScore::MAX);
4811 let score = ReportedContentScore::new_saturating(-110);
4812 assert_eq!(score, ReportedContentScore::MIN);
4813
4814 let score = ReportedContentScore::try_from(0i16).unwrap();
4816 assert_eq!(score.value(), 0);
4817 let score = ReportedContentScore::try_from(-100i16).unwrap();
4818 assert_eq!(score.value(), -100);
4819 ReportedContentScore::try_from(10i16).unwrap_err();
4820 ReportedContentScore::try_from(-110i16).unwrap_err();
4821
4822 let score = ReportedContentScore::try_from(0i32).unwrap();
4824 assert_eq!(score.value(), 0);
4825 let score = ReportedContentScore::try_from(-100i32).unwrap();
4826 assert_eq!(score.value(), -100);
4827 ReportedContentScore::try_from(10i32).unwrap_err();
4828 ReportedContentScore::try_from(-110i32).unwrap_err();
4829
4830 let score = ReportedContentScore::try_from(0i64).unwrap();
4832 assert_eq!(score.value(), 0);
4833 let score = ReportedContentScore::try_from(-100i64).unwrap();
4834 assert_eq!(score.value(), -100);
4835 ReportedContentScore::try_from(10i64).unwrap_err();
4836 ReportedContentScore::try_from(-110i64).unwrap_err();
4837
4838 let score = ReportedContentScore::try_from(int!(0)).unwrap();
4840 assert_eq!(score.value(), 0);
4841 let score = ReportedContentScore::try_from(int!(-100)).unwrap();
4842 assert_eq!(score.value(), -100);
4843 ReportedContentScore::try_from(int!(10)).unwrap_err();
4844 ReportedContentScore::try_from(int!(-110)).unwrap_err();
4845 }
4846
4847 #[async_test]
4848 async fn test_composer_draft() {
4849 use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4850
4851 let client = logged_in_client(None).await;
4852
4853 let response = SyncResponseBuilder::default()
4854 .add_joined_room(JoinedRoomBuilder::default())
4855 .build_sync_response();
4856 client.base_client().receive_sync_response(response).await.unwrap();
4857 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4858
4859 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4860
4861 let draft = ComposerDraft {
4864 plain_text: "Hello, world!".to_owned(),
4865 html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4866 draft_type: ComposerDraftType::NewMessage,
4867 attachments: vec![DraftAttachment {
4868 filename: "cat.txt".to_owned(),
4869 content: matrix_sdk_base::DraftAttachmentContent::File {
4870 data: b"meow".to_vec(),
4871 mimetype: Some("text/plain".to_owned()),
4872 size: Some(5),
4873 },
4874 }],
4875 };
4876
4877 room.save_composer_draft(draft.clone(), None).await.unwrap();
4878
4879 let thread_root = owned_event_id!("$thread_root:b.c");
4880 let thread_draft = ComposerDraft {
4881 plain_text: "Hello, thread!".to_owned(),
4882 html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4883 draft_type: ComposerDraftType::NewMessage,
4884 attachments: vec![DraftAttachment {
4885 filename: "dog.txt".to_owned(),
4886 content: matrix_sdk_base::DraftAttachmentContent::File {
4887 data: b"wuv".to_vec(),
4888 mimetype: Some("text/plain".to_owned()),
4889 size: Some(4),
4890 },
4891 }],
4892 };
4893
4894 room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4895
4896 assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4898
4899 assert_eq!(
4901 room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4902 Some(thread_draft.clone())
4903 );
4904
4905 room.clear_composer_draft(None).await.unwrap();
4907 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4908
4909 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4911
4912 room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4914 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4915 }
4916
4917 #[async_test]
4918 async fn test_mark_join_requests_as_seen() {
4919 let server = MatrixMockServer::new().await;
4920 let client = server.client_builder().build().await;
4921 let event_id = event_id!("$a:b.c");
4922 let room_id = room_id!("!a:b.c");
4923 let user_id = user_id!("@alice:b.c");
4924
4925 let f = EventFactory::new().room(room_id);
4926 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4927 f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into(),
4928 ]);
4929 let room = server.sync_room(&client, joined_room_builder).await;
4930
4931 let seen_ids =
4933 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4934 assert!(seen_ids.is_empty());
4935
4936 room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4938 .await
4939 .expect("Couldn't mark join request as seen");
4940
4941 let seen_ids =
4943 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4944 assert_eq!(seen_ids.len(), 1);
4945 assert_eq!(
4946 seen_ids.into_iter().next().expect("No next value"),
4947 (event_id.to_owned(), user_id.to_owned())
4948 )
4949 }
4950
4951 #[async_test]
4952 async fn test_own_room_membership_with_no_own_member_event() {
4953 let server = MatrixMockServer::new().await;
4954 let client = server.client_builder().build().await;
4955 let room_id = room_id!("!a:b.c");
4956
4957 let room = server.sync_joined_room(&client, room_id).await;
4958
4959 let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
4962 assert!(error.is_some());
4963 }
4964
4965 #[async_test]
4966 async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
4967 let server = MatrixMockServer::new().await;
4968 let client = server.client_builder().build().await;
4969 let room_id = room_id!("!a:b.c");
4970 let user_id = user_id!("@example:localhost");
4971
4972 let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
4973 let joined_room_builder =
4974 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
4975 let room = server.sync_room(&client, joined_room_builder).await;
4976
4977 let ret = room
4979 .member_with_sender_info(client.user_id().unwrap())
4980 .await
4981 .expect("Room member info should be available");
4982
4983 assert_eq!(ret.room_member.event().user_id(), user_id);
4985
4986 assert!(ret.sender_info.is_none());
4988 }
4989
4990 #[async_test]
4991 async fn test_own_room_membership_with_own_member_event_and_own_sender() {
4992 let server = MatrixMockServer::new().await;
4993 let client = server.client_builder().build().await;
4994 let room_id = room_id!("!a:b.c");
4995 let user_id = user_id!("@example:localhost");
4996
4997 let f = EventFactory::new().room(room_id).sender(user_id);
4998 let joined_room_builder =
4999 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5000 let room = server.sync_room(&client, joined_room_builder).await;
5001
5002 let ret = room
5004 .member_with_sender_info(client.user_id().unwrap())
5005 .await
5006 .expect("Room member info should be available");
5007
5008 assert_eq!(ret.room_member.event().user_id(), user_id);
5010
5011 assert!(ret.sender_info.is_some());
5013 assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
5014 }
5015
5016 #[async_test]
5017 async fn test_own_room_membership_with_own_member_event_and_known_sender() {
5018 let server = MatrixMockServer::new().await;
5019 let client = server.client_builder().build().await;
5020 let room_id = room_id!("!a:b.c");
5021 let user_id = user_id!("@example:localhost");
5022 let sender_id = user_id!("@alice:b.c");
5023
5024 let f = EventFactory::new().room(room_id).sender(sender_id);
5025 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5026 f.member(user_id).into(),
5027 f.member(sender_id).into(),
5029 ]);
5030 let room = server.sync_room(&client, joined_room_builder).await;
5031
5032 let ret = room
5034 .member_with_sender_info(client.user_id().unwrap())
5035 .await
5036 .expect("Room member info should be available");
5037
5038 assert_eq!(ret.room_member.event().user_id(), user_id);
5040
5041 assert!(ret.sender_info.is_some());
5043 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5044 }
5045
5046 #[async_test]
5047 async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5048 let server = MatrixMockServer::new().await;
5049 let client = server.client_builder().build().await;
5050 let room_id = room_id!("!a:b.c");
5051 let user_id = user_id!("@example:localhost");
5052 let sender_id = user_id!("@alice:b.c");
5053
5054 let f = EventFactory::new().room(room_id).sender(sender_id);
5055 let joined_room_builder =
5056 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5057 let room = server.sync_room(&client, joined_room_builder).await;
5058
5059 server
5061 .mock_get_members()
5062 .ok(vec![f.member(sender_id).into_raw()])
5063 .mock_once()
5064 .mount()
5065 .await;
5066
5067 let ret = room
5069 .member_with_sender_info(client.user_id().unwrap())
5070 .await
5071 .expect("Room member info should be available");
5072
5073 assert_eq!(ret.room_member.event().user_id(), user_id);
5075
5076 assert!(ret.sender_info.is_some());
5078 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5079 }
5080
5081 #[async_test]
5082 async fn test_list_threads() {
5083 let server = MatrixMockServer::new().await;
5084 let client = server.client_builder().build().await;
5085
5086 let room_id = room_id!("!a:b.c");
5087 let sender_id = user_id!("@alice:b.c");
5088 let f = EventFactory::new().room(room_id).sender(sender_id);
5089
5090 let eid1 = event_id!("$1");
5091 let eid2 = event_id!("$2");
5092 let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5093 let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5094
5095 server
5096 .mock_room_threads()
5097 .ok(batch1.clone(), Some("prev_batch".to_owned()))
5098 .mock_once()
5099 .mount()
5100 .await;
5101 server
5102 .mock_room_threads()
5103 .match_from("prev_batch")
5104 .ok(batch2, None)
5105 .mock_once()
5106 .mount()
5107 .await;
5108
5109 let room = server.sync_joined_room(&client, room_id).await;
5110 let result =
5111 room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5112 assert_eq!(result.chunk.len(), 1);
5113 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5114 assert!(result.prev_batch_token.is_some());
5115
5116 let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5117 let result = room.list_threads(opts).await.expect("Failed to list threads");
5118 assert_eq!(result.chunk.len(), 1);
5119 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5120 assert!(result.prev_batch_token.is_none());
5121 }
5122
5123 #[async_test]
5124 async fn test_relations() {
5125 let server = MatrixMockServer::new().await;
5126 let client = server.client_builder().build().await;
5127
5128 let room_id = room_id!("!a:b.c");
5129 let sender_id = user_id!("@alice:b.c");
5130 let f = EventFactory::new().room(room_id).sender(sender_id);
5131
5132 let target_event_id = owned_event_id!("$target");
5133 let eid1 = event_id!("$1");
5134 let eid2 = event_id!("$2");
5135 let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5136 let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5137
5138 server
5139 .mock_room_relations()
5140 .match_target_event(target_event_id.clone())
5141 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5142 .mock_once()
5143 .mount()
5144 .await;
5145
5146 server
5147 .mock_room_relations()
5148 .match_target_event(target_event_id.clone())
5149 .match_from("next_batch")
5150 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5151 .mock_once()
5152 .mount()
5153 .await;
5154
5155 let room = server.sync_joined_room(&client, room_id).await;
5156
5157 let mut opts = RelationsOptions {
5159 include_relations: IncludeRelations::AllRelations,
5160 ..Default::default()
5161 };
5162 let result = room
5163 .relations(target_event_id.clone(), opts.clone())
5164 .await
5165 .expect("Failed to list relations the first time");
5166 assert_eq!(result.chunk.len(), 1);
5167 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5168 assert!(result.prev_batch_token.is_none());
5169 assert!(result.next_batch_token.is_some());
5170 assert!(result.recursion_depth.is_none());
5171
5172 opts.from = result.next_batch_token;
5173 let result = room
5174 .relations(target_event_id, opts)
5175 .await
5176 .expect("Failed to list relations the second time");
5177 assert_eq!(result.chunk.len(), 1);
5178 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5179 assert!(result.prev_batch_token.is_none());
5180 assert!(result.next_batch_token.is_none());
5181 assert!(result.recursion_depth.is_none());
5182 }
5183
5184 #[async_test]
5185 async fn test_relations_with_reltype() {
5186 let server = MatrixMockServer::new().await;
5187 let client = server.client_builder().build().await;
5188
5189 let room_id = room_id!("!a:b.c");
5190 let sender_id = user_id!("@alice:b.c");
5191 let f = EventFactory::new().room(room_id).sender(sender_id);
5192
5193 let target_event_id = owned_event_id!("$target");
5194 let eid1 = event_id!("$1");
5195 let eid2 = event_id!("$2");
5196 let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5197 let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5198
5199 server
5200 .mock_room_relations()
5201 .match_target_event(target_event_id.clone())
5202 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5203 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5204 .mock_once()
5205 .mount()
5206 .await;
5207
5208 server
5209 .mock_room_relations()
5210 .match_target_event(target_event_id.clone())
5211 .match_from("next_batch")
5212 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5213 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5214 .mock_once()
5215 .mount()
5216 .await;
5217
5218 let room = server.sync_joined_room(&client, room_id).await;
5219
5220 let mut opts = RelationsOptions {
5222 include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5223 ..Default::default()
5224 };
5225 let result = room
5226 .relations(target_event_id.clone(), opts.clone())
5227 .await
5228 .expect("Failed to list relations the first time");
5229 assert_eq!(result.chunk.len(), 1);
5230 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5231 assert!(result.prev_batch_token.is_none());
5232 assert!(result.next_batch_token.is_some());
5233 assert!(result.recursion_depth.is_none());
5234
5235 opts.from = result.next_batch_token;
5236 let result = room
5237 .relations(target_event_id, opts)
5238 .await
5239 .expect("Failed to list relations the second time");
5240 assert_eq!(result.chunk.len(), 1);
5241 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5242 assert!(result.prev_batch_token.is_none());
5243 assert!(result.next_batch_token.is_none());
5244 assert!(result.recursion_depth.is_none());
5245 }
5246
5247 #[async_test]
5248 async fn test_power_levels_computation() {
5249 let server = MatrixMockServer::new().await;
5250 let client = server.client_builder().build().await;
5251
5252 let room_id = room_id!("!a:b.c");
5253 let sender_id = client.user_id().expect("No session id");
5254 let f = EventFactory::new().room(room_id).sender(sender_id);
5255 let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5256
5257 let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into();
5259 let power_levels_event = f.power_levels(&mut user_map).state_key("").into();
5260 let room_member_event = f.member(sender_id).into();
5261
5262 let room = server
5264 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event]))
5265 .await;
5266 let ctx = room
5267 .push_condition_room_ctx()
5268 .await
5269 .expect("Failed to get push condition context")
5270 .expect("Could not get push condition context");
5271
5272 assert!(ctx.power_levels.is_none());
5274
5275 let room = server
5277 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event]))
5278 .await;
5279 let ctx = room
5280 .push_condition_room_ctx()
5281 .await
5282 .expect("Failed to get push condition context")
5283 .expect("Could not get push condition context");
5284
5285 assert!(ctx.power_levels.is_none());
5287
5288 let room = server
5290 .sync_room(
5291 &client,
5292 JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5293 )
5294 .await;
5295 let ctx = room
5296 .push_condition_room_ctx()
5297 .await
5298 .expect("Failed to get push condition context")
5299 .expect("Could not get push condition context");
5300
5301 assert!(ctx.power_levels.is_some());
5303 }
5304}