1use std::{
18 borrow::Borrow,
19 collections::{BTreeMap, HashMap},
20 future::Future,
21 ops::Deref,
22 sync::Arc,
23 time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30 StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{
39 IdentityStatusChange, RoomIdentityProvider, UserIdentity, types::events::CryptoContextInfo,
40};
41pub use matrix_sdk_base::store::StoredThreadSubscription;
42use matrix_sdk_base::{
43 ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
44 StateChanges, StateStoreDataKey, StateStoreDataValue,
45 deserialized_responses::{
46 RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
47 },
48 media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
49 store::{StateStoreExt, ThreadSubscriptionStatus},
50};
51#[cfg(feature = "e2e-encryption")]
52use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
53#[cfg(feature = "e2e-encryption")]
54use matrix_sdk_common::BoxFuture;
55use matrix_sdk_common::{
56 deserialized_responses::TimelineEvent,
57 executor::{JoinHandle, spawn},
58 timeout::timeout,
59};
60#[cfg(feature = "experimental-search")]
61use matrix_sdk_search::error::IndexError;
62#[cfg(feature = "experimental-search")]
63#[cfg(doc)]
64use matrix_sdk_search::index::RoomIndex;
65use mime::Mime;
66use reply::Reply;
67#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
68use ruma::events::AnySyncMessageLikeEvent;
69#[cfg(feature = "experimental-encrypted-state-events")]
70use ruma::events::AnySyncStateEvent;
71#[cfg(feature = "unstable-msc4274")]
72use ruma::events::room::message::GalleryItemType;
73#[cfg(feature = "e2e-encryption")]
74use ruma::events::{
75 AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
76};
77use ruma::{
78 EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
79 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
80 api::client::{
81 config::{set_global_account_data, set_room_account_data},
82 context,
83 error::ErrorKind,
84 filter::LazyLoadOptions,
85 membership::{
86 Invite3pid, ban_user, forget_room, get_member_events,
87 invite_user::{self, v3::InvitationRecipient},
88 kick_user, leave_room, unban_user,
89 },
90 message::send_message_event,
91 read_marker::set_read_marker,
92 receipt::create_receipt,
93 redact::redact_event,
94 room::{get_room_event, report_content, report_room},
95 state::{get_state_event_for_key, send_state_event},
96 tag::{create_tag, delete_tag},
97 threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
98 typing::create_typing_event::{self, v3::Typing},
99 },
100 assign,
101 events::{
102 AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
103 Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
104 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
105 RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
106 StaticStateEventContent, SyncStateEvent,
107 beacon::BeaconEventContent,
108 beacon_info::BeaconInfoEventContent,
109 direct::DirectEventContent,
110 marked_unread::MarkedUnreadEventContent,
111 receipt::{Receipt, ReceiptThread, ReceiptType},
112 room::{
113 ImageInfo, MediaSource, ThumbnailInfo,
114 avatar::{self, RoomAvatarEventContent},
115 encryption::RoomEncryptionEventContent,
116 history_visibility::HistoryVisibility,
117 member::{MembershipChange, SyncRoomMemberEvent},
118 message::{
119 AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
120 ImageMessageEventContent, MessageType, RoomMessageEventContent,
121 TextMessageEventContent, UnstableAmplitude, UnstableAudioDetailsContentBlock,
122 UnstableVoiceContentBlock, VideoInfo, VideoMessageEventContent,
123 },
124 name::RoomNameEventContent,
125 pinned_events::RoomPinnedEventsEventContent,
126 power_levels::{
127 RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
128 },
129 server_acl::RoomServerAclEventContent,
130 topic::RoomTopicEventContent,
131 },
132 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
133 tag::{TagInfo, TagName},
134 typing::SyncTypingEvent,
135 },
136 int,
137 push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
138 serde::Raw,
139 time::Instant,
140};
141#[cfg(feature = "experimental-encrypted-state-events")]
142use ruma::{
143 events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
144 serde::JsonCastable,
145};
146use serde::de::DeserializeOwned;
147use thiserror::Error;
148use tokio::{join, sync::broadcast};
149use tracing::{debug, error, info, instrument, trace, warn};
150
151use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
152pub use self::{
153 member::{RoomMember, RoomMemberRole},
154 messages::{
155 EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
156 Relations, RelationsOptions, ThreadRoots,
157 },
158};
159#[cfg(feature = "e2e-encryption")]
160use crate::encryption::backups::BackupState;
161#[cfg(doc)]
162use crate::event_cache::EventCache;
163#[cfg(feature = "experimental-encrypted-state-events")]
164use crate::room::futures::{SendRawStateEvent, SendStateEvent};
165use crate::{
166 BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
167 attachment::{AttachmentConfig, AttachmentInfo},
168 client::WeakClient,
169 config::RequestConfig,
170 error::{BeaconError, WrongRoomState},
171 event_cache::{self, EventCacheDropHandles, RoomEventCache},
172 event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
173 live_location_share::ObservableLiveLocation,
174 media::{MediaFormat, MediaRequestParameters},
175 notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
176 room::{
177 knock_requests::{KnockRequest, KnockRequestMemberInfo},
178 power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
179 privacy_settings::RoomPrivacySettings,
180 },
181 sync::RoomUpdate,
182 utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
183};
184
185pub mod edit;
186pub mod futures;
187pub mod identity_status_changes;
188pub mod knock_requests;
190mod member;
191mod messages;
192pub mod power_levels;
193pub mod reply;
194
195pub mod calls;
196
197pub mod privacy_settings;
199
200#[cfg(feature = "e2e-encryption")]
201pub(crate) mod shared_room_history;
202
/// A room handle that pairs the base-layer room data with the [`Client`] it
/// belongs to.
///
/// `Room` dereferences to [`BaseRoom`], so the base room's methods can be
/// called directly on a `Room`.
#[derive(Debug, Clone)]
pub struct Room {
    /// The underlying base room; this is what `Deref` exposes.
    inner: BaseRoom,
    /// The client used by this room's methods to send requests and to reach
    /// the stores.
    pub(crate) client: Client,
}
210
211impl Deref for Room {
212 type Target = BaseRoom;
213
214 fn deref(&self) -> &Self::Target {
215 &self.inner
216 }
217}
218
/// Timeout of 4 seconds associated with typing notices.
// NOTE(review): presumably the validity period announced to the server when a
// typing notice is sent; the usage site is not visible here, confirm.
const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
/// Resend timeout of 3 seconds associated with typing notices.
// NOTE(review): presumably the minimum delay before an active typing notice is
// sent again (kept shorter than `TYPING_NOTICE_TIMEOUT`); confirm at the usage
// site.
const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
221
/// Description of a subscription to a thread.
// NOTE(review): presumably the current user's subscription as reported by the
// server; the producing code is not visible here, confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ThreadSubscription {
    /// Whether the subscription is an automatic one.
    // NOTE(review): presumably as opposed to a subscription explicitly
    // requested by the user; confirm.
    pub automatic: bool,
}
229
/// Everything needed to evaluate push rules against events: a room condition
/// context and a set of push rules.
#[derive(Debug)]
pub struct PushContext {
    /// The room context handed to the push rules when they are evaluated.
    push_condition_room_ctx: PushConditionRoomCtx,

    /// The push rules evaluated against events.
    push_rules: Ruleset,
}
240
241impl PushContext {
242 pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
244 Self { push_condition_room_ctx, push_rules }
245 }
246
247 pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
249 self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
250 }
251
252 #[doc(hidden)]
255 #[instrument(skip_all)]
256 pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
257 let rules = self
258 .push_rules
259 .iter()
260 .filter_map(|r| {
261 if !r.enabled() {
262 return None;
263 }
264
265 let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
266
267 let conditions = match r {
268 AnyPushRuleRef::Override(r) => {
269 format!("{:?}", r.conditions)
270 }
271 AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
272 AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
273 AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
274 AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
275 _ => "<unknown push rule kind>".to_owned(),
276 };
277
278 Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
279 })
280 .collect::<Vec<_>>()
281 .join("\n");
282 trace!("rules:\n\n{rules}\n\n");
283
284 let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
285
286 if let Some(found) = found {
287 trace!("rule {} matched", found.rule_id());
288 found.actions().to_owned()
289 } else {
290 trace!("no match");
291 Vec::new()
292 }
293 }
294}
295
/// Builds a media message content of enum type `$t`, picking the variant
/// (`Image`, `Audio`, `Video` or `File`) from the top-level type of
/// `$content_type`.
///
/// All arguments except `$t` are identifiers of variables in scope at the call
/// site:
///
/// * `$content_type`: the MIME type; also stored as the `mimetype` of the info.
/// * `$filename`: the file name of the media.
/// * `$source`: the media source handed to the content constructor.
/// * `$caption`: an optional `TextMessageEventContent` used as caption.
/// * `$info`: an optional `AttachmentInfo`, converted to the info type of the
///   selected variant.
/// * `$thumbnail`: an optional pair, split into thumbnail source and
///   thumbnail info.
macro_rules! make_media_type {
    ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $info: ident, $thumbnail: ident) => {{
        // With a caption, the caption text becomes the body and the file name
        // goes in the dedicated `filename` field. Without one, the file name
        // itself is the body and `filename` stays unset.
        let (body, formatted, filename) = match $caption {
            Some(TextMessageEventContent { body, formatted, .. }) => (body, formatted, Some($filename)),
            None => ($filename, None, None),
        };

        let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();

        match $content_type.type_() {
            mime::IMAGE => {
                let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let content = assign!(ImageMessageEventContent::new(body, $source), {
                    info: Some(Box::new(info)),
                    formatted,
                    filename
                });
                <$t>::Image(content)
            }

            mime::AUDIO => {
                let mut content = assign!(AudioMessageEventContent::new(body, $source), {
                    formatted,
                    filename
                });

                // The audio details block is only attached when both a
                // duration and a waveform are available. `$info` is borrowed
                // here because it is consumed further down.
                if let Some(AttachmentInfo::Audio(audio_info) | AttachmentInfo::Voice(audio_info)) = &$info &&
                    let Some(duration) = audio_info.duration && let Some(waveform_vec) = &audio_info.waveform {
                    // Each sample is clamped to [0, 1], scaled by
                    // `UnstableAmplitude::MAX` and cast to `u16`.
                    let waveform = waveform_vec
                        .iter()
                        .map(|v| ((*v).clamp(0.0, 1.0) * UnstableAmplitude::MAX as f32) as u16)
                        .map(Into::into)
                        .collect();
                    content.audio =
                        Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
                }

                // Voice attachments additionally get the voice content block.
                if matches!($info, Some(AttachmentInfo::Voice(_))) {
                    content.voice = Some(UnstableVoiceContentBlock::new());
                }

                let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
                audio_info.mimetype = Some($content_type.as_ref().to_owned());
                let content = content.info(Box::new(audio_info));

                <$t>::Audio(content)
            }

            mime::VIDEO => {
                let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let content = assign!(VideoMessageEventContent::new(body, $source), {
                    info: Some(Box::new(info)),
                    formatted,
                    filename
                });
                <$t>::Video(content)
            }

            // Any other top-level MIME type is sent as a generic file.
            _ => {
                let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let content = assign!(FileMessageEventContent::new(body, $source), {
                    info: Some(Box::new(info)),
                    formatted,
                    filename,
                });
                <$t>::File(content)
            }
        }
    }};
}
381
382impl Room {
383 pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
390 Self { inner: room, client }
391 }
392
393 #[doc(alias = "reject_invitation")]
399 #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
400 async fn leave_impl(&self) -> (Result<()>, &Room) {
401 let state = self.state();
402 if state == RoomState::Left {
403 return (
404 Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
405 "Joined or Invited",
406 state,
407 )))),
408 self,
409 );
410 }
411
412 let should_forget = matches!(self.state(), RoomState::Invited);
415
416 let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
417 let response = self.client.send(request).await;
418
419 if let Err(error) = response {
422 #[allow(clippy::collapsible_match)]
423 let ignore_error = if let Some(error) = error.client_api_error_kind() {
424 match error {
425 ErrorKind::Forbidden { .. } => true,
428 _ => false,
429 }
430 } else {
431 false
432 };
433
434 error!(?error, ignore_error, should_forget, "Failed to leave the room");
435
436 if !ignore_error {
437 return (Err(error.into()), self);
438 }
439 }
440
441 if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
442 return (Err(e.into()), self);
443 }
444
445 if should_forget {
446 trace!("Trying to forget the room");
447
448 if let Err(error) = self.forget().await {
449 error!(?error, "Failed to forget the room");
450 }
451 }
452
453 (Ok(()), self)
454 }
455
456 pub async fn leave(&self) -> Result<()> {
464 let mut rooms: Vec<Room> = vec![self.clone()];
465 let mut current_room = self;
466
467 while let Some(predecessor) = current_room.predecessor_room() {
468 let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
469
470 if let Some(predecessor_room) = maybe_predecessor_room {
471 rooms.push(predecessor_room.clone());
472 current_room = rooms.last().expect("Room just pushed so can't be empty");
473 } else {
474 warn!("Cannot find predecessor room");
475 break;
476 }
477 }
478
479 let batch_size = 5;
480
481 let rooms_futures: Vec<_> = rooms
482 .iter()
483 .filter_map(|room| match room.state() {
484 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
485 Some(room.leave_impl())
486 }
487 RoomState::Banned | RoomState::Left => None,
488 })
489 .collect();
490
491 let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
492
493 let mut maybe_this_room_failed_with: Option<Error> = None;
494
495 while let Some(result) = futures_stream.next().await {
496 if let (Err(e), room) = result {
497 if room.room_id() == self.room_id() {
498 maybe_this_room_failed_with = Some(e);
499 } else {
500 warn!("Failure while attempting to leave predecessor room: {e:?}");
501 }
502 }
503 }
504
505 maybe_this_room_failed_with.map_or(Ok(()), Err)
506 }
507
508 #[doc(alias = "accept_invitation")]
512 pub async fn join(&self) -> Result<()> {
513 let prev_room_state = self.inner.state();
514
515 if prev_room_state == RoomState::Joined {
516 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
517 "Invited or Left",
518 prev_room_state,
519 ))));
520 }
521
522 self.client.join_room_by_id(self.room_id()).await?;
523
524 Ok(())
525 }
526
527 pub fn client(&self) -> Client {
531 self.client.clone()
532 }
533
534 pub fn is_synced(&self) -> bool {
537 self.inner.is_state_fully_synced()
538 }
539
540 pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
570 let Some(url) = self.avatar_url() else { return Ok(None) };
571 let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
572 Ok(Some(self.client.media().get_media_content(&request, true).await?))
573 }
574
575 #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
604 pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
605 let room_id = self.inner.room_id();
606 let request = options.into_request(room_id);
607 let http_response = self.client.send(request).await?;
608
609 let push_ctx = self.push_context().await?;
610 let chunk = join_all(
611 http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
612 )
613 .await;
614
615 Ok(Messages {
616 start: http_response.start,
617 end: http_response.end,
618 chunk,
619 state: http_response.state,
620 })
621 }
622
623 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
633 where
634 Ev: SyncEvent + DeserializeOwned + Send + 'static,
635 H: EventHandler<Ev, Ctx>,
636 {
637 self.client.add_room_event_handler(self.room_id(), handler)
638 }
639
640 pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
645 self.client.subscribe_to_room_updates(self.room_id())
646 }
647
648 pub fn subscribe_to_typing_notifications(
654 &self,
655 ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
656 let (sender, receiver) = broadcast::channel(16);
657 let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
658 let own_user_id = self.own_user_id().to_owned();
659 move |event: SyncTypingEvent| async move {
660 let typing_user_ids = event
662 .content
663 .user_ids
664 .into_iter()
665 .filter(|user_id| *user_id != own_user_id)
666 .collect();
667 let _ = sender.send(typing_user_ids);
669 }
670 });
671 let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
672 (drop_guard, receiver)
673 }
674
675 #[cfg(feature = "e2e-encryption")]
698 pub async fn subscribe_to_identity_status_changes(
699 &self,
700 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
701 IdentityStatusChanges::create_stream(self.clone()).await
702 }
703
704 #[cfg(not(feature = "experimental-encrypted-state-events"))]
709 #[allow(clippy::unused_async)] async fn try_decrypt_event(
711 &self,
712 event: Raw<AnyTimelineEvent>,
713 push_ctx: Option<&PushContext>,
714 ) -> TimelineEvent {
715 #[cfg(feature = "e2e-encryption")]
716 if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
717 SyncMessageLikeEvent::Original(_),
718 ))) = event.deserialize_as::<AnySyncTimelineEvent>()
719 && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
720 {
721 return event;
722 }
723
724 let mut event = TimelineEvent::from_plaintext(event.cast());
725 if let Some(push_ctx) = push_ctx {
726 event.set_push_actions(push_ctx.for_event(event.raw()).await);
727 }
728
729 event
730 }
731
732 #[cfg(feature = "experimental-encrypted-state-events")]
737 #[allow(clippy::unused_async)] async fn try_decrypt_event(
739 &self,
740 event: Raw<AnyTimelineEvent>,
741 push_ctx: Option<&PushContext>,
742 ) -> TimelineEvent {
743 match event.deserialize_as::<AnySyncTimelineEvent>() {
745 Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
746 SyncMessageLikeEvent::Original(_),
747 ))) => {
748 if let Ok(event) = self
749 .decrypt_event(
750 event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
751 push_ctx,
752 )
753 .await
754 {
755 return event;
756 }
757 }
758 Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
759 SyncStateEvent::Original(_),
760 ))) => {
761 if let Ok(event) = self
762 .decrypt_event(
763 event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
764 push_ctx,
765 )
766 .await
767 {
768 return event;
769 }
770 }
771 _ => {}
772 }
773
774 let mut event = TimelineEvent::from_plaintext(event.cast());
775 if let Some(push_ctx) = push_ctx {
776 event.set_push_actions(push_ctx.for_event(event.raw()).await);
777 }
778
779 event
780 }
781
782 pub async fn event(
787 &self,
788 event_id: &EventId,
789 request_config: Option<RequestConfig>,
790 ) -> Result<TimelineEvent> {
791 let request =
792 get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
793
794 let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
795 let push_ctx = self.push_context().await?;
796 let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
797
798 if let Ok((cache, _handles)) = self.event_cache().await {
800 cache.save_events([event.clone()]).await;
801 }
802
803 Ok(event)
804 }
805
806 pub async fn load_or_fetch_event(
813 &self,
814 event_id: &EventId,
815 request_config: Option<RequestConfig>,
816 ) -> Result<TimelineEvent> {
817 match self.event_cache().await {
818 Ok((event_cache, _drop_handles)) => {
819 if let Some(event) = event_cache.find_event(event_id).await {
820 return Ok(event);
821 }
822 }
824 Err(err) => {
825 debug!("error when getting the event cache: {err}");
826 }
827 }
828 self.event(event_id, request_config).await
829 }
830
831 pub async fn event_with_context(
834 &self,
835 event_id: &EventId,
836 lazy_load_members: bool,
837 context_size: UInt,
838 request_config: Option<RequestConfig>,
839 ) -> Result<EventWithContextResponse> {
840 let mut request =
841 context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
842
843 request.limit = context_size;
844
845 if lazy_load_members {
846 request.filter.lazy_load_options =
847 LazyLoadOptions::Enabled { include_redundant_members: false };
848 }
849
850 let response = self.client.send(request).with_request_config(request_config).await?;
851
852 let push_ctx = self.push_context().await?;
853 let push_ctx = push_ctx.as_ref();
854 let target_event = if let Some(event) = response.event {
855 Some(self.try_decrypt_event(event, push_ctx).await)
856 } else {
857 None
858 };
859
860 let (events_before, events_after) = join!(
864 join_all(
865 response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
866 ),
867 join_all(
868 response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
869 ),
870 );
871
872 if let Ok((cache, _handles)) = self.event_cache().await {
874 let mut events_to_save: Vec<TimelineEvent> = Vec::new();
875 if let Some(event) = &target_event {
876 events_to_save.push(event.clone());
877 }
878
879 for event in &events_before {
880 events_to_save.push(event.clone());
881 }
882
883 for event in &events_after {
884 events_to_save.push(event.clone());
885 }
886
887 cache.save_events(events_to_save).await;
888 }
889
890 Ok(EventWithContextResponse {
891 event: target_event,
892 events_before,
893 events_after,
894 state: response.state,
895 prev_batch_token: response.start,
896 next_batch_token: response.end,
897 })
898 }
899
900 pub(crate) async fn request_members(&self) -> Result<()> {
901 self.client
902 .locks()
903 .members_request_deduplicated_handler
904 .run(self.room_id().to_owned(), async move {
905 let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
906 let response = self
907 .client
908 .send(request.clone())
909 .with_request_config(
910 RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
913 )
914 .await?;
915
916 Box::pin(self.client.base_client().receive_all_members(
918 self.room_id(),
919 &request,
920 &response,
921 ))
922 .await?;
923
924 Ok(())
925 })
926 .await
927 }
928
929 pub async fn request_encryption_state(&self) -> Result<()> {
934 if !self.inner.encryption_state().is_unknown() {
935 return Ok(());
936 }
937
938 self.client
939 .locks()
940 .encryption_state_deduplicated_handler
941 .run(self.room_id().to_owned(), async move {
942 let request = get_state_event_for_key::v3::Request::new(
944 self.room_id().to_owned(),
945 StateEventType::RoomEncryption,
946 "".to_owned(),
947 );
948 let response = match self.client.send(request).await {
949 Ok(response) => Some(
950 response
951 .into_content()
952 .deserialize_as_unchecked::<RoomEncryptionEventContent>()?,
953 ),
954 Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
955 Err(err) => return Err(err.into()),
956 };
957
958 let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
959
960 let mut room_info = self.clone_info();
963 room_info.mark_encryption_state_synced();
964 room_info.set_encryption_event(response.clone());
965 let mut changes = StateChanges::default();
966 changes.add_room(room_info.clone());
967
968 self.client.state_store().save_changes(&changes).await?;
969 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
970
971 Ok(())
972 })
973 .await
974 }
975
976 pub fn encryption_state(&self) -> EncryptionState {
981 self.inner.encryption_state()
982 }
983
984 pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
990 self.request_encryption_state().await?;
991
992 Ok(self.encryption_state())
993 }
994
995 #[cfg(feature = "e2e-encryption")]
997 pub async fn crypto_context_info(&self) -> CryptoContextInfo {
998 let encryption = self.client.encryption();
999
1000 let this_device_is_verified = match encryption.get_own_device().await {
1001 Ok(Some(device)) => device.is_verified_with_cross_signing(),
1002
1003 _ => true,
1005 };
1006
1007 let backup_exists_on_server =
1008 encryption.backups().exists_on_server().await.unwrap_or(false);
1009
1010 CryptoContextInfo {
1011 device_creation_ts: encryption.device_creation_timestamp().await,
1012 this_device_is_verified,
1013 is_backup_configured: encryption.backups().state() == BackupState::Enabled,
1014 backup_exists_on_server,
1015 }
1016 }
1017
1018 fn are_events_visible(&self) -> bool {
1019 if let RoomState::Invited = self.inner.state() {
1020 return matches!(
1021 self.inner.history_visibility_or_default(),
1022 HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1023 );
1024 }
1025
1026 true
1027 }
1028
1029 pub async fn sync_members(&self) -> Result<()> {
1035 if !self.are_events_visible() {
1036 return Ok(());
1037 }
1038
1039 if !self.are_members_synced() { self.request_members().await } else { Ok(()) }
1040 }
1041
1042 pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1056 self.sync_members().await?;
1057 self.get_member_no_sync(user_id).await
1058 }
1059
1060 pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1074 Ok(self
1075 .inner
1076 .get_member(user_id)
1077 .await?
1078 .map(|member| RoomMember::new(self.client.clone(), member)))
1079 }
1080
1081 pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1090 self.sync_members().await?;
1091 self.members_no_sync(memberships).await
1092 }
1093
1094 pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1103 Ok(self
1104 .inner
1105 .members(memberships)
1106 .await?
1107 .into_iter()
1108 .map(|member| RoomMember::new(self.client.clone(), member))
1109 .collect())
1110 }
1111
1112 pub async fn get_state_events(
1114 &self,
1115 event_type: StateEventType,
1116 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1117 self.client
1118 .state_store()
1119 .get_state_events(self.room_id(), event_type)
1120 .await
1121 .map_err(Into::into)
1122 }
1123
1124 pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1141 where
1142 C: StaticEventContent<IsPrefix = ruma::events::False>
1143 + StaticStateEventContent
1144 + RedactContent,
1145 C::Redacted: RedactedStateEventContent,
1146 {
1147 Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1148 }
1149
1150 pub async fn get_state_events_for_keys(
1153 &self,
1154 event_type: StateEventType,
1155 state_keys: &[&str],
1156 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1157 self.client
1158 .state_store()
1159 .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1160 .await
1161 .map_err(Into::into)
1162 }
1163
1164 pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1184 &self,
1185 state_keys: I,
1186 ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1187 where
1188 C: StaticEventContent<IsPrefix = ruma::events::False>
1189 + StaticStateEventContent
1190 + RedactContent,
1191 C::StateKey: Borrow<K>,
1192 C::Redacted: RedactedStateEventContent,
1193 K: AsRef<str> + Sized + Sync + 'a,
1194 I: IntoIterator<Item = &'a K> + Send,
1195 I::IntoIter: Send,
1196 {
1197 Ok(self
1198 .client
1199 .state_store()
1200 .get_state_events_for_keys_static(self.room_id(), state_keys)
1201 .await?)
1202 }
1203
1204 pub async fn get_state_event(
1206 &self,
1207 event_type: StateEventType,
1208 state_key: &str,
1209 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1210 self.client
1211 .state_store()
1212 .get_state_event(self.room_id(), event_type, state_key)
1213 .await
1214 .map_err(Into::into)
1215 }
1216
1217 pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1236 where
1237 C: StaticEventContent<IsPrefix = ruma::events::False>
1238 + StaticStateEventContent<StateKey = EmptyStateKey>
1239 + RedactContent,
1240 C::Redacted: RedactedStateEventContent,
1241 {
1242 self.get_state_event_static_for_key(&EmptyStateKey).await
1243 }
1244
1245 pub async fn get_state_event_static_for_key<C, K>(
1265 &self,
1266 state_key: &K,
1267 ) -> Result<Option<RawSyncOrStrippedState<C>>>
1268 where
1269 C: StaticEventContent<IsPrefix = ruma::events::False>
1270 + StaticStateEventContent
1271 + RedactContent,
1272 C::StateKey: Borrow<K>,
1273 C::Redacted: RedactedStateEventContent,
1274 K: AsRef<str> + ?Sized + Sync,
1275 {
1276 Ok(self
1277 .client
1278 .state_store()
1279 .get_state_event_static_for_key(self.room_id(), state_key)
1280 .await?)
1281 }
1282
1283 pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1287 Ok(self
1292 .get_state_events_static::<SpaceParentEventContent>()
1293 .await?
1294 .into_iter()
1295 .filter_map(|parent_event| match parent_event.deserialize() {
1297 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1298 Some((e.state_key.to_owned(), e.sender))
1299 }
1300 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1301 Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1302 Err(e) => {
1303 info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
1304 None
1305 }
1306 })
1307 .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1309 let Some(parent_room) = self.client.get_room(&state_key) else {
1310 return Ok(ParentSpace::Unverifiable(state_key));
1313 };
1314 if let Some(child_event) = parent_room
1317 .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1318 .await?
1319 {
1320 match child_event.deserialize() {
1321 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1322 return Ok(ParentSpace::Reciprocal(parent_room));
1325 }
1326 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1327 Ok(SyncOrStrippedState::Stripped(_)) => {}
1328 Err(e) => {
1329 info!(
1330 room_id = ?self.room_id(), parent_room_id = ?state_key,
1331 "Could not deserialize m.space.child: {e}"
1332 );
1333 }
1334 }
1335 }
1340
1341 let Some(member) = parent_room.get_member(&sender).await? else {
1344 return Ok(ParentSpace::Illegitimate(parent_room));
1346 };
1347
1348 if member.can_send_state(StateEventType::SpaceChild) {
1349 Ok(ParentSpace::WithPowerlevel(parent_room))
1351 } else {
1352 Ok(ParentSpace::Illegitimate(parent_room))
1353 }
1354 })
1355 .collect::<FuturesUnordered<_>>())
1356 }
1357
1358 pub async fn account_data(
1360 &self,
1361 data_type: RoomAccountDataEventType,
1362 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1363 self.client
1364 .state_store()
1365 .get_room_account_data_event(self.room_id(), data_type)
1366 .await
1367 .map_err(Into::into)
1368 }
1369
1370 pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1389 where
1390 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1391 {
1392 Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1393 }
1394
/// Checks whether every device of every user known for this room is verified.
///
/// User ids are read from the state store with an empty membership filter,
/// and the scan stops at the first user owning an unverified device.
#[cfg(feature = "e2e-encryption")]
pub async fn contains_only_verified_devices(&self) -> Result<bool> {
    let store = self.client.state_store();
    let room_user_ids = store.get_user_ids(self.room_id(), RoomMemberships::empty()).await?;

    for room_user_id in room_user_ids {
        let user_devices = self.client.encryption().get_user_devices(&room_user_id).await?;

        // "No unverified device" is the same as "all devices verified".
        if !user_devices.devices().all(|device| device.is_verified()) {
            return Ok(false);
        }
    }

    Ok(true)
}
1418
1419 pub async fn set_account_data<T>(
1434 &self,
1435 content: T,
1436 ) -> Result<set_room_account_data::v3::Response>
1437 where
1438 T: RoomAccountDataEventContent,
1439 {
1440 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1441
1442 let request = set_room_account_data::v3::Request::new(
1443 own_user.to_owned(),
1444 self.room_id().to_owned(),
1445 &content,
1446 )?;
1447
1448 Ok(self.client.send(request).await?)
1449 }
1450
1451 pub async fn set_account_data_raw(
1476 &self,
1477 event_type: RoomAccountDataEventType,
1478 content: Raw<AnyRoomAccountDataEventContent>,
1479 ) -> Result<set_room_account_data::v3::Response> {
1480 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1481
1482 let request = set_room_account_data::v3::Request::new_raw(
1483 own_user.to_owned(),
1484 self.room_id().to_owned(),
1485 event_type,
1486 content,
1487 );
1488
1489 Ok(self.client.send(request).await?)
1490 }
1491
1492 pub async fn set_tag(
1523 &self,
1524 tag: TagName,
1525 tag_info: TagInfo,
1526 ) -> Result<create_tag::v3::Response> {
1527 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1528 let request = create_tag::v3::Request::new(
1529 user_id.to_owned(),
1530 self.inner.room_id().to_owned(),
1531 tag.to_string(),
1532 tag_info,
1533 );
1534 Ok(self.client.send(request).await?)
1535 }
1536
1537 pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1544 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1545 let request = delete_tag::v3::Request::new(
1546 user_id.to_owned(),
1547 self.inner.room_id().to_owned(),
1548 tag.to_string(),
1549 );
1550 Ok(self.client.send(request).await?)
1551 }
1552
1553 pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1563 if is_favourite {
1564 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1565
1566 self.set_tag(TagName::Favorite, tag_info).await?;
1567
1568 if self.is_low_priority() {
1569 self.remove_tag(TagName::LowPriority).await?;
1570 }
1571 } else {
1572 self.remove_tag(TagName::Favorite).await?;
1573 }
1574 Ok(())
1575 }
1576
1577 pub async fn set_is_low_priority(
1587 &self,
1588 is_low_priority: bool,
1589 tag_order: Option<f64>,
1590 ) -> Result<()> {
1591 if is_low_priority {
1592 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1593
1594 self.set_tag(TagName::LowPriority, tag_info).await?;
1595
1596 if self.is_favourite() {
1597 self.remove_tag(TagName::Favorite).await?;
1598 }
1599 } else {
1600 self.remove_tag(TagName::LowPriority).await?;
1601 }
1602 Ok(())
1603 }
1604
1605 pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1614 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1615
1616 let mut content = self
1617 .client
1618 .account()
1619 .account_data::<DirectEventContent>()
1620 .await?
1621 .map(|c| c.deserialize())
1622 .transpose()?
1623 .unwrap_or_default();
1624
1625 let this_room_id = self.inner.room_id();
1626
1627 if is_direct {
1628 let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1629 room_members.retain(|member| member.user_id() != self.own_user_id());
1630
1631 for member in room_members {
1632 let entry = content.entry(member.user_id().into()).or_default();
1633 if !entry.iter().any(|room_id| room_id == this_room_id) {
1634 entry.push(this_room_id.to_owned());
1635 }
1636 }
1637 } else {
1638 for (_, list) in content.iter_mut() {
1639 list.retain(|room_id| *room_id != this_room_id);
1640 }
1641
1642 content.retain(|_, list| !list.is_empty());
1644 }
1645
1646 let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1647
1648 self.client.send(request).await?;
1649 Ok(())
1650 }
1651
/// Attempts to decrypt an encrypted room event with the client's Olm machine.
///
/// On success the result is wrapped with [`TimelineEvent::from_decrypted`],
/// including push actions computed from `push_ctx` when one is supplied. When
/// the event cannot be decrypted, the backups API is asked to maybe download
/// the room key and a UTD [`TimelineEvent`] is returned instead.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, and
/// propagates errors from the decryption attempt.
#[cfg(feature = "e2e-encryption")]
#[cfg(not(feature = "experimental-encrypted-state-events"))]
pub async fn decrypt_event(
    &self,
    event: &Raw<OriginalSyncRoomEncryptedEvent>,
    push_ctx: Option<&PushContext>,
) -> Result<TimelineEvent> {
    let olm_guard = self.client.olm_machine().await;
    let Some(machine) = olm_guard.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    let outcome = machine
        .try_decrypt_room_event(
            event.cast_ref(),
            self.inner.room_id(),
            self.client.decryption_settings(),
        )
        .await?;

    let decrypted = match outcome {
        RoomEventDecryptionResult::Decrypted(decrypted) => decrypted,
        RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
            self.client
                .encryption()
                .backups()
                .maybe_download_room_key(self.room_id().to_owned(), event.clone());
            return Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info));
        }
    };

    let push_actions = match push_ctx {
        Some(push_ctx) => Some(push_ctx.for_event(&decrypted.event).await),
        None => None,
    };

    Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
}
1694
/// Attempts to decrypt an encrypted event (any raw type castable to
/// `EncryptedEvent`) with the client's Olm machine.
///
/// On success the result is wrapped with [`TimelineEvent::from_decrypted`],
/// including push actions computed from `push_ctx` when one is supplied. When
/// the event cannot be decrypted, the backups API is asked to maybe download
/// the room key and a UTD [`TimelineEvent`] is returned instead.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, and
/// propagates errors from the decryption attempt.
#[cfg(feature = "experimental-encrypted-state-events")]
pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
    &self,
    event: &Raw<T>,
    push_ctx: Option<&PushContext>,
) -> Result<TimelineEvent> {
    let olm_guard = self.client.olm_machine().await;
    let Some(machine) = olm_guard.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    let outcome = machine
        .try_decrypt_room_event(
            event.cast_ref(),
            self.inner.room_id(),
            self.client.decryption_settings(),
        )
        .await?;

    let decrypted = match outcome {
        RoomEventDecryptionResult::Decrypted(decrypted) => decrypted,
        RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
            self.client
                .encryption()
                .backups()
                .maybe_download_room_key(self.room_id().to_owned(), event.clone());
            return Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info));
        }
    };

    let push_actions = match push_ctx {
        Some(push_ctx) => Some(push_ctx.for_event(&decrypted.event).await),
        None => None,
    };

    Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
}
1738
/// Looks up the encryption info for the session `session_id` of `sender` in
/// this room.
///
/// Returns `None` when no Olm machine is available or when the lookup fails;
/// the underlying error is discarded.
#[cfg(feature = "e2e-encryption")]
pub async fn get_encryption_info(
    &self,
    session_id: &str,
    sender: &UserId,
) -> Option<Arc<EncryptionInfo>> {
    let olm_guard = self.client.olm_machine().await;
    let machine = olm_guard.as_ref()?;

    let lookup = machine.get_session_encryption_info(self.room_id(), session_id, sender).await;
    lookup.ok()
}
1761
/// Asks the Olm machine to discard the room key for this room.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, and
/// propagates any error from the discard operation.
#[cfg(feature = "e2e-encryption")]
pub async fn discard_room_key(&self) -> Result<()> {
    let olm_guard = self.client.olm_machine().await;
    let machine = olm_guard.as_ref().ok_or(Error::NoOlmMachine)?;

    machine.discard_room_key(self.inner.room_id()).await?;
    Ok(())
}
1784
1785 #[instrument(skip_all)]
1793 pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1794 let request = assign!(
1795 ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1796 { reason: reason.map(ToOwned::to_owned) }
1797 );
1798 self.client.send(request).await?;
1799 Ok(())
1800 }
1801
1802 #[instrument(skip_all)]
1810 pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1811 let request = assign!(
1812 unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1813 { reason: reason.map(ToOwned::to_owned) }
1814 );
1815 self.client.send(request).await?;
1816 Ok(())
1817 }
1818
1819 #[instrument(skip_all)]
1828 pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1829 let request = assign!(
1830 kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1831 { reason: reason.map(ToOwned::to_owned) }
1832 );
1833 self.client.send(request).await?;
1834 Ok(())
1835 }
1836
1837 #[instrument(skip_all)]
1843 pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1844 #[cfg(feature = "e2e-encryption")]
1845 if self.client.inner.enable_share_history_on_invite {
1846 shared_room_history::share_room_history(self, user_id.to_owned()).await?;
1847 }
1848
1849 let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
1850 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1851 self.client.send(request).await?;
1852
1853 self.mark_members_missing();
1857
1858 Ok(())
1859 }
1860
1861 #[instrument(skip_all)]
1867 pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
1868 let recipient = InvitationRecipient::ThirdPartyId(invite_id);
1869 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1870 self.client.send(request).await?;
1871
1872 self.mark_members_missing();
1876
1877 Ok(())
1878 }
1879
1880 pub async fn typing_notice(&self, typing: bool) -> Result<()> {
1915 self.ensure_room_joined()?;
1916
1917 let send = if let Some(typing_time) =
1920 self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
1921 {
1922 if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
1923 typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
1927 } else {
1928 !typing
1930 }
1931 } else {
1932 typing
1935 };
1936
1937 if send {
1938 self.send_typing_notice(typing).await?;
1939 }
1940
1941 Ok(())
1942 }
1943
1944 #[instrument(name = "typing_notice", skip(self))]
1945 async fn send_typing_notice(&self, typing: bool) -> Result<()> {
1946 let typing = if typing {
1947 self.client
1948 .inner
1949 .typing_notice_times
1950 .write()
1951 .unwrap()
1952 .insert(self.room_id().to_owned(), Instant::now());
1953 Typing::Yes(TYPING_NOTICE_TIMEOUT)
1954 } else {
1955 self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
1956 Typing::No
1957 };
1958
1959 let request = create_typing_event::v3::Request::new(
1960 self.own_user_id().to_owned(),
1961 self.room_id().to_owned(),
1962 typing,
1963 );
1964
1965 self.client.send(request).await?;
1966
1967 Ok(())
1968 }
1969
1970 #[instrument(skip_all)]
1987 pub async fn send_single_receipt(
1988 &self,
1989 receipt_type: create_receipt::v3::ReceiptType,
1990 thread: ReceiptThread,
1991 event_id: OwnedEventId,
1992 ) -> Result<()> {
1993 let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
1996
1997 self.client
1998 .inner
1999 .locks
2000 .read_receipt_deduplicated_handler
2001 .run((request_key, event_id.clone()), async {
2002 let is_unthreaded = thread == ReceiptThread::Unthreaded;
2004
2005 let mut request = create_receipt::v3::Request::new(
2006 self.room_id().to_owned(),
2007 receipt_type,
2008 event_id,
2009 );
2010 request.thread = thread;
2011
2012 self.client.send(request).await?;
2013
2014 if is_unthreaded {
2015 self.set_unread_flag(false).await?;
2016 }
2017
2018 Ok(())
2019 })
2020 .await
2021 }
2022
2023 #[instrument(skip_all)]
2033 pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2034 if receipts.is_empty() {
2035 return Ok(());
2036 }
2037
2038 let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2039 let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2040 fully_read,
2041 read_receipt: public_read_receipt,
2042 private_read_receipt,
2043 });
2044
2045 self.client.send(request).await?;
2046
2047 self.set_unread_flag(false).await?;
2048
2049 Ok(())
2050 }
2051
2052 #[allow(unused_variables, unused_mut)]
2056 async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
2057 use ruma::{
2058 EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
2059 };
2060 const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
2061
2062 if !self.latest_encryption_state().await?.is_encrypted() {
2063 let mut content =
2064 RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
2065 #[cfg(feature = "experimental-encrypted-state-events")]
2066 if encrypted_state_events {
2067 content = content.with_encrypted_state();
2068 }
2069 self.send_state_event(content).await?;
2070
2071 let res = timeout(
2078 async {
2079 loop {
2080 self.client.inner.sync_beat.listen().await;
2082 let _state_store_lock =
2083 self.client.base_client().state_store_lock().lock().await;
2084
2085 if !self.inner.encryption_state().is_unknown() {
2086 break;
2087 }
2088 }
2089 },
2090 SYNC_WAIT_TIME,
2091 )
2092 .await;
2093
2094 let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
2095
2096 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2098 if res.is_ok() && self.inner.encryption_state().is_encrypted() {
2099 debug!("room successfully marked as encrypted");
2100 return Ok(());
2101 }
2102
2103 #[cfg(feature = "experimental-encrypted-state-events")]
2105 if res.is_ok() && {
2106 if encrypted_state_events {
2107 self.inner.encryption_state().is_state_encrypted()
2108 } else {
2109 self.inner.encryption_state().is_encrypted()
2110 }
2111 } {
2112 debug!("room successfully marked as encrypted");
2113 return Ok(());
2114 }
2115
2116 debug!("still not marked as encrypted, marking encryption state as missing");
2121
2122 let mut room_info = self.clone_info();
2123 room_info.mark_encryption_state_missing();
2124 let mut changes = StateChanges::default();
2125 changes.add_room(room_info.clone());
2126
2127 self.client.state_store().save_changes(&changes).await?;
2128 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
2129 }
2130
2131 Ok(())
2132 }
2133
2134 #[instrument(skip_all)]
2166 pub async fn enable_encryption(&self) -> Result<()> {
2167 self.enable_encryption_inner(false).await
2168 }
2169
/// Enables end-to-end encryption in this room and additionally requests
/// encrypted state events.
#[instrument(skip_all)]
#[cfg(feature = "experimental-encrypted-state-events")]
pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
    const ENCRYPT_STATE_EVENTS: bool = true;
    self.enable_encryption_inner(ENCRYPT_STATE_EVENTS).await
}
2207
/// Claims one-time keys for the active members and shares the room key with
/// them, deduplicated per room through the group-session handler.
///
/// The crypto store lock is taken first (its generation, if any, is recorded
/// on the current tracing span). If sharing the room key fails, the room key
/// is discarded before the error is returned.
///
/// # Errors
///
/// Fails when the room is not joined, when the store lock cannot be taken,
/// or when claiming keys, sharing or discarding the room key fails.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
async fn preshare_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    let store_guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
    let store_generation = store_guard.map(|guard| guard.generation());
    tracing::Span::current().record("store_generation", store_generation);

    self.client
        .locks()
        .group_session_deduplicated_handler
        .run(self.room_id().to_owned(), async move {
            {
                let active_members = self
                    .client
                    .state_store()
                    .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
                    .await?;
                self.client
                    .claim_one_time_keys(active_members.iter().map(Deref::deref))
                    .await?;
            };

            let Err(share_error) = self.share_room_key().await else {
                return Ok(());
            };

            // Sharing failed: drop the room key before reporting the failure.
            let olm_guard = self.client.olm_machine().await;
            if let Some(machine) = olm_guard.as_ref() {
                machine.discard_room_key(self.room_id()).await?;
            }

            Err(share_error)
        })
        .await
}
2255
/// Obtains the to-device requests needed to share this room's key and sends
/// them one by one, marking each as sent once its response arrives.
///
/// # Errors
///
/// Fails when the room is not joined, or on the first request that cannot be
/// produced, sent or marked as sent.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all)]
async fn share_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    let base_client = self.client.base_client();
    let to_device_requests = base_client.share_room_key(self.room_id()).await?;

    for to_device_request in to_device_requests {
        let response = self.client.send_to_device(&to_device_request).await?;
        self.client.mark_request_as_sent(&to_device_request.txn_id, &response).await?;
    }

    Ok(())
}
2275
2276 #[instrument(skip_all)]
2285 pub async fn sync_up(&self) {
2286 while !self.is_synced() && self.state() == RoomState::Joined {
2287 let wait_for_beat = self.client.inner.sync_beat.listen();
2288 let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2290 }
2291 }
2292
2293 pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2364 SendMessageLikeEvent::new(self, content)
2365 }
2366
/// Runs a keys query for the active room members that are either not tracked
/// by the Olm machine or whose tracking is flagged as dirty.
///
/// No request is sent when the resulting device-key query is empty.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available (instead
/// of panicking, matching the other crypto helpers on this type), and
/// propagates store and request errors.
#[cfg(feature = "e2e-encryption")]
async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
    let olm = self.client.olm_machine().await;
    let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;

    let members =
        self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;

    // Map each tracked user to its `dirty` flag.
    let tracked: HashMap<_, _> = olm
        .store()
        .load_tracked_users()
        .await?
        .into_iter()
        .map(|tracked| (tracked.user_id, tracked.dirty))
        .collect();

    // Keep members that are missing from the map or whose flag is set.
    let members_with_unknown_devices =
        members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));

    let (req_id, request) =
        olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));

    if !request.device_keys.is_empty() {
        self.client.keys_query(&req_id, request.device_keys).await?;
    }

    Ok(())
}
2399
2400 #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2444 pub fn send_raw<'a>(
2445 &'a self,
2446 event_type: &'a str,
2447 content: impl IntoRawMessageLikeEventContent,
2448 ) -> SendRawMessageLikeEvent<'a> {
2449 SendRawMessageLikeEvent::new(self, event_type, content)
2452 }
2453
2454 #[instrument(skip_all)]
2502 pub fn send_attachment<'a>(
2503 &'a self,
2504 filename: impl Into<String>,
2505 content_type: &'a Mime,
2506 data: Vec<u8>,
2507 config: AttachmentConfig,
2508 ) -> SendAttachment<'a> {
2509 SendAttachment::new(self, filename.into(), content_type, data, config)
2510 }
2511
2512 #[instrument(skip_all)]
2540 pub(super) async fn prepare_and_send_attachment<'a>(
2541 &'a self,
2542 filename: String,
2543 content_type: &'a Mime,
2544 data: Vec<u8>,
2545 mut config: AttachmentConfig,
2546 send_progress: SharedObservable<TransmissionProgress>,
2547 store_in_cache: bool,
2548 ) -> Result<send_message_event::v3::Response> {
2549 self.ensure_room_joined()?;
2550
2551 let txn_id = config.txn_id.take();
2552 let mentions = config.mentions.take();
2553
2554 let thumbnail = config.thumbnail.take();
2555
2556 let thumbnail_cache_info = if store_in_cache {
2558 thumbnail
2559 .as_ref()
2560 .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2561 } else {
2562 None
2563 };
2564
2565 #[cfg(feature = "e2e-encryption")]
2566 let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2567 self.client
2568 .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2569 .await?
2570 } else {
2571 self.client
2572 .media()
2573 .upload_plain_media_and_thumbnail(
2574 content_type,
2575 data.clone(),
2578 thumbnail,
2579 send_progress,
2580 )
2581 .await?
2582 };
2583
2584 #[cfg(not(feature = "e2e-encryption"))]
2585 let (media_source, thumbnail) = self
2586 .client
2587 .media()
2588 .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2589 .await?;
2590
2591 if store_in_cache {
2592 let media_store_lock_guard = self.client.media_store().lock().await?;
2593
2594 debug!("caching the media");
2598 let request =
2599 MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2600
2601 if let Err(err) = media_store_lock_guard
2602 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2603 .await
2604 {
2605 warn!("unable to cache the media after uploading it: {err}");
2606 }
2607
2608 if let Some(((data, height, width), source)) =
2609 thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2610 {
2611 debug!("caching the thumbnail");
2612
2613 let request = MediaRequestParameters {
2614 source: source.clone(),
2615 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2616 };
2617
2618 if let Err(err) = media_store_lock_guard
2619 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2620 .await
2621 {
2622 warn!("unable to cache the media after uploading it: {err}");
2623 }
2624 }
2625 }
2626
2627 let content = self
2628 .make_media_event(
2629 Room::make_attachment_type(
2630 content_type,
2631 filename,
2632 media_source,
2633 config.caption,
2634 config.info,
2635 thumbnail,
2636 ),
2637 mentions,
2638 config.reply,
2639 )
2640 .await?;
2641
2642 let mut fut = self.send(content);
2643 if let Some(txn_id) = txn_id {
2644 fut = fut.with_transaction_id(txn_id);
2645 }
2646 fut.await
2647 }
2648
2649 #[allow(clippy::too_many_arguments)]
2652 pub(crate) fn make_attachment_type(
2653 content_type: &Mime,
2654 filename: String,
2655 source: MediaSource,
2656 caption: Option<TextMessageEventContent>,
2657 info: Option<AttachmentInfo>,
2658 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2659 ) -> MessageType {
2660 make_media_type!(MessageType, content_type, filename, source, caption, info, thumbnail)
2661 }
2662
2663 pub(crate) async fn make_media_event(
2666 &self,
2667 msg_type: MessageType,
2668 mentions: Option<Mentions>,
2669 reply: Option<Reply>,
2670 ) -> Result<RoomMessageEventContent> {
2671 let mut content = RoomMessageEventContent::new(msg_type);
2672 if let Some(mentions) = mentions {
2673 content = content.add_mentions(mentions);
2674 }
2675 if let Some(reply) = reply {
2676 content = self.make_reply_event(content.into(), reply).await?;
2679 }
2680 Ok(content)
2681 }
2682
/// Builds the [`GalleryItemType`] describing an uploaded gallery item by
/// delegating to the `make_media_type!` macro with all the provided pieces.
#[cfg(feature = "unstable-msc4274")]
// Lint silenced rather than bundling the arguments: they are forwarded
// one-to-one to the macro.
#[allow(clippy::too_many_arguments)]
pub(crate) fn make_gallery_item_type(
    content_type: &Mime,
    filename: String,
    source: MediaSource,
    caption: Option<TextMessageEventContent>,
    info: Option<AttachmentInfo>,
    thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
) -> GalleryItemType {
    make_media_type!(
        GalleryItemType,
        content_type,
        filename,
        source,
        caption,
        info,
        thumbnail
    )
}
2697
2698 pub async fn update_power_levels(
2707 &self,
2708 updates: Vec<(&UserId, Int)>,
2709 ) -> Result<send_state_event::v3::Response> {
2710 let mut power_levels = self.power_levels().await?;
2711
2712 for (user_id, new_level) in updates {
2713 if new_level == power_levels.users_default {
2714 power_levels.users.remove(user_id);
2715 } else {
2716 power_levels.users.insert(user_id.to_owned(), new_level);
2717 }
2718 }
2719
2720 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2721 }
2722
2723 pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2728 let mut power_levels = self.power_levels().await?;
2729 power_levels.apply(changes)?;
2730 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2731 Ok(())
2732 }
2733
2734 pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2738 let creators = self.creators().unwrap_or_default();
2739 let rules = self.clone_info().room_version_rules_or_default();
2740
2741 let default_power_levels =
2742 RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2743 let changes = RoomPowerLevelChanges::from(default_power_levels);
2744 self.apply_power_level_changes(changes).await?;
2745 Ok(self.power_levels().await?)
2746 }
2747
2748 pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2753 let power_level = self.get_user_power_level(user_id).await?;
2754 Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2755 }
2756
2757 pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2762 let event = self.power_levels().await?;
2763 Ok(event.for_user(user_id))
2764 }
2765
2766 pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2769 let power_levels = self.power_levels().await.ok();
2770 let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2771 if let Some(power_levels) = power_levels {
2772 for (id, level) in power_levels.users.into_iter() {
2773 user_power_levels.insert(id, level.into());
2774 }
2775 }
2776 user_power_levels
2777 }
2778
2779 pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2781 self.send_state_event(RoomNameEventContent::new(name)).await
2782 }
2783
2784 pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2786 self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2787 }
2788
2789 pub async fn set_avatar_url(
2795 &self,
2796 url: &MxcUri,
2797 info: Option<avatar::ImageInfo>,
2798 ) -> Result<send_state_event::v3::Response> {
2799 self.ensure_room_joined()?;
2800
2801 let mut room_avatar_event = RoomAvatarEventContent::new();
2802 room_avatar_event.url = Some(url.to_owned());
2803 room_avatar_event.info = info.map(Box::new);
2804
2805 self.send_state_event(room_avatar_event).await
2806 }
2807
2808 pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2810 self.send_state_event(RoomAvatarEventContent::new()).await
2811 }
2812
2813 pub async fn upload_avatar(
2821 &self,
2822 mime: &Mime,
2823 data: Vec<u8>,
2824 info: Option<avatar::ImageInfo>,
2825 ) -> Result<send_state_event::v3::Response> {
2826 self.ensure_room_joined()?;
2827
2828 let upload_response = self.client.media().upload(mime, data, None).await?;
2829 let mut info = info.unwrap_or_default();
2830 info.blurhash = upload_response.blurhash;
2831 info.mimetype = Some(mime.to_string());
2832
2833 self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2834 }
2835
2836 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2880 #[instrument(skip_all)]
2881 pub async fn send_state_event(
2882 &self,
2883 content: impl StateEventContent<StateKey = EmptyStateKey>,
2884 ) -> Result<send_state_event::v3::Response> {
2885 self.send_state_event_for_key(&EmptyStateKey, content).await
2886 }
2887
/// Creates a [`SendStateEvent`] for a state event whose state key is empty.
///
/// Thin wrapper around [`Self::send_state_event_for_key`] using
/// `EmptyStateKey`.
#[cfg(feature = "experimental-encrypted-state-events")]
#[instrument(skip_all)]
pub fn send_state_event<'a>(
    &'a self,
    content: impl StateEventContent<StateKey = EmptyStateKey>,
) -> SendStateEvent<'a> {
    let empty_key = EmptyStateKey;
    self.send_state_event_for_key(&empty_key, content)
}
2946
2947 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2988 pub async fn send_state_event_for_key<C, K>(
2989 &self,
2990 state_key: &K,
2991 content: C,
2992 ) -> Result<send_state_event::v3::Response>
2993 where
2994 C: StateEventContent,
2995 C::StateKey: Borrow<K>,
2996 K: AsRef<str> + ?Sized,
2997 {
2998 self.ensure_room_joined()?;
2999 let request =
3000 send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3001 let response = self.client.send(request).await?;
3002 Ok(response)
3003 }
3004
/// Creates a [`SendStateEvent`] for a state event with the given `state_key`
/// and `content`.
///
/// No work is done here beyond constructing the returned value.
#[cfg(feature = "experimental-encrypted-state-events")]
pub fn send_state_event_for_key<'a, C, K>(
    &'a self,
    state_key: &K,
    content: C,
) -> SendStateEvent<'a>
where
    C: StateEventContent,
    C::StateKey: Borrow<K>,
    K: AsRef<str> + ?Sized,
{
    let room = self;
    SendStateEvent::new(room, state_key, content)
}
3066
3067 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3102 #[instrument(skip_all)]
3103 pub async fn send_state_event_raw(
3104 &self,
3105 event_type: &str,
3106 state_key: &str,
3107 content: impl IntoRawStateEventContent,
3108 ) -> Result<send_state_event::v3::Response> {
3109 self.ensure_room_joined()?;
3110
3111 let request = send_state_event::v3::Request::new_raw(
3112 self.room_id().to_owned(),
3113 event_type.into(),
3114 state_key.to_owned(),
3115 content.into_raw_state_event_content(),
3116 );
3117
3118 Ok(self.client.send(request).await?)
3119 }
3120
/// Creates a [`SendRawStateEvent`] for a state event built from raw content,
/// with the given `event_type` and `state_key`.
///
/// No work is done here beyond constructing the returned value.
#[cfg(feature = "experimental-encrypted-state-events")]
#[instrument(skip_all)]
pub fn send_state_event_raw<'a>(
    &'a self,
    event_type: &'a str,
    state_key: &'a str,
    content: impl IntoRawStateEventContent,
) -> SendRawStateEvent<'a> {
    let room = self;
    SendRawStateEvent::new(room, event_type, state_key, content)
}
3172
3173 #[instrument(skip_all)]
3208 pub async fn redact(
3209 &self,
3210 event_id: &EventId,
3211 reason: Option<&str>,
3212 txn_id: Option<OwnedTransactionId>,
3213 ) -> HttpResult<redact_event::v3::Response> {
3214 let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3215 let request = assign!(
3216 redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3217 { reason: reason.map(ToOwned::to_owned) }
3218 );
3219
3220 self.client.send(request).await
3221 }
3222
3223 pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
3232 let acl_ev = self
3233 .get_state_event_static::<RoomServerAclEventContent>()
3234 .await?
3235 .and_then(|ev| ev.deserialize().ok());
3236 let acl = acl_ev.as_ref().and_then(|ev| match ev {
3237 SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
3238 SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
3239 });
3240
3241 let members: Vec<_> = self
3245 .members_no_sync(RoomMemberships::JOIN)
3246 .await?
3247 .into_iter()
3248 .filter(|member| {
3249 let server = member.user_id().server_name();
3250 acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
3251 })
3252 .collect();
3253
3254 let max = members
3257 .iter()
3258 .max_by_key(|member| member.power_level())
3259 .filter(|max| max.power_level() >= int!(50))
3260 .map(|member| member.user_id().server_name());
3261
3262 let servers = members
3264 .iter()
3265 .map(|member| member.user_id().server_name())
3266 .filter(|server| max.filter(|max| max == server).is_none())
3267 .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
3268 *servers.entry(server).or_default() += 1;
3269 servers
3270 });
3271 let mut servers: Vec<_> = servers.into_iter().collect();
3272 servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
3273
3274 Ok(max
3275 .into_iter()
3276 .chain(servers.into_iter().map(|(name, _)| name))
3277 .take(3)
3278 .map(ToOwned::to_owned)
3279 .collect())
3280 }
3281
3282 pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3289 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3290 return Ok(alias.matrix_to_uri());
3291 }
3292
3293 let via = self.route().await?;
3294 Ok(self.room_id().matrix_to_uri_via(via))
3295 }
3296
3297 pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3308 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3309 return Ok(alias.matrix_uri(join));
3310 }
3311
3312 let via = self.route().await?;
3313 Ok(self.room_id().matrix_uri_via(via, join))
3314 }
3315
3316 pub async fn matrix_to_event_permalink(
3330 &self,
3331 event_id: impl Into<OwnedEventId>,
3332 ) -> Result<MatrixToUri> {
3333 let via = self.route().await?;
3336 Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3337 }
3338
3339 pub async fn matrix_event_permalink(
3353 &self,
3354 event_id: impl Into<OwnedEventId>,
3355 ) -> Result<MatrixUri> {
3356 let via = self.route().await?;
3359 Ok(self.room_id().matrix_event_uri_via(event_id, via))
3360 }
3361
3362 pub async fn load_user_receipt(
3375 &self,
3376 receipt_type: ReceiptType,
3377 thread: ReceiptThread,
3378 user_id: &UserId,
3379 ) -> Result<Option<(OwnedEventId, Receipt)>> {
3380 self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3381 }
3382
3383 pub async fn load_event_receipts(
3396 &self,
3397 receipt_type: ReceiptType,
3398 thread: ReceiptThread,
3399 event_id: &EventId,
3400 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3401 self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3402 }
3403
3404 pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3409 self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
3410 }
3411
3412 pub(crate) async fn push_condition_room_ctx_internal(
3419 &self,
3420 with_threads_subscriptions: bool,
3421 ) -> Result<Option<PushConditionRoomCtx>> {
3422 let room_id = self.room_id();
3423 let user_id = self.own_user_id();
3424 let room_info = self.clone_info();
3425 let member_count = room_info.active_members_count();
3426
3427 let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3428 member.name().to_owned()
3429 } else {
3430 return Ok(None);
3431 };
3432
3433 let power_levels = match self.power_levels().await {
3434 Ok(power_levels) => Some(power_levels.into()),
3435 Err(error) => {
3436 if matches!(room_info.state(), RoomState::Joined) {
3437 error!("Could not compute power levels for push conditions: {error}");
3440 }
3441 None
3442 }
3443 };
3444
3445 let mut ctx = assign!(PushConditionRoomCtx::new(
3446 room_id.to_owned(),
3447 UInt::new(member_count).unwrap_or(UInt::MAX),
3448 user_id.to_owned(),
3449 user_display_name,
3450 ),
3451 {
3452 power_levels,
3453 });
3454
3455 if with_threads_subscriptions {
3456 let this = self.clone();
3457 ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3458 let room = this.clone();
3459 Box::pin(async move {
3460 if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3461 maybe_sub.is_some()
3462 } else {
3463 false
3464 }
3465 })
3466 });
3467 }
3468
3469 Ok(Some(ctx))
3470 }
3471
3472 pub async fn push_context(&self) -> Result<Option<PushContext>> {
3475 self.push_context_internal(self.client.enabled_thread_subscriptions()).await
3476 }
3477
3478 #[instrument(skip(self))]
3482 pub(crate) async fn push_context_internal(
3483 &self,
3484 with_threads_subscriptions: bool,
3485 ) -> Result<Option<PushContext>> {
3486 let Some(push_condition_room_ctx) =
3487 self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3488 else {
3489 debug!("Could not aggregate push context");
3490 return Ok(None);
3491 };
3492 let push_rules = self.client().account().push_rules().await?;
3493 Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3494 }
3495
3496 pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3501 if let Some(ctx) = self.push_context().await? {
3502 Ok(Some(ctx.for_event(event).await))
3503 } else {
3504 Ok(None)
3505 }
3506 }
3507
3508 pub async fn invite_details(&self) -> Result<Invite> {
3511 let state = self.state();
3512
3513 if state != RoomState::Invited {
3514 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3515 }
3516
3517 let invitee = self
3518 .get_member_no_sync(self.own_user_id())
3519 .await?
3520 .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3521 let event = invitee.event();
3522 let inviter_id = event.sender();
3523 let inviter = self.get_member_no_sync(inviter_id).await?;
3524 Ok(Invite { invitee, inviter })
3525 }
3526
3527 pub async fn member_with_sender_info(
3535 &self,
3536 user_id: &UserId,
3537 ) -> Result<RoomMemberWithSenderInfo> {
3538 let Some(member) = self.get_member_no_sync(user_id).await? else {
3539 return Err(Error::InsufficientData);
3540 };
3541
3542 let sender_member =
3543 if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3544 Some(member)
3546 } else if self.are_members_synced() {
3547 None
3549 } else if self.sync_members().await.is_ok() {
3550 self.get_member_no_sync(member.event().sender()).await?
3552 } else {
3553 None
3554 };
3555
3556 Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3557 }
3558
3559 pub async fn forget(&self) -> Result<()> {
3565 let state = self.state();
3566 match state {
3567 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3568 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3569 "Left / Banned",
3570 state,
3571 ))));
3572 }
3573 RoomState::Left | RoomState::Banned => {}
3574 }
3575
3576 let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3577 let _response = self.client.send(request).await?;
3578
3579 if self.inner.direct_targets_length() != 0
3581 && let Err(e) = self.set_is_direct(false).await
3582 {
3583 warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3586 }
3587
3588 self.client.base_client().forget_room(self.inner.room_id()).await?;
3589
3590 Ok(())
3591 }
3592
3593 fn ensure_room_joined(&self) -> Result<()> {
3594 let state = self.state();
3595 if state == RoomState::Joined {
3596 Ok(())
3597 } else {
3598 Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3599 }
3600 }
3601
3602 pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3604 if !matches!(self.state(), RoomState::Joined) {
3605 return None;
3606 }
3607
3608 let notification_settings = self.client().notification_settings().await;
3609
3610 let notification_mode =
3612 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3613
3614 if notification_mode.is_some() {
3615 notification_mode
3616 } else if let Ok(is_encrypted) =
3617 self.latest_encryption_state().await.map(|state| state.is_encrypted())
3618 {
3619 let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3624 let default_mode = notification_settings
3625 .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3626 .await;
3627 Some(default_mode)
3628 } else {
3629 None
3630 }
3631 }
3632
3633 pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3644 if !matches!(self.state(), RoomState::Joined) {
3645 return None;
3646 }
3647
3648 let notification_settings = self.client().notification_settings().await;
3649
3650 let mode =
3652 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3653
3654 if let Some(mode) = mode {
3655 self.update_cached_user_defined_notification_mode(mode);
3656 }
3657
3658 mode
3659 }
3660
3661 pub async fn report_content(
3674 &self,
3675 event_id: OwnedEventId,
3676 score: Option<ReportedContentScore>,
3677 reason: Option<String>,
3678 ) -> Result<report_content::v3::Response> {
3679 let state = self.state();
3680 if state != RoomState::Joined {
3681 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3682 }
3683
3684 let request = report_content::v3::Request::new(
3685 self.inner.room_id().to_owned(),
3686 event_id,
3687 score.map(Into::into),
3688 reason,
3689 );
3690 Ok(self.client.send(request).await?)
3691 }
3692
3693 pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3704 let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3705
3706 Ok(self.client.send(request).await?)
3707 }
3708
3709 pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3715 if self.is_marked_unread() == unread {
3716 return Ok(());
3718 }
3719
3720 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3721
3722 let content = MarkedUnreadEventContent::new(unread);
3723
3724 let request = set_room_account_data::v3::Request::new(
3725 user_id.to_owned(),
3726 self.inner.room_id().to_owned(),
3727 &content,
3728 )?;
3729
3730 self.client.send(request).await?;
3731 Ok(())
3732 }
3733
3734 pub async fn event_cache(
3737 &self,
3738 ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3739 self.client.event_cache().for_room(self.room_id()).await
3740 }
3741
3742 pub(crate) async fn get_user_beacon_info(
3749 &self,
3750 user_id: &UserId,
3751 ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3752 let raw_event = self
3753 .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3754 .await?
3755 .ok_or(BeaconError::NotFound)?;
3756
3757 match raw_event.deserialize()? {
3758 SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3759 SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3760 SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3761 }
3762 }
3763
3764 pub async fn start_live_location_share(
3777 &self,
3778 duration_millis: u64,
3779 description: Option<String>,
3780 ) -> Result<send_state_event::v3::Response> {
3781 self.ensure_room_joined()?;
3782
3783 self.send_state_event_for_key(
3784 self.own_user_id(),
3785 BeaconInfoEventContent::new(
3786 description,
3787 Duration::from_millis(duration_millis),
3788 true,
3789 None,
3790 ),
3791 )
3792 .await
3793 }
3794
3795 pub async fn stop_live_location_share(
3802 &self,
3803 ) -> Result<send_state_event::v3::Response, BeaconError> {
3804 self.ensure_room_joined()?;
3805
3806 let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3807 beacon_info_event.content.stop();
3808 Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3809 }
3810
3811 pub async fn send_location_beacon(
3823 &self,
3824 geo_uri: String,
3825 ) -> Result<send_message_event::v3::Response, BeaconError> {
3826 self.ensure_room_joined()?;
3827
3828 let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3829
3830 if beacon_info_event.content.is_live() {
3831 let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3832 Ok(self.send(content).await?)
3833 } else {
3834 Err(BeaconError::NotLive)
3835 }
3836 }
3837
3838 pub async fn save_composer_draft(
3841 &self,
3842 draft: ComposerDraft,
3843 thread_root: Option<&EventId>,
3844 ) -> Result<()> {
3845 self.client
3846 .state_store()
3847 .set_kv_data(
3848 StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
3849 StateStoreDataValue::ComposerDraft(draft),
3850 )
3851 .await?;
3852 Ok(())
3853 }
3854
3855 pub async fn load_composer_draft(
3858 &self,
3859 thread_root: Option<&EventId>,
3860 ) -> Result<Option<ComposerDraft>> {
3861 let data = self
3862 .client
3863 .state_store()
3864 .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3865 .await?;
3866 Ok(data.and_then(|d| d.into_composer_draft()))
3867 }
3868
3869 pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
3872 self.client
3873 .state_store()
3874 .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3875 .await?;
3876 Ok(())
3877 }
3878
3879 pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
3882 let response = self
3883 .client
3884 .send(get_state_event_for_key::v3::Request::new(
3885 self.room_id().to_owned(),
3886 StateEventType::RoomPinnedEvents,
3887 "".to_owned(),
3888 ))
3889 .await;
3890
3891 match response {
3892 Ok(response) => Ok(Some(
3893 response
3894 .into_content()
3895 .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
3896 .pinned,
3897 )),
3898 Err(http_error) => match http_error.as_client_api_error() {
3899 Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
3900 _ => Err(http_error.into()),
3901 },
3902 }
3903 }
3904
3905 pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3913 ObservableLiveLocation::new(&self.client, self.room_id())
3914 }
3915
3916 pub async fn subscribe_to_knock_requests(
3930 &self,
3931 ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
3932 let this = Arc::new(self.clone());
3933
3934 let room_member_events_observer =
3935 self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
3936
3937 let current_seen_ids = self.get_seen_knock_request_ids().await?;
3938 let mut seen_request_ids_stream = self
3939 .seen_knock_request_ids_map
3940 .subscribe()
3941 .await
3942 .map(|values| values.unwrap_or_default());
3943
3944 let mut room_info_stream = self.subscribe_info();
3945
3946 let clear_seen_ids_handle = spawn({
3949 let this = self.clone();
3950 async move {
3951 let mut member_updates_stream = this.room_member_updates_sender.subscribe();
3952 while member_updates_stream.recv().await.is_ok() {
3953 if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
3955 warn!("Failed to remove seen knock requests: {err}")
3956 }
3957 }
3958 }
3959 });
3960
3961 let combined_stream = stream! {
3962 match this.get_current_join_requests(¤t_seen_ids).await {
3964 Ok(initial_requests) => yield initial_requests,
3965 Err(err) => warn!("Failed to get initial requests to join: {err}")
3966 }
3967
3968 let mut requests_stream = room_member_events_observer.subscribe();
3969 let mut seen_ids = current_seen_ids.clone();
3970
3971 loop {
3972 tokio::select! {
3975 Some((event, _)) = requests_stream.next() => {
3976 if let Some(event) = event.as_original() {
3977 let emit = if event.prev_content().is_some() {
3979 matches!(event.membership_change(),
3980 MembershipChange::Banned |
3981 MembershipChange::Knocked |
3982 MembershipChange::KnockAccepted |
3983 MembershipChange::KnockDenied |
3984 MembershipChange::KnockRetracted
3985 )
3986 } else {
3987 true
3990 };
3991
3992 if emit {
3993 match this.get_current_join_requests(&seen_ids).await {
3994 Ok(requests) => yield requests,
3995 Err(err) => {
3996 warn!("Failed to get updated knock requests on new member event: {err}")
3997 }
3998 }
3999 }
4000 }
4001 }
4002
4003 Some(new_seen_ids) = seen_request_ids_stream.next() => {
4004 seen_ids = new_seen_ids;
4006
4007 match this.get_current_join_requests(&seen_ids).await {
4010 Ok(requests) => yield requests,
4011 Err(err) => {
4012 warn!("Failed to get updated knock requests on seen ids changed: {err}")
4013 }
4014 }
4015 }
4016
4017 Some(room_info) = room_info_stream.next() => {
4018 if !room_info.are_members_synced() {
4021 match this.get_current_join_requests(&seen_ids).await {
4022 Ok(requests) => yield requests,
4023 Err(err) => {
4024 warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4025 }
4026 }
4027 }
4028 }
4029 else => break,
4031 }
4032 }
4033 };
4034
4035 Ok((combined_stream, clear_seen_ids_handle))
4036 }
4037
4038 async fn get_current_join_requests(
4039 &self,
4040 seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4041 ) -> Result<Vec<KnockRequest>> {
4042 Ok(self
4043 .members(RoomMemberships::KNOCK)
4044 .await?
4045 .into_iter()
4046 .filter_map(|member| {
4047 let event_id = member.event().event_id()?;
4048 Some(KnockRequest::new(
4049 self,
4050 event_id,
4051 member.event().timestamp(),
4052 KnockRequestMemberInfo::from_member(&member),
4053 seen_request_ids.contains_key(event_id),
4054 ))
4055 })
4056 .collect())
4057 }
4058
4059 pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4061 RoomPrivacySettings::new(&self.inner, &self.client)
4062 }
4063
4064 pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4072 let request = opts.into_request(self.room_id());
4073
4074 let response = self.client.send(request).await?;
4075
4076 let push_ctx = self.push_context().await?;
4077 let chunk = join_all(
4078 response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4079 )
4080 .await;
4081
4082 Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4083 }
4084
4085 pub async fn relations(
4099 &self,
4100 event_id: OwnedEventId,
4101 opts: RelationsOptions,
4102 ) -> Result<Relations> {
4103 opts.send(self, event_id).await
4104 }
4105
/// Search this room through the client's search index.
///
/// The index lock is held for the duration of the search.
///
/// # Arguments
///
/// * `query` - The search query.
/// * `max_number_of_results` - Forwarded to the index.
/// * `pagination_offset` - Forwarded to the index.
///
/// # Returns
///
/// The event IDs returned by the index.
#[cfg(feature = "experimental-search")]
pub async fn search(
    &self,
    query: &str,
    max_number_of_results: usize,
    pagination_offset: Option<usize>,
) -> Result<Vec<OwnedEventId>, IndexError> {
    let room_id = self.room_id();
    let mut index = self.client.search_index().lock().await;

    index.search(query, max_number_of_results, pagination_offset, room_id)
}
4118
4119 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4141 pub async fn subscribe_thread(
4142 &self,
4143 thread_root: OwnedEventId,
4144 automatic: Option<OwnedEventId>,
4145 ) -> Result<()> {
4146 let is_automatic = automatic.is_some();
4147
4148 match self
4149 .client
4150 .send(subscribe_thread::unstable::Request::new(
4151 self.room_id().to_owned(),
4152 thread_root.clone(),
4153 automatic,
4154 ))
4155 .await
4156 {
4157 Ok(_response) => {
4158 trace!("Server acknowledged the thread subscription; saving in db");
4159
4160 self.client
4162 .state_store()
4163 .upsert_thread_subscription(
4164 self.room_id(),
4165 &thread_root,
4166 StoredThreadSubscription {
4167 status: ThreadSubscriptionStatus::Subscribed {
4168 automatic: is_automatic,
4169 },
4170 bump_stamp: None,
4171 },
4172 )
4173 .await?;
4174
4175 Ok(())
4176 }
4177
4178 Err(err) => {
4179 if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4180 trace!("Thread subscription skipped: {err}");
4185 Ok(())
4186 } else {
4187 Err(err.into())
4189 }
4190 }
4191 }
4192 }
4193
4194 pub async fn subscribe_thread_if_needed(
4200 &self,
4201 thread_root: &EventId,
4202 automatic: Option<OwnedEventId>,
4203 ) -> Result<()> {
4204 if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4205 if !prev_sub.automatic || automatic.is_some() {
4208 return Ok(());
4211 }
4212 }
4213 self.subscribe_thread(thread_root.to_owned(), automatic).await
4214 }
4215
4216 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4228 pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4229 self.client
4230 .send(unsubscribe_thread::unstable::Request::new(
4231 self.room_id().to_owned(),
4232 thread_root.clone(),
4233 ))
4234 .await?;
4235
4236 trace!("Server acknowledged the thread subscription removal; removed it from db too");
4237
4238 self.client
4240 .state_store()
4241 .upsert_thread_subscription(
4242 self.room_id(),
4243 &thread_root,
4244 StoredThreadSubscription {
4245 status: ThreadSubscriptionStatus::Unsubscribed,
4246 bump_stamp: None,
4247 },
4248 )
4249 .await?;
4250
4251 Ok(())
4252 }
4253
4254 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4271 pub async fn fetch_thread_subscription(
4272 &self,
4273 thread_root: OwnedEventId,
4274 ) -> Result<Option<ThreadSubscription>> {
4275 let result = self
4276 .client
4277 .send(get_thread_subscription::unstable::Request::new(
4278 self.room_id().to_owned(),
4279 thread_root.clone(),
4280 ))
4281 .await;
4282
4283 let subscription = match result {
4284 Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4285 Err(http_error) => match http_error.as_client_api_error() {
4286 Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4287 _ => return Err(http_error.into()),
4288 },
4289 };
4290
4291 if let Some(sub) = &subscription {
4293 self.client
4294 .state_store()
4295 .upsert_thread_subscription(
4296 self.room_id(),
4297 &thread_root,
4298 StoredThreadSubscription {
4299 status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4300 bump_stamp: None,
4301 },
4302 )
4303 .await?;
4304 } else {
4305 self.client
4307 .state_store()
4308 .remove_thread_subscription(self.room_id(), &thread_root)
4309 .await?;
4310 }
4311
4312 Ok(subscription)
4313 }
4314
4315 pub async fn load_or_fetch_thread_subscription(
4322 &self,
4323 thread_root: &EventId,
4324 ) -> Result<Option<ThreadSubscription>> {
4325 if self.client.thread_subscription_catchup().is_outdated() {
4327 return self.fetch_thread_subscription(thread_root.to_owned()).await;
4328 }
4329
4330 Ok(self
4332 .client
4333 .state_store()
4334 .load_thread_subscription(self.room_id(), thread_root)
4335 .await
4336 .map(|maybe_sub| {
4337 maybe_sub.and_then(|stored| match stored.status {
4338 ThreadSubscriptionStatus::Unsubscribed => None,
4339 ThreadSubscriptionStatus::Subscribed { automatic } => {
4340 Some(ThreadSubscription { automatic })
4341 }
4342 })
4343 })?)
4344 }
4345}
4346
#[cfg(feature = "e2e-encryption")]
impl RoomIdentityProvider for Room {
    /// Whether `user_id` is a member of this room.
    ///
    /// A failed lookup is reported as "not a member".
    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
        Box::pin(async { matches!(self.get_member(user_id).await, Ok(Some(_))) })
    }

    /// The identities of the joined and invited members of this room.
    ///
    /// Members without an identity are skipped, and a failure to load the
    /// members yields an empty list.
    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
        Box::pin(async {
            let members = self
                .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
                .await
                .unwrap_or_default();

            let mut identities: Vec<UserIdentity> = Vec::new();
            for member in members {
                // `extend` with an `Option` pushes zero or one element.
                identities.extend(self.user_identity(member.user_id()).await);
            }
            identities
        })
    }

    /// The identity of `user_id`, if there is one.
    ///
    /// A failed lookup is reported as "no identity".
    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
        Box::pin(async {
            self.client
                .encryption()
                .get_user_identity(user_id)
                .await
                .ok()
                .flatten()
                .map(|identity| identity.underlying_identity())
        })
    }
}
4381
/// A handle to a room made of a [`WeakClient`] and a room ID.
///
/// The room itself is looked up on demand through the client.
// NOTE(review): presumably this exists so that holding the handle does not
// keep the client alive; confirm against `WeakClient`.
#[derive(Clone, Debug)]
pub(crate) struct WeakRoom {
    /// The weak client used to look the room up.
    client: WeakClient,
    /// The ID of the room this handle refers to.
    room_id: OwnedRoomId,
}
4389
4390impl WeakRoom {
4391 pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4393 Self { client, room_id }
4394 }
4395
4396 pub fn get(&self) -> Option<Room> {
4398 self.client.get().and_then(|client| client.get_room(&self.room_id))
4399 }
4400
4401 pub fn room_id(&self) -> &RoomId {
4403 &self.room_id
4404 }
4405}
4406
/// Details of an invite to a room, as returned by `Room::invite_details`.
#[derive(Debug, Clone)]
pub struct Invite {
    /// The room member that was invited (our own user).
    pub invitee: RoomMember,
    /// The room member that sent the invite; `None` when that member is
    /// not available locally.
    pub inviter: Option<RoomMember>,
}
4415
/// Errors specific to reading the details of an invite.
#[derive(Error, Debug)]
enum InvitationError {
    /// The membership event of the invited user could not be found.
    #[error("No membership event found")]
    EventMissing,
}
4421
/// A set of receipts, each one optional.
///
/// Use [`Receipts::new`] and the builder-style setters to fill it in.
// NOTE(review): presumably consumed by a "send multiple receipts" call
// elsewhere in this file; confirm against the caller.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct Receipts {
    /// The event for the fully-read marker, if any.
    pub fully_read: Option<OwnedEventId>,
    /// The event for the public read receipt, if any.
    pub public_read_receipt: Option<OwnedEventId>,
    /// The event for the private read receipt, if any.
    pub private_read_receipt: Option<OwnedEventId>,
}
4433
4434impl Receipts {
4435 pub fn new() -> Self {
4437 Self::default()
4438 }
4439
4440 pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4449 self.fully_read = event_id.into();
4450 self
4451 }
4452
4453 pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4459 self.public_read_receipt = event_id.into();
4460 self
4461 }
4462
4463 pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4467 self.private_read_receipt = event_id.into();
4468 self
4469 }
4470
4471 pub fn is_empty(&self) -> bool {
4473 self.fully_read.is_none()
4474 && self.public_read_receipt.is_none()
4475 && self.private_read_receipt.is_none()
4476 }
4477}
4478
/// A parent space of a room, classified by how the parent/child
/// relationship could be verified.
// NOTE(review): the per-variant descriptions below are inferred from the
// variant names; confirm against the code that constructs these variants.
#[derive(Debug)]
pub enum ParentSpace {
    /// Presumably: the room names this room as its parent and the parent
    /// names the room as its child.
    Reciprocal(Room),
    /// Presumably: the parent does not name the room as its child, but the
    /// relationship is backed by a sufficient power level in the parent.
    WithPowerlevel(Room),
    /// Presumably: the room names this room as its parent, but nothing on
    /// the parent's side backs the claim.
    Illegitimate(Room),
    /// Presumably: the parent could not be checked; only its room ID is
    /// carried.
    Unverifiable(OwnedRoomId),
}
4498
/// The score attached to a content report.
///
/// Wraps an `i8`. The constructors and `TryFrom` conversions only accept
/// values between [`ReportedContentScore::MIN`] and
/// [`ReportedContentScore::MAX`], inclusive.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReportedContentScore(i8);
4504
4505impl ReportedContentScore {
4506 pub const MIN: Self = Self(-100);
4510
4511 pub const MAX: Self = Self(0);
4515
4516 pub fn new(value: i8) -> Option<Self> {
4525 value.try_into().ok()
4526 }
4527
4528 pub fn new_saturating(value: i8) -> Self {
4534 if value > Self::MAX {
4535 Self::MAX
4536 } else if value < Self::MIN {
4537 Self::MIN
4538 } else {
4539 Self(value)
4540 }
4541 }
4542
4543 pub fn value(&self) -> i8 {
4545 self.0
4546 }
4547}
4548
4549impl PartialEq<i8> for ReportedContentScore {
4550 fn eq(&self, other: &i8) -> bool {
4551 self.0.eq(other)
4552 }
4553}
4554
4555impl PartialEq<ReportedContentScore> for i8 {
4556 fn eq(&self, other: &ReportedContentScore) -> bool {
4557 self.eq(&other.0)
4558 }
4559}
4560
4561impl PartialOrd<i8> for ReportedContentScore {
4562 fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
4563 self.0.partial_cmp(other)
4564 }
4565}
4566
4567impl PartialOrd<ReportedContentScore> for i8 {
4568 fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
4569 self.partial_cmp(&other.0)
4570 }
4571}
4572
4573impl From<ReportedContentScore> for Int {
4574 fn from(value: ReportedContentScore) -> Self {
4575 value.0.into()
4576 }
4577}
4578
4579impl TryFrom<i8> for ReportedContentScore {
4580 type Error = TryFromReportedContentScoreError;
4581
4582 fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
4583 if value > Self::MAX || value < Self::MIN {
4584 Err(TryFromReportedContentScoreError(()))
4585 } else {
4586 Ok(Self(value))
4587 }
4588 }
4589}
4590
4591impl TryFrom<i16> for ReportedContentScore {
4592 type Error = TryFromReportedContentScoreError;
4593
4594 fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
4595 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4596 value.try_into()
4597 }
4598}
4599
4600impl TryFrom<i32> for ReportedContentScore {
4601 type Error = TryFromReportedContentScoreError;
4602
4603 fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
4604 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4605 value.try_into()
4606 }
4607}
4608
4609impl TryFrom<i64> for ReportedContentScore {
4610 type Error = TryFromReportedContentScoreError;
4611
4612 fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
4613 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4614 value.try_into()
4615 }
4616}
4617
4618impl TryFrom<Int> for ReportedContentScore {
4619 type Error = TryFromReportedContentScoreError;
4620
4621 fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
4622 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4623 value.try_into()
4624 }
4625}
4626
/// Something events can be obtained from, given their ID.
trait EventSource {
    /// Get the event with the given ID.
    ///
    /// The returned future must satisfy `SendOutsideWasm`.
    fn get_event(
        &self,
        event_id: &EventId,
    ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
}
4633
4634impl EventSource for &Room {
4635 async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4636 self.load_or_fetch_event(event_id, None).await
4637 }
4638}
4639
/// The error returned when a value converted into a
/// `ReportedContentScore` is out of range.
///
/// The private unit field prevents construction outside of this module.
#[derive(Debug, Clone, Error)]
#[error("out of range conversion attempted")]
pub struct TryFromReportedContentScoreError(());
4645
/// A room member together with the member that sent its membership event,
/// as returned by `Room::member_with_sender_info`.
#[derive(Debug)]
pub struct RoomMemberWithSenderInfo {
    /// The room member that was requested.
    pub room_member: RoomMember,
    /// The member that sent `room_member`'s membership event; `None` when
    /// it could not be found.
    pub sender_info: Option<RoomMember>,
}
4656
4657#[cfg(all(test, not(target_family = "wasm")))]
4658mod tests {
4659 use std::collections::BTreeMap;
4660
4661 use matrix_sdk_base::{ComposerDraft, DraftAttachment, store::ComposerDraftType};
4662 use matrix_sdk_test::{
4663 JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
4664 event_factory::EventFactory, test_json,
4665 };
4666 use ruma::{
4667 RoomVersionId, event_id,
4668 events::{relation::RelationType, room::member::MembershipState},
4669 int, owned_event_id, room_id, user_id,
4670 };
4671 use wiremock::{
4672 Mock, MockServer, ResponseTemplate,
4673 matchers::{header, method, path_regex},
4674 };
4675
4676 use super::ReportedContentScore;
4677 use crate::{
4678 Client,
4679 config::RequestConfig,
4680 room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4681 test_utils::{
4682 client::mock_matrix_session,
4683 logged_in_client,
4684 mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4685 },
4686 };
4687
/// Checks that encrypting a room event still works after a second client,
/// sharing the same sqlite store, has taken the cross-process store lock
/// in between.
#[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
#[async_test]
async fn test_cache_invalidation_while_encrypt() {
    use matrix_sdk_base::store::RoomLoadSettings;
    use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};

    // Both clients below share this database file.
    let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
    let session = mock_matrix_session();

    let client = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&sqlite_path, None)
        .build()
        .await
        .unwrap();
    client
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

    // Feed a sync response so that the client knows an encrypted room.
    let server = MockServer::start().await;
    {
        Mock::given(method("GET"))
            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
            .and(header("authorization", "Bearer 1234"))
            .respond_with(
                ResponseTemplate::new(200)
                    .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
            )
            .mount(&server)
            .await;
        let response = SyncResponseBuilder::default()
            .add_joined_room(
                JoinedRoomBuilder::default()
                    .add_state_event(StateTestEvent::Member)
                    .add_state_event(StateTestEvent::PowerLevels)
                    .add_state_event(StateTestEvent::Encryption),
            )
            .build_sync_response();
        client.base_client().receive_sync_response(response).await.unwrap();
    }

    let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");

    // Step 1: preshare the room key with the first client.
    room.preshare_room_key().await.unwrap();

    // Step 2: a second client on the same store takes the lock, then
    // releases it when the scope ends.
    {
        let client = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&sqlite_path, None)
            .build()
            .await
            .unwrap();
        client
            .matrix_auth()
            .restore_session(session.clone(), RoomLoadSettings::default())
            .await
            .unwrap();
        client
            .encryption()
            .enable_cross_process_store_lock("client2".to_owned())
            .await
            .unwrap();

        let guard = client.encryption().spin_lock_store(None).await.unwrap();
        assert!(guard.is_some());
    }

    // Step 3: the first client takes the lock back.
    let guard = client.encryption().spin_lock_store(None).await.unwrap();
    assert!(guard.is_some());

    let olm = client.olm_machine().await;
    let olm = olm.as_ref().expect("Olm machine wasn't started");

    // Step 4: encrypting with the preshared room key must still succeed.
    let _encrypted_content = olm
        .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
        .await
        .unwrap();
}
4780
/// Exercises every way of building a `ReportedContentScore`, with in-range
/// and out-of-range values.
#[test]
fn reported_content_score() {
    // i8, fallible constructor.
    for valid in [0i8, -50, -100] {
        let score = ReportedContentScore::new(valid).unwrap();
        assert_eq!(score.value(), valid);
    }
    for invalid in [10i8, -110] {
        assert_eq!(ReportedContentScore::new(invalid), None);
    }

    // i8, saturating constructor.
    for in_range in [0i8, -50, -100] {
        let score = ReportedContentScore::new_saturating(in_range);
        assert_eq!(score.value(), in_range);
    }
    let score = ReportedContentScore::new_saturating(10);
    assert_eq!(score, ReportedContentScore::MAX);
    let score = ReportedContentScore::new_saturating(-110);
    assert_eq!(score, ReportedContentScore::MIN);

    // i16
    for (input, expected) in [(0i16, 0i8), (-100, -100)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for invalid in [10i16, -110] {
        ReportedContentScore::try_from(invalid).unwrap_err();
    }

    // i32
    for (input, expected) in [(0i32, 0i8), (-100, -100)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for invalid in [10i32, -110] {
        ReportedContentScore::try_from(invalid).unwrap_err();
    }

    // i64
    for (input, expected) in [(0i64, 0i8), (-100, -100)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for invalid in [10i64, -110] {
        ReportedContentScore::try_from(invalid).unwrap_err();
    }

    // Int
    for (input, expected) in [(int!(0), 0i8), (int!(-100), -100)] {
        let score = ReportedContentScore::try_from(input).unwrap();
        assert_eq!(score.value(), expected);
    }
    for invalid in [int!(10), int!(-110)] {
        ReportedContentScore::try_from(invalid).unwrap_err();
    }
}
4836
4837 #[async_test]
4838 async fn test_composer_draft() {
4839 use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4840
4841 let client = logged_in_client(None).await;
4842
4843 let response = SyncResponseBuilder::default()
4844 .add_joined_room(JoinedRoomBuilder::default())
4845 .build_sync_response();
4846 client.base_client().receive_sync_response(response).await.unwrap();
4847 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4848
4849 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4850
4851 let draft = ComposerDraft {
4854 plain_text: "Hello, world!".to_owned(),
4855 html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4856 draft_type: ComposerDraftType::NewMessage,
4857 attachments: vec![DraftAttachment {
4858 filename: "cat.txt".to_owned(),
4859 content: matrix_sdk_base::DraftAttachmentContent::File {
4860 data: b"meow".to_vec(),
4861 mimetype: Some("text/plain".to_owned()),
4862 size: Some(5),
4863 },
4864 }],
4865 };
4866
4867 room.save_composer_draft(draft.clone(), None).await.unwrap();
4868
4869 let thread_root = owned_event_id!("$thread_root:b.c");
4870 let thread_draft = ComposerDraft {
4871 plain_text: "Hello, thread!".to_owned(),
4872 html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4873 draft_type: ComposerDraftType::NewMessage,
4874 attachments: vec![DraftAttachment {
4875 filename: "dog.txt".to_owned(),
4876 content: matrix_sdk_base::DraftAttachmentContent::File {
4877 data: b"wuv".to_vec(),
4878 mimetype: Some("text/plain".to_owned()),
4879 size: Some(4),
4880 },
4881 }],
4882 };
4883
4884 room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4885
4886 assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4888
4889 assert_eq!(
4891 room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4892 Some(thread_draft.clone())
4893 );
4894
4895 room.clear_composer_draft(None).await.unwrap();
4897 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4898
4899 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4901
4902 room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4904 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4905 }
4906
4907 #[async_test]
4908 async fn test_mark_join_requests_as_seen() {
4909 let server = MatrixMockServer::new().await;
4910 let client = server.client_builder().build().await;
4911 let event_id = event_id!("$a:b.c");
4912 let room_id = room_id!("!a:b.c");
4913 let user_id = user_id!("@alice:b.c");
4914
4915 let f = EventFactory::new().room(room_id);
4916 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4917 f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into(),
4918 ]);
4919 let room = server.sync_room(&client, joined_room_builder).await;
4920
4921 let seen_ids =
4923 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4924 assert!(seen_ids.is_empty());
4925
4926 room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4928 .await
4929 .expect("Couldn't mark join request as seen");
4930
4931 let seen_ids =
4933 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4934 assert_eq!(seen_ids.len(), 1);
4935 assert_eq!(
4936 seen_ids.into_iter().next().expect("No next value"),
4937 (event_id.to_owned(), user_id.to_owned())
4938 )
4939 }
4940
4941 #[async_test]
4942 async fn test_own_room_membership_with_no_own_member_event() {
4943 let server = MatrixMockServer::new().await;
4944 let client = server.client_builder().build().await;
4945 let room_id = room_id!("!a:b.c");
4946
4947 let room = server.sync_joined_room(&client, room_id).await;
4948
4949 let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
4952 assert!(error.is_some());
4953 }
4954
4955 #[async_test]
4956 async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
4957 let server = MatrixMockServer::new().await;
4958 let client = server.client_builder().build().await;
4959 let room_id = room_id!("!a:b.c");
4960 let user_id = user_id!("@example:localhost");
4961
4962 let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
4963 let joined_room_builder =
4964 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
4965 let room = server.sync_room(&client, joined_room_builder).await;
4966
4967 let ret = room
4969 .member_with_sender_info(client.user_id().unwrap())
4970 .await
4971 .expect("Room member info should be available");
4972
4973 assert_eq!(ret.room_member.event().user_id(), user_id);
4975
4976 assert!(ret.sender_info.is_none());
4978 }
4979
4980 #[async_test]
4981 async fn test_own_room_membership_with_own_member_event_and_own_sender() {
4982 let server = MatrixMockServer::new().await;
4983 let client = server.client_builder().build().await;
4984 let room_id = room_id!("!a:b.c");
4985 let user_id = user_id!("@example:localhost");
4986
4987 let f = EventFactory::new().room(room_id).sender(user_id);
4988 let joined_room_builder =
4989 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
4990 let room = server.sync_room(&client, joined_room_builder).await;
4991
4992 let ret = room
4994 .member_with_sender_info(client.user_id().unwrap())
4995 .await
4996 .expect("Room member info should be available");
4997
4998 assert_eq!(ret.room_member.event().user_id(), user_id);
5000
5001 assert!(ret.sender_info.is_some());
5003 assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
5004 }
5005
5006 #[async_test]
5007 async fn test_own_room_membership_with_own_member_event_and_known_sender() {
5008 let server = MatrixMockServer::new().await;
5009 let client = server.client_builder().build().await;
5010 let room_id = room_id!("!a:b.c");
5011 let user_id = user_id!("@example:localhost");
5012 let sender_id = user_id!("@alice:b.c");
5013
5014 let f = EventFactory::new().room(room_id).sender(sender_id);
5015 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5016 f.member(user_id).into(),
5017 f.member(sender_id).into(),
5019 ]);
5020 let room = server.sync_room(&client, joined_room_builder).await;
5021
5022 let ret = room
5024 .member_with_sender_info(client.user_id().unwrap())
5025 .await
5026 .expect("Room member info should be available");
5027
5028 assert_eq!(ret.room_member.event().user_id(), user_id);
5030
5031 assert!(ret.sender_info.is_some());
5033 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5034 }
5035
5036 #[async_test]
5037 async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5038 let server = MatrixMockServer::new().await;
5039 let client = server.client_builder().build().await;
5040 let room_id = room_id!("!a:b.c");
5041 let user_id = user_id!("@example:localhost");
5042 let sender_id = user_id!("@alice:b.c");
5043
5044 let f = EventFactory::new().room(room_id).sender(sender_id);
5045 let joined_room_builder =
5046 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5047 let room = server.sync_room(&client, joined_room_builder).await;
5048
5049 server
5051 .mock_get_members()
5052 .ok(vec![f.member(sender_id).into_raw()])
5053 .mock_once()
5054 .mount()
5055 .await;
5056
5057 let ret = room
5059 .member_with_sender_info(client.user_id().unwrap())
5060 .await
5061 .expect("Room member info should be available");
5062
5063 assert_eq!(ret.room_member.event().user_id(), user_id);
5065
5066 assert!(ret.sender_info.is_some());
5068 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5069 }
5070
5071 #[async_test]
5072 async fn test_list_threads() {
5073 let server = MatrixMockServer::new().await;
5074 let client = server.client_builder().build().await;
5075
5076 let room_id = room_id!("!a:b.c");
5077 let sender_id = user_id!("@alice:b.c");
5078 let f = EventFactory::new().room(room_id).sender(sender_id);
5079
5080 let eid1 = event_id!("$1");
5081 let eid2 = event_id!("$2");
5082 let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5083 let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5084
5085 server
5086 .mock_room_threads()
5087 .ok(batch1.clone(), Some("prev_batch".to_owned()))
5088 .mock_once()
5089 .mount()
5090 .await;
5091 server
5092 .mock_room_threads()
5093 .match_from("prev_batch")
5094 .ok(batch2, None)
5095 .mock_once()
5096 .mount()
5097 .await;
5098
5099 let room = server.sync_joined_room(&client, room_id).await;
5100 let result =
5101 room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5102 assert_eq!(result.chunk.len(), 1);
5103 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5104 assert!(result.prev_batch_token.is_some());
5105
5106 let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5107 let result = room.list_threads(opts).await.expect("Failed to list threads");
5108 assert_eq!(result.chunk.len(), 1);
5109 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5110 assert!(result.prev_batch_token.is_none());
5111 }
5112
5113 #[async_test]
5114 async fn test_relations() {
5115 let server = MatrixMockServer::new().await;
5116 let client = server.client_builder().build().await;
5117
5118 let room_id = room_id!("!a:b.c");
5119 let sender_id = user_id!("@alice:b.c");
5120 let f = EventFactory::new().room(room_id).sender(sender_id);
5121
5122 let target_event_id = owned_event_id!("$target");
5123 let eid1 = event_id!("$1");
5124 let eid2 = event_id!("$2");
5125 let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5126 let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5127
5128 server
5129 .mock_room_relations()
5130 .match_target_event(target_event_id.clone())
5131 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5132 .mock_once()
5133 .mount()
5134 .await;
5135
5136 server
5137 .mock_room_relations()
5138 .match_target_event(target_event_id.clone())
5139 .match_from("next_batch")
5140 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5141 .mock_once()
5142 .mount()
5143 .await;
5144
5145 let room = server.sync_joined_room(&client, room_id).await;
5146
5147 let mut opts = RelationsOptions {
5149 include_relations: IncludeRelations::AllRelations,
5150 ..Default::default()
5151 };
5152 let result = room
5153 .relations(target_event_id.clone(), opts.clone())
5154 .await
5155 .expect("Failed to list relations the first time");
5156 assert_eq!(result.chunk.len(), 1);
5157 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5158 assert!(result.prev_batch_token.is_none());
5159 assert!(result.next_batch_token.is_some());
5160 assert!(result.recursion_depth.is_none());
5161
5162 opts.from = result.next_batch_token;
5163 let result = room
5164 .relations(target_event_id, opts)
5165 .await
5166 .expect("Failed to list relations the second time");
5167 assert_eq!(result.chunk.len(), 1);
5168 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5169 assert!(result.prev_batch_token.is_none());
5170 assert!(result.next_batch_token.is_none());
5171 assert!(result.recursion_depth.is_none());
5172 }
5173
5174 #[async_test]
5175 async fn test_relations_with_reltype() {
5176 let server = MatrixMockServer::new().await;
5177 let client = server.client_builder().build().await;
5178
5179 let room_id = room_id!("!a:b.c");
5180 let sender_id = user_id!("@alice:b.c");
5181 let f = EventFactory::new().room(room_id).sender(sender_id);
5182
5183 let target_event_id = owned_event_id!("$target");
5184 let eid1 = event_id!("$1");
5185 let eid2 = event_id!("$2");
5186 let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5187 let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5188
5189 server
5190 .mock_room_relations()
5191 .match_target_event(target_event_id.clone())
5192 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5193 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5194 .mock_once()
5195 .mount()
5196 .await;
5197
5198 server
5199 .mock_room_relations()
5200 .match_target_event(target_event_id.clone())
5201 .match_from("next_batch")
5202 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5203 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5204 .mock_once()
5205 .mount()
5206 .await;
5207
5208 let room = server.sync_joined_room(&client, room_id).await;
5209
5210 let mut opts = RelationsOptions {
5212 include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5213 ..Default::default()
5214 };
5215 let result = room
5216 .relations(target_event_id.clone(), opts.clone())
5217 .await
5218 .expect("Failed to list relations the first time");
5219 assert_eq!(result.chunk.len(), 1);
5220 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5221 assert!(result.prev_batch_token.is_none());
5222 assert!(result.next_batch_token.is_some());
5223 assert!(result.recursion_depth.is_none());
5224
5225 opts.from = result.next_batch_token;
5226 let result = room
5227 .relations(target_event_id, opts)
5228 .await
5229 .expect("Failed to list relations the second time");
5230 assert_eq!(result.chunk.len(), 1);
5231 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5232 assert!(result.prev_batch_token.is_none());
5233 assert!(result.next_batch_token.is_none());
5234 assert!(result.recursion_depth.is_none());
5235 }
5236
5237 #[async_test]
5238 async fn test_power_levels_computation() {
5239 let server = MatrixMockServer::new().await;
5240 let client = server.client_builder().build().await;
5241
5242 let room_id = room_id!("!a:b.c");
5243 let sender_id = client.user_id().expect("No session id");
5244 let f = EventFactory::new().room(room_id).sender(sender_id);
5245 let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5246
5247 let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into();
5249 let power_levels_event = f.power_levels(&mut user_map).state_key("").into();
5250 let room_member_event = f.member(sender_id).into();
5251
5252 let room = server
5254 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event]))
5255 .await;
5256 let ctx = room
5257 .push_condition_room_ctx()
5258 .await
5259 .expect("Failed to get push condition context")
5260 .expect("Could not get push condition context");
5261
5262 assert!(ctx.power_levels.is_none());
5264
5265 let room = server
5267 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event]))
5268 .await;
5269 let ctx = room
5270 .push_condition_room_ctx()
5271 .await
5272 .expect("Failed to get push condition context")
5273 .expect("Could not get push condition context");
5274
5275 assert!(ctx.power_levels.is_none());
5277
5278 let room = server
5280 .sync_room(
5281 &client,
5282 JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5283 )
5284 .await;
5285 let ctx = room
5286 .push_condition_room_ctx()
5287 .await
5288 .expect("Failed to get push condition context")
5289 .expect("Could not get push condition context");
5290
5291 assert!(ctx.power_levels.is_some());
5293 }
5294}