1use std::{
18 borrow::Borrow,
19 collections::{BTreeMap, HashMap},
20 future::Future,
21 ops::Deref,
22 sync::Arc,
23 time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30 future::join_all, stream as futures_stream, stream::FuturesUnordered, StreamExt,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{IdentityStatusChange, RoomIdentityProvider, UserIdentity};
39pub use matrix_sdk_base::store::ThreadSubscription;
40#[cfg(feature = "e2e-encryption")]
41use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
42use matrix_sdk_base::{
43 deserialized_responses::{
44 RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
45 },
46 event_cache::store::media::IgnoreMediaRetentionPolicy,
47 media::MediaThumbnailSettings,
48 store::StateStoreExt,
49 ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
50 StateChanges, StateStoreDataKey, StateStoreDataValue,
51};
52#[cfg(feature = "e2e-encryption")]
53use matrix_sdk_common::BoxFuture;
54use matrix_sdk_common::{
55 deserialized_responses::TimelineEvent,
56 executor::{spawn, JoinHandle},
57 timeout::timeout,
58};
59#[cfg(feature = "experimental-search")]
60#[cfg(doc)]
61use matrix_sdk_search::index::RoomIndex;
62use mime::Mime;
63use reply::Reply;
64#[cfg(feature = "unstable-msc4274")]
65use ruma::events::room::message::GalleryItemType;
66#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
67use ruma::events::AnySyncMessageLikeEvent;
68#[cfg(feature = "experimental-encrypted-state-events")]
69use ruma::events::AnySyncStateEvent;
70#[cfg(feature = "e2e-encryption")]
71use ruma::events::{
72 room::encrypted::OriginalSyncRoomEncryptedEvent, AnySyncTimelineEvent, SyncMessageLikeEvent,
73};
74use ruma::{
75 api::client::{
76 config::{set_global_account_data, set_room_account_data},
77 context,
78 error::ErrorKind,
79 filter::LazyLoadOptions,
80 membership::{
81 ban_user, forget_room, get_member_events,
82 invite_user::{self, v3::InvitationRecipient},
83 kick_user, leave_room, unban_user, Invite3pid,
84 },
85 message::send_message_event,
86 read_marker::set_read_marker,
87 receipt::create_receipt,
88 redact::redact_event,
89 room::{get_room_event, report_content, report_room},
90 state::{get_state_event_for_key, send_state_event},
91 tag::{create_tag, delete_tag},
92 threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
93 typing::create_typing_event::{self, v3::Typing},
94 },
95 assign,
96 events::{
97 beacon::BeaconEventContent,
98 beacon_info::BeaconInfoEventContent,
99 direct::DirectEventContent,
100 marked_unread::MarkedUnreadEventContent,
101 receipt::{Receipt, ReceiptThread, ReceiptType},
102 room::{
103 avatar::{self, RoomAvatarEventContent},
104 encryption::RoomEncryptionEventContent,
105 history_visibility::HistoryVisibility,
106 member::{MembershipChange, SyncRoomMemberEvent},
107 message::{
108 AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
109 FormattedBody, ImageMessageEventContent, MessageType, RoomMessageEventContent,
110 UnstableAudioDetailsContentBlock, UnstableVoiceContentBlock, VideoInfo,
111 VideoMessageEventContent,
112 },
113 name::RoomNameEventContent,
114 pinned_events::RoomPinnedEventsEventContent,
115 power_levels::{
116 RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
117 },
118 server_acl::RoomServerAclEventContent,
119 topic::RoomTopicEventContent,
120 ImageInfo, MediaSource, ThumbnailInfo,
121 },
122 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
123 tag::{TagInfo, TagName},
124 typing::SyncTypingEvent,
125 AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
126 Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
127 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
128 RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
129 StaticStateEventContent, SyncStateEvent,
130 },
131 int,
132 push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
133 serde::Raw,
134 time::Instant,
135 EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
136 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
137};
138#[cfg(feature = "experimental-encrypted-state-events")]
139use ruma::{
140 events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
141 serde::JsonCastable,
142};
143use serde::de::DeserializeOwned;
144use thiserror::Error;
145use tokio::{join, sync::broadcast};
146use tracing::{debug, error, info, instrument, trace, warn};
147
148use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
149pub use self::{
150 member::{RoomMember, RoomMemberRole},
151 messages::{
152 EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
153 Relations, RelationsOptions, ThreadRoots,
154 },
155};
156#[cfg(doc)]
157use crate::event_cache::EventCache;
158#[cfg(feature = "experimental-encrypted-state-events")]
159use crate::room::futures::{SendRawStateEvent, SendStateEvent};
160use crate::{
161 attachment::{AttachmentConfig, AttachmentInfo},
162 client::WeakClient,
163 config::RequestConfig,
164 error::{BeaconError, WrongRoomState},
165 event_cache::{self, EventCacheDropHandles, RoomEventCache},
166 event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
167 live_location_share::ObservableLiveLocation,
168 media::{MediaFormat, MediaRequestParameters},
169 notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
170 room::{
171 knock_requests::{KnockRequest, KnockRequestMemberInfo},
172 power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
173 privacy_settings::RoomPrivacySettings,
174 },
175 sync::RoomUpdate,
176 utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
177 BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
178};
179#[cfg(feature = "e2e-encryption")]
180use crate::{crypto::types::events::CryptoContextInfo, encryption::backups::BackupState};
181
182pub mod edit;
183pub mod futures;
184pub mod identity_status_changes;
185pub mod knock_requests;
187mod member;
188mod messages;
189pub mod power_levels;
190pub mod reply;
191
192pub mod privacy_settings;
194
195#[cfg(feature = "e2e-encryption")]
196pub(crate) mod shared_room_history;
197
/// A handle on a room, pairing the room data held by the base layer with the
/// [`Client`] used to perform requests for it.
///
/// `Room` dereferences to [`BaseRoom`], so the base room's methods are
/// callable directly on this type.
#[derive(Debug, Clone)]
pub struct Room {
    /// The underlying base room, also reachable through `Deref`.
    inner: BaseRoom,
    /// The client used by this handle to send requests and reach the stores.
    pub(crate) client: Client,
}
205
// Lets every `BaseRoom` method be called directly on a `Room`.
impl Deref for Room {
    type Target = BaseRoom;

    /// Borrow the wrapped [`BaseRoom`].
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
213
// NOTE(review): neither constant is used in the part of the file reviewed
// here; going by their names they drive the typing-notice logic — confirm at
// the use sites.
/// Four-second duration named for typing notices.
const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
/// Three-second duration named for re-sending typing notices.
const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
216
/// The data needed to evaluate push rules against events: a room-level
/// condition context together with a push ruleset.
#[derive(Debug)]
pub struct PushContext {
    /// The room context the push conditions are evaluated against.
    push_condition_room_ctx: PushConditionRoomCtx,

    /// The ruleset whose rules are matched against events.
    push_rules: Ruleset,
}
227
228impl PushContext {
229 pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
231 Self { push_condition_room_ctx, push_rules }
232 }
233
234 pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
236 self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
237 }
238
239 #[doc(hidden)]
242 #[instrument(skip_all)]
243 pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
244 let rules = self
245 .push_rules
246 .iter()
247 .filter_map(|r| {
248 if !r.enabled() {
249 return None;
250 }
251
252 let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
253
254 let conditions = match r {
255 AnyPushRuleRef::Override(r) => {
256 format!("{:?}", r.conditions)
257 }
258 AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
259 AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
260 AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
261 AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
262 _ => "<unknown push rule kind>".to_owned(),
263 };
264
265 Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
266 })
267 .collect::<Vec<_>>()
268 .join("\n");
269 trace!("rules:\n\n{rules}\n\n");
270
271 let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
272
273 if let Some(found) = found {
274 trace!("rule {} matched", found.rule_id());
275 found.actions().to_owned()
276 } else {
277 trace!("no match");
278 Vec::new()
279 }
280 }
281}
282
/// Build the media variant of `$t` (`Image`, `Audio`, `Video` or `File`) that
/// matches the top-level type of `$content_type`.
///
/// All arguments after the type are identifiers of local variables:
/// the MIME type, the file name, the media source, the optional caption, the
/// optional formatted caption, the optional attachment info and the optional
/// thumbnail (a pair that is split into source and info).
macro_rules! make_media_type {
    ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $formatted_caption: ident, $info: ident, $thumbnail: ident) => {{
        // With a caption, the caption becomes the body and the file name is
        // sent separately; without one, the file name is the body and no
        // separate file name is set.
        let (body, filename) = if let Some(caption) = $caption {
            (caption, Some($filename))
        } else {
            ($filename, None)
        };

        let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();

        match $content_type.type_() {
            mime::IMAGE => {
                let image_info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let image = assign!(ImageMessageEventContent::new(body, $source), {
                    info: Some(Box::new(image_info)),
                    formatted: $formatted_caption,
                    filename
                });
                <$t>::Image(image)
            }

            mime::AUDIO => {
                let mut audio = assign!(AudioMessageEventContent::new(body, $source), {
                    formatted: $formatted_caption,
                    filename
                });

                // Voice attachments carrying a waveform are flagged as voice
                // messages; the audio details block additionally needs a
                // duration.
                if let Some(AttachmentInfo::Voice { audio_info, waveform: Some(waveform_vec) }) =
                    &$info
                {
                    if let Some(duration) = audio_info.duration {
                        let waveform = waveform_vec.iter().map(|v| (*v).into()).collect();
                        audio.audio =
                            Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
                    }
                    audio.voice = Some(UnstableVoiceContentBlock::new());
                }

                let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
                audio_info.mimetype = Some($content_type.as_ref().to_owned());

                <$t>::Audio(audio.info(Box::new(audio_info)))
            }

            mime::VIDEO => {
                let video_info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let video = assign!(VideoMessageEventContent::new(body, $source), {
                    info: Some(Box::new(video_info)),
                    formatted: $formatted_caption,
                    filename
                });
                <$t>::Video(video)
            }

            // Any other MIME type is sent as a generic file.
            _ => {
                let file_info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let file = assign!(FileMessageEventContent::new(body, $source), {
                    info: Some(Box::new(file_info)),
                    formatted: $formatted_caption,
                    filename,
                });
                <$t>::File(file)
            }
        }
    }};
}
364
365impl Room {
366 pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
373 Self { inner: room, client }
374 }
375
376 #[doc(alias = "reject_invitation")]
382 #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
383 async fn leave_impl(&self) -> (Result<()>, &Room) {
384 let state = self.state();
385 if state == RoomState::Left {
386 return (
387 Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
388 "Joined or Invited",
389 state,
390 )))),
391 self,
392 );
393 }
394
395 let should_forget = matches!(self.state(), RoomState::Invited);
398
399 let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
400 let response = self.client.send(request).await;
401
402 if let Err(error) = response {
405 #[allow(clippy::collapsible_match)]
406 let ignore_error = if let Some(error) = error.client_api_error_kind() {
407 match error {
408 ErrorKind::Forbidden { .. } => true,
411 _ => false,
412 }
413 } else {
414 false
415 };
416
417 error!(?error, ignore_error, should_forget, "Failed to leave the room");
418
419 if !ignore_error {
420 return (Err(error.into()), self);
421 }
422 }
423
424 if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
425 return (Err(e.into()), self);
426 }
427
428 if should_forget {
429 trace!("Trying to forget the room");
430
431 if let Err(error) = self.forget().await {
432 error!(?error, "Failed to forget the room");
433 }
434 }
435
436 (Ok(()), self)
437 }
438
439 pub async fn leave(&self) -> Result<()> {
447 let mut rooms: Vec<Room> = vec![self.clone()];
448 let mut current_room = self;
449
450 while let Some(predecessor) = current_room.predecessor_room() {
451 let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
452
453 if let Some(predecessor_room) = maybe_predecessor_room {
454 rooms.push(predecessor_room.clone());
455 current_room = rooms.last().expect("Room just pushed so can't be empty");
456 } else {
457 warn!("Cannot find predecessor room");
458 break;
459 }
460 }
461
462 let batch_size = 5;
463
464 let rooms_futures: Vec<_> = rooms
465 .iter()
466 .filter_map(|room| match room.state() {
467 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
468 Some(room.leave_impl())
469 }
470 RoomState::Banned | RoomState::Left => None,
471 })
472 .collect();
473
474 let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
475
476 let mut maybe_this_room_failed_with: Option<Error> = None;
477
478 while let Some(result) = futures_stream.next().await {
479 if let (Err(e), room) = result {
480 if room.room_id() == self.room_id() {
481 maybe_this_room_failed_with = Some(e);
482 } else {
483 warn!("Failure while attempting to leave predecessor room: {e:?}");
484 }
485 }
486 }
487
488 maybe_this_room_failed_with.map_or(Ok(()), Err)
489 }
490
491 #[doc(alias = "accept_invitation")]
495 pub async fn join(&self) -> Result<()> {
496 let prev_room_state = self.inner.state();
497
498 if prev_room_state == RoomState::Joined {
499 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
500 "Invited or Left",
501 prev_room_state,
502 ))));
503 }
504
505 self.client.join_room_by_id(self.room_id()).await?;
506
507 Ok(())
508 }
509
510 pub fn client(&self) -> Client {
514 self.client.clone()
515 }
516
517 pub fn is_synced(&self) -> bool {
520 self.inner.is_state_fully_synced()
521 }
522
523 pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
553 let Some(url) = self.avatar_url() else { return Ok(None) };
554 let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
555 Ok(Some(self.client.media().get_media_content(&request, true).await?))
556 }
557
558 #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
587 pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
588 let room_id = self.inner.room_id();
589 let request = options.into_request(room_id);
590 let http_response = self.client.send(request).await?;
591
592 let push_ctx = self.push_context().await?;
593 let chunk = join_all(
594 http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
595 )
596 .await;
597
598 Ok(Messages {
599 start: http_response.start,
600 end: http_response.end,
601 chunk,
602 state: http_response.state,
603 })
604 }
605
606 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
616 where
617 Ev: SyncEvent + DeserializeOwned + Send + 'static,
618 H: EventHandler<Ev, Ctx>,
619 {
620 self.client.add_room_event_handler(self.room_id(), handler)
621 }
622
623 pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
628 self.client.subscribe_to_room_updates(self.room_id())
629 }
630
631 pub fn subscribe_to_typing_notifications(
637 &self,
638 ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
639 let (sender, receiver) = broadcast::channel(16);
640 let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
641 let own_user_id = self.own_user_id().to_owned();
642 move |event: SyncTypingEvent| async move {
643 let typing_user_ids = event
645 .content
646 .user_ids
647 .into_iter()
648 .filter(|user_id| *user_id != own_user_id)
649 .collect();
650 let _ = sender.send(typing_user_ids);
652 }
653 });
654 let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
655 (drop_guard, receiver)
656 }
657
658 #[cfg(feature = "e2e-encryption")]
681 pub async fn subscribe_to_identity_status_changes(
682 &self,
683 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>>> {
684 IdentityStatusChanges::create_stream(self.clone()).await
685 }
686
687 #[cfg(not(feature = "experimental-encrypted-state-events"))]
692 #[allow(clippy::unused_async)] async fn try_decrypt_event(
694 &self,
695 event: Raw<AnyTimelineEvent>,
696 push_ctx: Option<&PushContext>,
697 ) -> TimelineEvent {
698 #[cfg(feature = "e2e-encryption")]
699 if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
700 SyncMessageLikeEvent::Original(_),
701 ))) = event.deserialize_as::<AnySyncTimelineEvent>()
702 {
703 if let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await {
704 return event;
705 }
706 }
707
708 let mut event = TimelineEvent::from_plaintext(event.cast());
709 if let Some(push_ctx) = push_ctx {
710 event.set_push_actions(push_ctx.for_event(event.raw()).await);
711 }
712
713 event
714 }
715
716 #[cfg(feature = "experimental-encrypted-state-events")]
721 #[allow(clippy::unused_async)] async fn try_decrypt_event(
723 &self,
724 event: Raw<AnyTimelineEvent>,
725 push_ctx: Option<&PushContext>,
726 ) -> TimelineEvent {
727 match event.deserialize_as::<AnySyncTimelineEvent>() {
729 Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
730 SyncMessageLikeEvent::Original(_),
731 ))) => {
732 if let Ok(event) = self
733 .decrypt_event(
734 event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
735 push_ctx,
736 )
737 .await
738 {
739 return event;
740 }
741 }
742 Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
743 SyncStateEvent::Original(_),
744 ))) => {
745 if let Ok(event) = self
746 .decrypt_event(
747 event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
748 push_ctx,
749 )
750 .await
751 {
752 return event;
753 }
754 }
755 _ => {}
756 }
757
758 let mut event = TimelineEvent::from_plaintext(event.cast());
759 if let Some(push_ctx) = push_ctx {
760 event.set_push_actions(push_ctx.for_event(event.raw()).await);
761 }
762
763 event
764 }
765
766 pub async fn event(
771 &self,
772 event_id: &EventId,
773 request_config: Option<RequestConfig>,
774 ) -> Result<TimelineEvent> {
775 let request =
776 get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
777
778 let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
779 let push_ctx = self.push_context().await?;
780 let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
781
782 if let Ok((cache, _handles)) = self.event_cache().await {
784 cache.save_events([event.clone()]).await;
785 }
786
787 Ok(event)
788 }
789
790 pub async fn load_or_fetch_event(
797 &self,
798 event_id: &EventId,
799 request_config: Option<RequestConfig>,
800 ) -> Result<TimelineEvent> {
801 match self.event_cache().await {
802 Ok((event_cache, _drop_handles)) => {
803 if let Some(event) = event_cache.find_event(event_id).await {
804 return Ok(event);
805 }
806 }
808 Err(err) => {
809 debug!("error when getting the event cache: {err}");
810 }
811 }
812 self.event(event_id, request_config).await
813 }
814
815 pub async fn event_with_context(
818 &self,
819 event_id: &EventId,
820 lazy_load_members: bool,
821 context_size: UInt,
822 request_config: Option<RequestConfig>,
823 ) -> Result<EventWithContextResponse> {
824 let mut request =
825 context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
826
827 request.limit = context_size;
828
829 if lazy_load_members {
830 request.filter.lazy_load_options =
831 LazyLoadOptions::Enabled { include_redundant_members: false };
832 }
833
834 let response = self.client.send(request).with_request_config(request_config).await?;
835
836 let push_ctx = self.push_context().await?;
837 let push_ctx = push_ctx.as_ref();
838 let target_event = if let Some(event) = response.event {
839 Some(self.try_decrypt_event(event, push_ctx).await)
840 } else {
841 None
842 };
843
844 let (events_before, events_after) = join!(
848 join_all(
849 response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
850 ),
851 join_all(
852 response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
853 ),
854 );
855
856 if let Ok((cache, _handles)) = self.event_cache().await {
858 let mut events_to_save: Vec<TimelineEvent> = Vec::new();
859 if let Some(event) = &target_event {
860 events_to_save.push(event.clone());
861 }
862
863 for event in &events_before {
864 events_to_save.push(event.clone());
865 }
866
867 for event in &events_after {
868 events_to_save.push(event.clone());
869 }
870
871 cache.save_events(events_to_save).await;
872 }
873
874 Ok(EventWithContextResponse {
875 event: target_event,
876 events_before,
877 events_after,
878 state: response.state,
879 prev_batch_token: response.start,
880 next_batch_token: response.end,
881 })
882 }
883
884 pub(crate) async fn request_members(&self) -> Result<()> {
885 self.client
886 .locks()
887 .members_request_deduplicated_handler
888 .run(self.room_id().to_owned(), async move {
889 let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
890 let response = self
891 .client
892 .send(request.clone())
893 .with_request_config(
894 RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
897 )
898 .await?;
899
900 Box::pin(self.client.base_client().receive_all_members(
902 self.room_id(),
903 &request,
904 &response,
905 ))
906 .await?;
907
908 Ok(())
909 })
910 .await
911 }
912
913 pub async fn request_encryption_state(&self) -> Result<()> {
918 if !self.inner.encryption_state().is_unknown() {
919 return Ok(());
920 }
921
922 self.client
923 .locks()
924 .encryption_state_deduplicated_handler
925 .run(self.room_id().to_owned(), async move {
926 let request = get_state_event_for_key::v3::Request::new(
928 self.room_id().to_owned(),
929 StateEventType::RoomEncryption,
930 "".to_owned(),
931 );
932 let response = match self.client.send(request).await {
933 Ok(response) => Some(
934 response
935 .into_content()
936 .deserialize_as_unchecked::<RoomEncryptionEventContent>()?,
937 ),
938 Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
939 Err(err) => return Err(err.into()),
940 };
941
942 let _sync_lock = self.client.base_client().sync_lock().lock().await;
943
944 let mut room_info = self.clone_info();
947 room_info.mark_encryption_state_synced();
948 room_info.set_encryption_event(response.clone());
949 let mut changes = StateChanges::default();
950 changes.add_room(room_info.clone());
951
952 self.client.state_store().save_changes(&changes).await?;
953 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
954
955 Ok(())
956 })
957 .await
958 }
959
960 pub fn encryption_state(&self) -> EncryptionState {
965 self.inner.encryption_state()
966 }
967
968 pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
974 self.request_encryption_state().await?;
975
976 Ok(self.encryption_state())
977 }
978
979 #[cfg(feature = "e2e-encryption")]
981 pub async fn crypto_context_info(&self) -> CryptoContextInfo {
982 let encryption = self.client.encryption();
983
984 let this_device_is_verified = match encryption.get_own_device().await {
985 Ok(Some(device)) => device.is_verified_with_cross_signing(),
986
987 _ => true,
989 };
990
991 let backup_exists_on_server =
992 encryption.backups().exists_on_server().await.unwrap_or(false);
993
994 CryptoContextInfo {
995 device_creation_ts: encryption.device_creation_timestamp().await,
996 this_device_is_verified,
997 is_backup_configured: encryption.backups().state() == BackupState::Enabled,
998 backup_exists_on_server,
999 }
1000 }
1001
1002 fn are_events_visible(&self) -> bool {
1003 if let RoomState::Invited = self.inner.state() {
1004 return matches!(
1005 self.inner.history_visibility_or_default(),
1006 HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1007 );
1008 }
1009
1010 true
1011 }
1012
1013 pub async fn sync_members(&self) -> Result<()> {
1019 if !self.are_events_visible() {
1020 return Ok(());
1021 }
1022
1023 if !self.are_members_synced() {
1024 self.request_members().await
1025 } else {
1026 Ok(())
1027 }
1028 }
1029
1030 pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1044 self.sync_members().await?;
1045 self.get_member_no_sync(user_id).await
1046 }
1047
1048 pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1062 Ok(self
1063 .inner
1064 .get_member(user_id)
1065 .await?
1066 .map(|member| RoomMember::new(self.client.clone(), member)))
1067 }
1068
1069 pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1078 self.sync_members().await?;
1079 self.members_no_sync(memberships).await
1080 }
1081
1082 pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1091 Ok(self
1092 .inner
1093 .members(memberships)
1094 .await?
1095 .into_iter()
1096 .map(|member| RoomMember::new(self.client.clone(), member))
1097 .collect())
1098 }
1099
1100 pub async fn get_state_events(
1102 &self,
1103 event_type: StateEventType,
1104 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1105 self.client
1106 .state_store()
1107 .get_state_events(self.room_id(), event_type)
1108 .await
1109 .map_err(Into::into)
1110 }
1111
1112 pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1129 where
1130 C: StaticEventContent<IsPrefix = ruma::events::False>
1131 + StaticStateEventContent
1132 + RedactContent,
1133 C::Redacted: RedactedStateEventContent,
1134 {
1135 Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1136 }
1137
1138 pub async fn get_state_events_for_keys(
1141 &self,
1142 event_type: StateEventType,
1143 state_keys: &[&str],
1144 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1145 self.client
1146 .state_store()
1147 .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1148 .await
1149 .map_err(Into::into)
1150 }
1151
1152 pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1172 &self,
1173 state_keys: I,
1174 ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1175 where
1176 C: StaticEventContent<IsPrefix = ruma::events::False>
1177 + StaticStateEventContent
1178 + RedactContent,
1179 C::StateKey: Borrow<K>,
1180 C::Redacted: RedactedStateEventContent,
1181 K: AsRef<str> + Sized + Sync + 'a,
1182 I: IntoIterator<Item = &'a K> + Send,
1183 I::IntoIter: Send,
1184 {
1185 Ok(self
1186 .client
1187 .state_store()
1188 .get_state_events_for_keys_static(self.room_id(), state_keys)
1189 .await?)
1190 }
1191
1192 pub async fn get_state_event(
1194 &self,
1195 event_type: StateEventType,
1196 state_key: &str,
1197 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1198 self.client
1199 .state_store()
1200 .get_state_event(self.room_id(), event_type, state_key)
1201 .await
1202 .map_err(Into::into)
1203 }
1204
1205 pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1224 where
1225 C: StaticEventContent<IsPrefix = ruma::events::False>
1226 + StaticStateEventContent<StateKey = EmptyStateKey>
1227 + RedactContent,
1228 C::Redacted: RedactedStateEventContent,
1229 {
1230 self.get_state_event_static_for_key(&EmptyStateKey).await
1231 }
1232
1233 pub async fn get_state_event_static_for_key<C, K>(
1253 &self,
1254 state_key: &K,
1255 ) -> Result<Option<RawSyncOrStrippedState<C>>>
1256 where
1257 C: StaticEventContent<IsPrefix = ruma::events::False>
1258 + StaticStateEventContent
1259 + RedactContent,
1260 C::StateKey: Borrow<K>,
1261 C::Redacted: RedactedStateEventContent,
1262 K: AsRef<str> + ?Sized + Sync,
1263 {
1264 Ok(self
1265 .client
1266 .state_store()
1267 .get_state_event_static_for_key(self.room_id(), state_key)
1268 .await?)
1269 }
1270
1271 pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1275 Ok(self
1280 .get_state_events_static::<SpaceParentEventContent>()
1281 .await?
1282 .into_iter()
1283 .filter_map(|parent_event| match parent_event.deserialize() {
1285 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1286 Some((e.state_key.to_owned(), e.sender))
1287 }
1288 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1289 Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1290 Err(e) => {
1291 info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
1292 None
1293 }
1294 })
1295 .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1297 let Some(parent_room) = self.client.get_room(&state_key) else {
1298 return Ok(ParentSpace::Unverifiable(state_key));
1301 };
1302 if let Some(child_event) = parent_room
1305 .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1306 .await?
1307 {
1308 match child_event.deserialize() {
1309 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1310 return Ok(ParentSpace::Reciprocal(parent_room));
1313 }
1314 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1315 Ok(SyncOrStrippedState::Stripped(_)) => {}
1316 Err(e) => {
1317 info!(
1318 room_id = ?self.room_id(), parent_room_id = ?state_key,
1319 "Could not deserialize m.space.child: {e}"
1320 );
1321 }
1322 }
1323 }
1328
1329 let Some(member) = parent_room.get_member(&sender).await? else {
1332 return Ok(ParentSpace::Illegitimate(parent_room));
1334 };
1335
1336 if member.can_send_state(StateEventType::SpaceChild) {
1337 Ok(ParentSpace::WithPowerlevel(parent_room))
1339 } else {
1340 Ok(ParentSpace::Illegitimate(parent_room))
1341 }
1342 })
1343 .collect::<FuturesUnordered<_>>())
1344 }
1345
1346 pub async fn account_data(
1348 &self,
1349 data_type: RoomAccountDataEventType,
1350 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1351 self.client
1352 .state_store()
1353 .get_room_account_data_event(self.room_id(), data_type)
1354 .await
1355 .map_err(Into::into)
1356 }
1357
1358 pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1377 where
1378 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1379 {
1380 Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1381 }
1382
1383 #[cfg(feature = "e2e-encryption")]
1388 pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1389 let user_ids = self
1390 .client
1391 .state_store()
1392 .get_user_ids(self.room_id(), RoomMemberships::empty())
1393 .await?;
1394
1395 for user_id in user_ids {
1396 let devices = self.client.encryption().get_user_devices(&user_id).await?;
1397 let any_unverified = devices.devices().any(|d| !d.is_verified());
1398
1399 if any_unverified {
1400 return Ok(false);
1401 }
1402 }
1403
1404 Ok(true)
1405 }
1406
1407 pub async fn set_account_data<T>(
1422 &self,
1423 content: T,
1424 ) -> Result<set_room_account_data::v3::Response>
1425 where
1426 T: RoomAccountDataEventContent,
1427 {
1428 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1429
1430 let request = set_room_account_data::v3::Request::new(
1431 own_user.to_owned(),
1432 self.room_id().to_owned(),
1433 &content,
1434 )?;
1435
1436 Ok(self.client.send(request).await?)
1437 }
1438
1439 pub async fn set_account_data_raw(
1464 &self,
1465 event_type: RoomAccountDataEventType,
1466 content: Raw<AnyRoomAccountDataEventContent>,
1467 ) -> Result<set_room_account_data::v3::Response> {
1468 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1469
1470 let request = set_room_account_data::v3::Request::new_raw(
1471 own_user.to_owned(),
1472 self.room_id().to_owned(),
1473 event_type,
1474 content,
1475 );
1476
1477 Ok(self.client.send(request).await?)
1478 }
1479
1480 pub async fn set_tag(
1511 &self,
1512 tag: TagName,
1513 tag_info: TagInfo,
1514 ) -> Result<create_tag::v3::Response> {
1515 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1516 let request = create_tag::v3::Request::new(
1517 user_id.to_owned(),
1518 self.inner.room_id().to_owned(),
1519 tag.to_string(),
1520 tag_info,
1521 );
1522 Ok(self.client.send(request).await?)
1523 }
1524
1525 pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1532 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1533 let request = delete_tag::v3::Request::new(
1534 user_id.to_owned(),
1535 self.inner.room_id().to_owned(),
1536 tag.to_string(),
1537 );
1538 Ok(self.client.send(request).await?)
1539 }
1540
1541 pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1551 if is_favourite {
1552 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1553
1554 self.set_tag(TagName::Favorite, tag_info).await?;
1555
1556 if self.is_low_priority() {
1557 self.remove_tag(TagName::LowPriority).await?;
1558 }
1559 } else {
1560 self.remove_tag(TagName::Favorite).await?;
1561 }
1562 Ok(())
1563 }
1564
1565 pub async fn set_is_low_priority(
1575 &self,
1576 is_low_priority: bool,
1577 tag_order: Option<f64>,
1578 ) -> Result<()> {
1579 if is_low_priority {
1580 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1581
1582 self.set_tag(TagName::LowPriority, tag_info).await?;
1583
1584 if self.is_favourite() {
1585 self.remove_tag(TagName::Favorite).await?;
1586 }
1587 } else {
1588 self.remove_tag(TagName::LowPriority).await?;
1589 }
1590 Ok(())
1591 }
1592
1593 pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1602 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1603
1604 let mut content = self
1605 .client
1606 .account()
1607 .account_data::<DirectEventContent>()
1608 .await?
1609 .map(|c| c.deserialize())
1610 .transpose()?
1611 .unwrap_or_default();
1612
1613 let this_room_id = self.inner.room_id();
1614
1615 if is_direct {
1616 let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1617 room_members.retain(|member| member.user_id() != self.own_user_id());
1618
1619 for member in room_members {
1620 let entry = content.entry(member.user_id().into()).or_default();
1621 if !entry.iter().any(|room_id| room_id == this_room_id) {
1622 entry.push(this_room_id.to_owned());
1623 }
1624 }
1625 } else {
1626 for (_, list) in content.iter_mut() {
1627 list.retain(|room_id| *room_id != this_room_id);
1628 }
1629
1630 content.retain(|_, list| !list.is_empty());
1632 }
1633
1634 let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1635
1636 self.client.send(request).await?;
1637 Ok(())
1638 }
1639
/// Try to decrypt `event` with the client's Olm machine.
///
/// On success the result is a decrypted [`TimelineEvent`]; push actions
/// are computed for it only when `push_ctx` is provided. When the event
/// cannot be decrypted, a room key download from backup is requested
/// (`maybe_download_room_key`) and an unable-to-decrypt [`TimelineEvent`]
/// is returned instead.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, and
/// propagates errors from the decryption attempt itself.
#[cfg(feature = "e2e-encryption")]
#[cfg(not(feature = "experimental-encrypted-state-events"))]
pub async fn decrypt_event(
    &self,
    event: &Raw<OriginalSyncRoomEncryptedEvent>,
    push_ctx: Option<&PushContext>,
) -> Result<TimelineEvent> {
    let olm_guard = self.client.olm_machine().await;
    let Some(olm) = olm_guard.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    let outcome = olm
        .try_decrypt_room_event(
            event.cast_ref(),
            self.inner.room_id(),
            self.client.decryption_settings(),
        )
        .await?;

    match outcome {
        RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
            self.client
                .encryption()
                .backups()
                .maybe_download_room_key(self.room_id().to_owned(), event.clone());

            Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
        }
        RoomEventDecryptionResult::Decrypted(decrypted) => {
            let push_actions = match push_ctx {
                Some(ctx) => Some(ctx.for_event(&decrypted.event).await),
                None => None,
            };

            Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
        }
    }
}
1682
/// Try to decrypt `event` with the client's Olm machine.
///
/// This variant accepts any raw event type that is JSON-castable to an
/// `EncryptedEvent`. On success the result is a decrypted
/// [`TimelineEvent`]; push actions are computed for it only when
/// `push_ctx` is provided. When the event cannot be decrypted, a room key
/// download from backup is requested (`maybe_download_room_key`) and an
/// unable-to-decrypt [`TimelineEvent`] is returned instead.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, and
/// propagates errors from the decryption attempt itself.
#[cfg(feature = "experimental-encrypted-state-events")]
pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
    &self,
    event: &Raw<T>,
    push_ctx: Option<&PushContext>,
) -> Result<TimelineEvent> {
    let olm_guard = self.client.olm_machine().await;
    let Some(olm) = olm_guard.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    let outcome = olm
        .try_decrypt_room_event(
            event.cast_ref(),
            self.inner.room_id(),
            self.client.decryption_settings(),
        )
        .await?;

    match outcome {
        RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
            self.client
                .encryption()
                .backups()
                .maybe_download_room_key(self.room_id().to_owned(), event.clone());

            Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
        }
        RoomEventDecryptionResult::Decrypted(decrypted) => {
            let push_actions = match push_ctx {
                Some(ctx) => Some(ctx.for_event(&decrypted.event).await),
                None => None,
            };

            Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
        }
    }
}
1726
/// Fetch the encryption info of the session `session_id` for `sender` in
/// this room.
///
/// Returns `None` when no Olm machine is available or when the lookup
/// fails; the underlying error is discarded.
#[cfg(feature = "e2e-encryption")]
pub async fn get_encryption_info(
    &self,
    session_id: &str,
    sender: &UserId,
) -> Option<Arc<EncryptionInfo>> {
    let olm_guard = self.client.olm_machine().await;
    let olm = olm_guard.as_ref()?;

    match olm.get_session_encryption_info(self.room_id(), session_id, sender).await {
        Ok(info) => Some(info),
        Err(_) => None,
    }
}
1749
/// Ask the Olm machine to discard the room key of this room.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, and
/// propagates errors from the discard operation.
#[cfg(feature = "e2e-encryption")]
pub async fn discard_room_key(&self) -> Result<()> {
    let olm_guard = self.client.olm_machine().await;
    let Some(olm) = olm_guard.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    olm.discard_room_key(self.inner.room_id()).await?;
    Ok(())
}
1772
1773 #[instrument(skip_all)]
1781 pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1782 let request = assign!(
1783 ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1784 { reason: reason.map(ToOwned::to_owned) }
1785 );
1786 self.client.send(request).await?;
1787 Ok(())
1788 }
1789
1790 #[instrument(skip_all)]
1798 pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1799 let request = assign!(
1800 unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1801 { reason: reason.map(ToOwned::to_owned) }
1802 );
1803 self.client.send(request).await?;
1804 Ok(())
1805 }
1806
1807 #[instrument(skip_all)]
1816 pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1817 let request = assign!(
1818 kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1819 { reason: reason.map(ToOwned::to_owned) }
1820 );
1821 self.client.send(request).await?;
1822 Ok(())
1823 }
1824
1825 #[instrument(skip_all)]
1831 pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1832 #[cfg(feature = "e2e-encryption")]
1833 if self.client.inner.enable_share_history_on_invite {
1834 shared_room_history::share_room_history(self, user_id.to_owned()).await?;
1835 }
1836
1837 let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
1838 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1839 self.client.send(request).await?;
1840
1841 self.mark_members_missing();
1845
1846 Ok(())
1847 }
1848
1849 #[instrument(skip_all)]
1855 pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
1856 let recipient = InvitationRecipient::ThirdPartyId(invite_id);
1857 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1858 self.client.send(request).await?;
1859
1860 self.mark_members_missing();
1864
1865 Ok(())
1866 }
1867
1868 pub async fn typing_notice(&self, typing: bool) -> Result<()> {
1903 self.ensure_room_joined()?;
1904
1905 let send = if let Some(typing_time) =
1908 self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
1909 {
1910 if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
1911 typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
1915 } else {
1916 !typing
1918 }
1919 } else {
1920 typing
1923 };
1924
1925 if send {
1926 self.send_typing_notice(typing).await?;
1927 }
1928
1929 Ok(())
1930 }
1931
1932 #[instrument(name = "typing_notice", skip(self))]
1933 async fn send_typing_notice(&self, typing: bool) -> Result<()> {
1934 let typing = if typing {
1935 self.client
1936 .inner
1937 .typing_notice_times
1938 .write()
1939 .unwrap()
1940 .insert(self.room_id().to_owned(), Instant::now());
1941 Typing::Yes(TYPING_NOTICE_TIMEOUT)
1942 } else {
1943 self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
1944 Typing::No
1945 };
1946
1947 let request = create_typing_event::v3::Request::new(
1948 self.own_user_id().to_owned(),
1949 self.room_id().to_owned(),
1950 typing,
1951 );
1952
1953 self.client.send(request).await?;
1954
1955 Ok(())
1956 }
1957
1958 #[instrument(skip_all)]
1975 pub async fn send_single_receipt(
1976 &self,
1977 receipt_type: create_receipt::v3::ReceiptType,
1978 thread: ReceiptThread,
1979 event_id: OwnedEventId,
1980 ) -> Result<()> {
1981 let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
1984
1985 self.client
1986 .inner
1987 .locks
1988 .read_receipt_deduplicated_handler
1989 .run((request_key, event_id.clone()), async {
1990 let is_unthreaded = thread == ReceiptThread::Unthreaded;
1992
1993 let mut request = create_receipt::v3::Request::new(
1994 self.room_id().to_owned(),
1995 receipt_type,
1996 event_id,
1997 );
1998 request.thread = thread;
1999
2000 self.client.send(request).await?;
2001
2002 if is_unthreaded {
2003 self.set_unread_flag(false).await?;
2004 }
2005
2006 Ok(())
2007 })
2008 .await
2009 }
2010
2011 #[instrument(skip_all)]
2021 pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2022 if receipts.is_empty() {
2023 return Ok(());
2024 }
2025
2026 let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2027 let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2028 fully_read,
2029 read_receipt: public_read_receipt,
2030 private_read_receipt,
2031 });
2032
2033 self.client.send(request).await?;
2034
2035 self.set_unread_flag(false).await?;
2036
2037 Ok(())
2038 }
2039
/// Shared implementation of the public `enable_encryption*` methods.
///
/// If the latest encryption state does not already report the room as
/// encrypted, a `RoomEncryptionEventContent` using the
/// `MegolmV1AesSha2` algorithm is sent as a state event. With the
/// `experimental-encrypted-state-events` feature and
/// `encrypted_state_events == true`, the content is additionally built
/// `with_encrypted_state()`.
///
/// The method then waits up to `SYNC_WAIT_TIME` (3 seconds) for a sync
/// beat after which the encryption state is no longer unknown. If the
/// room is not confirmed as encrypted within that time, the encryption
/// state is marked as missing in the room info, which is persisted to the
/// state store.
// `encrypted_state_events` and the `mut` on `content` are only used when
// the `experimental-encrypted-state-events` feature is enabled.
#[allow(unused_variables, unused_mut)]
async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
    use ruma::{
        events::room::encryption::RoomEncryptionEventContent, EventEncryptionAlgorithm,
    };
    const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);

    if !self.latest_encryption_state().await?.is_encrypted() {
        let mut content =
            RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
        #[cfg(feature = "experimental-encrypted-state-events")]
        if encrypted_state_events {
            content = content.with_encrypted_state();
        }
        self.send_state_event(content).await?;

        // Wait, bounded by `SYNC_WAIT_TIME`, until a sync has made the
        // encryption state known.
        let res = timeout(
            async {
                loop {
                    // Wait for the next sync beat, then inspect the state
                    // while holding the sync lock.
                    self.client.inner.sync_beat.listen().await;
                    let _sync_lock = self.client.base_client().sync_lock().lock().await;
                    if !self.inner.encryption_state().is_unknown() {
                        break;
                    }
                }
            },
            SYNC_WAIT_TIME,
        )
        .await;

        // Hold the sync lock for the remaining checks and the room info
        // update below.
        let _sync_lock = self.client.base_client().sync_lock().lock().await;

        #[cfg(not(feature = "experimental-encrypted-state-events"))]
        if res.is_ok() && self.inner.encryption_state().is_encrypted() {
            debug!("room successfully marked as encrypted");
            return Ok(());
        }

        // With encrypted state events requested, the room must also
        // report state encryption to count as successfully enabled.
        #[cfg(feature = "experimental-encrypted-state-events")]
        if res.is_ok() && {
            if encrypted_state_events {
                self.inner.encryption_state().is_state_encrypted()
            } else {
                self.inner.encryption_state().is_encrypted()
            }
        } {
            debug!("room successfully marked as encrypted");
            return Ok(());
        }

        debug!("still not marked as encrypted, marking encryption state as missing");

        // Persist a copy of the room info with the encryption state
        // marked as missing, then install it on the room.
        let mut room_info = self.clone_info();
        room_info.mark_encryption_state_missing();
        let mut changes = StateChanges::default();
        changes.add_room(room_info.clone());

        self.client.state_store().save_changes(&changes).await?;
        self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
    }

    Ok(())
}
2119
2120 #[instrument(skip_all)]
2152 pub async fn enable_encryption(&self) -> Result<()> {
2153 self.enable_encryption_inner(false).await
2154 }
2155
/// Enable end-to-end encryption in this room, including encryption of
/// state events.
///
/// Delegates to `enable_encryption_inner` with encrypted state events
/// requested.
#[instrument(skip_all)]
#[cfg(feature = "experimental-encrypted-state-events")]
pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
    const ENCRYPTED_STATE_EVENTS: bool = true;

    self.enable_encryption_inner(ENCRYPTED_STATE_EVENTS).await
}
2193
/// Claim one-time keys for the active members of this room and share the
/// room key with them.
///
/// The work runs through the client's group-session deduplication
/// handler, keyed by the room ID. If sharing the room key fails, the room
/// key is discarded (when an Olm machine is available) and the sharing
/// error is returned.
///
/// # Errors
///
/// Fails when the room is not joined, when the store lock cannot be
/// obtained, or when any of the key claiming / sharing steps fails.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
async fn preshare_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    // Acquire the store lock (argument presumably a timeout in
    // milliseconds; TODO confirm). The guard is consumed by the `map`
    // below, so it does not outlive that statement.
    let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
    tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));

    self.client
        .locks()
        .group_session_deduplicated_handler
        .run(self.room_id().to_owned(), async move {
            {
                // Claim one-time keys for every active member of the room.
                let members = self
                    .client
                    .state_store()
                    .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
                    .await?;
                self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
            };

            let response = self.share_room_key().await;

            if let Err(r) = response {
                // Sharing failed: discard the room key before reporting
                // the original error.
                let machine = self.client.olm_machine().await;
                if let Some(machine) = machine.as_ref() {
                    machine.discard_room_key(self.room_id()).await?;
                }
                return Err(r);
            }

            Ok(())
        })
        .await
}
2241
/// Share the room key of this room.
///
/// The base client produces the to-device requests; each one is sent in
/// turn and then marked as sent with the response it received. The first
/// failure aborts the remaining requests.
///
/// # Errors
///
/// Fails when the room is not joined, or when creating, sending or
/// acknowledging a request fails.
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all)]
async fn share_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    let room_id = self.room_id();
    let to_device_requests = self.client.base_client().share_room_key(room_id).await?;

    for to_device_request in to_device_requests {
        let sent = self.client.send_to_device(&to_device_request).await?;
        self.client.mark_request_as_sent(&to_device_request.txn_id, &sent).await?;
    }

    Ok(())
}
2261
2262 #[instrument(skip_all)]
2271 pub async fn sync_up(&self) {
2272 while !self.is_synced() && self.state() == RoomState::Joined {
2273 let wait_for_beat = self.client.inner.sync_beat.listen();
2274 let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2276 }
2277 }
2278
2279 pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2349 SendMessageLikeEvent::new(self, content)
2350 }
2351
/// Query device keys for the active room members that are either not
/// tracked by the Olm machine yet, or tracked and flagged as dirty.
///
/// No request is sent when there are no device keys to query.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, and
/// propagates store and request errors.
#[cfg(feature = "e2e-encryption")]
async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
    let olm = self.client.olm_machine().await;
    // Report a missing Olm machine as an error, like the other encryption
    // helpers of this type do, instead of panicking.
    let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;

    let members =
        self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;

    // Map every tracked user to its "dirty" flag.
    let tracked: HashMap<_, _> = olm
        .store()
        .load_tracked_users()
        .await?
        .into_iter()
        .map(|tracked| (tracked.user_id, tracked.dirty))
        .collect();

    // Keep the members that are untracked, or tracked but dirty.
    let members_with_unknown_devices =
        members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));

    let (req_id, request) =
        olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));

    if !request.device_keys.is_empty() {
        self.client.keys_query(&req_id, request.device_keys).await?;
    }

    Ok(())
}
2384
2385 #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2429 pub fn send_raw<'a>(
2430 &'a self,
2431 event_type: &'a str,
2432 content: impl IntoRawMessageLikeEventContent,
2433 ) -> SendRawMessageLikeEvent<'a> {
2434 SendRawMessageLikeEvent::new(self, event_type, content)
2437 }
2438
2439 #[instrument(skip_all)]
2487 pub fn send_attachment<'a>(
2488 &'a self,
2489 filename: impl Into<String>,
2490 content_type: &'a Mime,
2491 data: Vec<u8>,
2492 config: AttachmentConfig,
2493 ) -> SendAttachment<'a> {
2494 SendAttachment::new(self, filename.into(), content_type, data, config)
2495 }
2496
2497 #[instrument(skip_all)]
2525 pub(super) async fn prepare_and_send_attachment<'a>(
2526 &'a self,
2527 filename: String,
2528 content_type: &'a Mime,
2529 data: Vec<u8>,
2530 mut config: AttachmentConfig,
2531 send_progress: SharedObservable<TransmissionProgress>,
2532 store_in_cache: bool,
2533 ) -> Result<send_message_event::v3::Response> {
2534 self.ensure_room_joined()?;
2535
2536 let txn_id = config.txn_id.take();
2537 let mentions = config.mentions.take();
2538
2539 let thumbnail = config.thumbnail.take();
2540
2541 let thumbnail_cache_info = if store_in_cache {
2543 thumbnail
2544 .as_ref()
2545 .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2546 } else {
2547 None
2548 };
2549
2550 #[cfg(feature = "e2e-encryption")]
2551 let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2552 self.client
2553 .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2554 .await?
2555 } else {
2556 self.client
2557 .media()
2558 .upload_plain_media_and_thumbnail(
2559 content_type,
2560 data.clone(),
2563 thumbnail,
2564 send_progress,
2565 )
2566 .await?
2567 };
2568
2569 #[cfg(not(feature = "e2e-encryption"))]
2570 let (media_source, thumbnail) = self
2571 .client
2572 .media()
2573 .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2574 .await?;
2575
2576 if store_in_cache {
2577 let cache_store_lock_guard = self.client.event_cache_store().lock().await?;
2578
2579 debug!("caching the media");
2583 let request =
2584 MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2585
2586 if let Err(err) = cache_store_lock_guard
2587 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2588 .await
2589 {
2590 warn!("unable to cache the media after uploading it: {err}");
2591 }
2592
2593 if let Some(((data, height, width), source)) =
2594 thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2595 {
2596 debug!("caching the thumbnail");
2597
2598 let request = MediaRequestParameters {
2599 source: source.clone(),
2600 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2601 };
2602
2603 if let Err(err) = cache_store_lock_guard
2604 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2605 .await
2606 {
2607 warn!("unable to cache the media after uploading it: {err}");
2608 }
2609 }
2610 }
2611
2612 let content = self
2613 .make_media_event(
2614 Room::make_attachment_type(
2615 content_type,
2616 filename,
2617 media_source,
2618 config.caption,
2619 config.formatted_caption,
2620 config.info,
2621 thumbnail,
2622 ),
2623 mentions,
2624 config.reply,
2625 )
2626 .await?;
2627
2628 let mut fut = self.send(content);
2629 if let Some(txn_id) = txn_id {
2630 fut = fut.with_transaction_id(txn_id);
2631 }
2632 fut.await
2633 }
2634
/// Build the [`MessageType`] for an uploaded attachment.
///
/// All arguments are forwarded unchanged to the `make_media_type!` macro,
/// which is instantiated here for `MessageType`.
// One parameter per piece of attachment metadata, hence the long list.
#[allow(clippy::too_many_arguments)]
pub(crate) fn make_attachment_type(
    content_type: &Mime,
    filename: String,
    source: MediaSource,
    caption: Option<String>,
    formatted_caption: Option<FormattedBody>,
    info: Option<AttachmentInfo>,
    thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
) -> MessageType {
    make_media_type!(
        MessageType,
        content_type,
        filename,
        source,
        caption,
        formatted_caption,
        info,
        thumbnail
    )
}
2658
2659 pub(crate) async fn make_media_event(
2662 &self,
2663 msg_type: MessageType,
2664 mentions: Option<Mentions>,
2665 reply: Option<Reply>,
2666 ) -> Result<RoomMessageEventContent> {
2667 let mut content = RoomMessageEventContent::new(msg_type);
2668 if let Some(mentions) = mentions {
2669 content = content.add_mentions(mentions);
2670 }
2671 if let Some(reply) = reply {
2672 content = self.make_reply_event(content.into(), reply).await?;
2675 }
2676 Ok(content)
2677 }
2678
/// Build the [`GalleryItemType`] for an uploaded gallery item.
///
/// All arguments are forwarded unchanged to the `make_media_type!` macro,
/// which is instantiated here for `GalleryItemType`.
#[cfg(feature = "unstable-msc4274")]
// One parameter per piece of attachment metadata, hence the long list.
#[allow(clippy::too_many_arguments)]
pub(crate) fn make_gallery_item_type(
    content_type: &Mime,
    filename: String,
    source: MediaSource,
    caption: Option<String>,
    formatted_caption: Option<FormattedBody>,
    info: Option<AttachmentInfo>,
    thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
) -> GalleryItemType {
    make_media_type!(
        GalleryItemType,
        content_type,
        filename,
        source,
        caption,
        formatted_caption,
        info,
        thumbnail
    )
}
2703
2704 pub async fn update_power_levels(
2713 &self,
2714 updates: Vec<(&UserId, Int)>,
2715 ) -> Result<send_state_event::v3::Response> {
2716 let mut power_levels = self.power_levels().await?;
2717
2718 for (user_id, new_level) in updates {
2719 if new_level == power_levels.users_default {
2720 power_levels.users.remove(user_id);
2721 } else {
2722 power_levels.users.insert(user_id.to_owned(), new_level);
2723 }
2724 }
2725
2726 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2727 }
2728
2729 pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2734 let mut power_levels = self.power_levels().await?;
2735 power_levels.apply(changes)?;
2736 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2737 Ok(())
2738 }
2739
2740 pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2744 let creators = self.creators().unwrap_or_default();
2745 let rules = self.clone_info().room_version_rules_or_default();
2746
2747 let default_power_levels =
2748 RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2749 let changes = RoomPowerLevelChanges::from(default_power_levels);
2750 self.apply_power_level_changes(changes).await?;
2751 Ok(self.power_levels().await?)
2752 }
2753
2754 pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2759 let power_level = self.get_user_power_level(user_id).await?;
2760 Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2761 }
2762
2763 pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2768 let event = self.power_levels().await?;
2769 Ok(event.for_user(user_id))
2770 }
2771
2772 pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2775 let power_levels = self.power_levels().await.ok();
2776 let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2777 if let Some(power_levels) = power_levels {
2778 for (id, level) in power_levels.users.into_iter() {
2779 user_power_levels.insert(id, level.into());
2780 }
2781 }
2782 user_power_levels
2783 }
2784
2785 pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2787 self.send_state_event(RoomNameEventContent::new(name)).await
2788 }
2789
2790 pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2792 self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2793 }
2794
2795 pub async fn set_avatar_url(
2801 &self,
2802 url: &MxcUri,
2803 info: Option<avatar::ImageInfo>,
2804 ) -> Result<send_state_event::v3::Response> {
2805 self.ensure_room_joined()?;
2806
2807 let mut room_avatar_event = RoomAvatarEventContent::new();
2808 room_avatar_event.url = Some(url.to_owned());
2809 room_avatar_event.info = info.map(Box::new);
2810
2811 self.send_state_event(room_avatar_event).await
2812 }
2813
2814 pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2816 self.send_state_event(RoomAvatarEventContent::new()).await
2817 }
2818
2819 pub async fn upload_avatar(
2827 &self,
2828 mime: &Mime,
2829 data: Vec<u8>,
2830 info: Option<avatar::ImageInfo>,
2831 ) -> Result<send_state_event::v3::Response> {
2832 self.ensure_room_joined()?;
2833
2834 let upload_response = self.client.media().upload(mime, data, None).await?;
2835 let mut info = info.unwrap_or_default();
2836 info.blurhash = upload_response.blurhash;
2837 info.mimetype = Some(mime.to_string());
2838
2839 self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2840 }
2841
2842 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2886 #[instrument(skip_all)]
2887 pub async fn send_state_event(
2888 &self,
2889 content: impl StateEventContent<StateKey = EmptyStateKey>,
2890 ) -> Result<send_state_event::v3::Response> {
2891 self.send_state_event_for_key(&EmptyStateKey, content).await
2892 }
2893
/// Prepare sending a state event with an empty state key to this room.
///
/// Shorthand for [`Self::send_state_event_for_key`] with
/// `EmptyStateKey` as the state key; the returned [`SendStateEvent`] is
/// what that method produces.
#[cfg(feature = "experimental-encrypted-state-events")]
#[instrument(skip_all)]
pub fn send_state_event<'a>(
    &'a self,
    content: impl StateEventContent<StateKey = EmptyStateKey>,
) -> SendStateEvent<'a> {
    let room = self;

    room.send_state_event_for_key(&EmptyStateKey, content)
}
2952
2953 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2994 pub async fn send_state_event_for_key<C, K>(
2995 &self,
2996 state_key: &K,
2997 content: C,
2998 ) -> Result<send_state_event::v3::Response>
2999 where
3000 C: StateEventContent,
3001 C::StateKey: Borrow<K>,
3002 K: AsRef<str> + ?Sized,
3003 {
3004 self.ensure_room_joined()?;
3005 let request =
3006 send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3007 let response = self.client.send(request).await?;
3008 Ok(response)
3009 }
3010
/// Prepare sending the state event `content` for the given `state_key` to
/// this room.
///
/// Nothing is sent by this call itself: it only builds and returns a
/// [`SendStateEvent`] for this room, state key and content.
#[cfg(feature = "experimental-encrypted-state-events")]
pub fn send_state_event_for_key<'a, C, K>(
    &'a self,
    state_key: &K,
    content: C,
) -> SendStateEvent<'a>
where
    C: StateEventContent,
    C::StateKey: Borrow<K>,
    K: AsRef<str> + ?Sized,
{
    let room = self;

    SendStateEvent::new(room, state_key, content)
}
3072
3073 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3108 #[instrument(skip_all)]
3109 pub async fn send_state_event_raw(
3110 &self,
3111 event_type: &str,
3112 state_key: &str,
3113 content: impl IntoRawStateEventContent,
3114 ) -> Result<send_state_event::v3::Response> {
3115 self.ensure_room_joined()?;
3116
3117 let request = send_state_event::v3::Request::new_raw(
3118 self.room_id().to_owned(),
3119 event_type.into(),
3120 state_key.to_owned(),
3121 content.into_raw_state_event_content(),
3122 );
3123
3124 Ok(self.client.send(request).await?)
3125 }
3126
/// Prepare sending a raw state event of type `event_type` with the given
/// `state_key` to this room.
///
/// Nothing is sent by this call itself: it only builds and returns a
/// [`SendRawStateEvent`] from the arguments.
#[cfg(feature = "experimental-encrypted-state-events")]
#[instrument(skip_all)]
pub fn send_state_event_raw<'a>(
    &'a self,
    event_type: &'a str,
    state_key: &'a str,
    content: impl IntoRawStateEventContent,
) -> SendRawStateEvent<'a> {
    let room = self;

    SendRawStateEvent::new(room, event_type, state_key, content)
}
3178
3179 #[instrument(skip_all)]
3214 pub async fn redact(
3215 &self,
3216 event_id: &EventId,
3217 reason: Option<&str>,
3218 txn_id: Option<OwnedTransactionId>,
3219 ) -> HttpResult<redact_event::v3::Response> {
3220 let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3221 let request = assign!(
3222 redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3223 { reason: reason.map(ToOwned::to_owned) }
3224 );
3225
3226 self.client.send(request).await
3227 }
3228
/// Compute up to three server names that can be used to route to this
/// room.
///
/// Candidates are the servers of the joined members, excluding servers
/// denied by the room's server ACL (when one is available) and servers
/// whose name is an IP literal. The server of the member with the highest
/// power level comes first, provided that level is at least 50; the
/// remaining servers follow, ordered by decreasing number of joined
/// members.
pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
    // An ACL event that fails to deserialize is treated as absent.
    let acl_ev = self
        .get_state_event_static::<RoomServerAclEventContent>()
        .await?
        .and_then(|ev| ev.deserialize().ok());
    let acl = acl_ev.as_ref().and_then(|ev| match ev {
        SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
        SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
    });

    // Drop the members whose server is denied by the ACL or is an IP
    // literal.
    let members: Vec<_> = self
        .members_no_sync(RoomMemberships::JOIN)
        .await?
        .into_iter()
        .filter(|member| {
            let server = member.user_id().server_name();
            acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
        })
        .collect();

    // Server of the member with the highest power level, if that level is
    // at least 50.
    let max = members
        .iter()
        .max_by_key(|member| member.power_level())
        .filter(|max| max.power_level() >= int!(50))
        .map(|member| member.user_id().server_name());

    // Count the joined members per server, leaving out the server already
    // selected above.
    let servers = members
        .iter()
        .map(|member| member.user_id().server_name())
        .filter(|server| max.filter(|max| max == server).is_none())
        .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
            *servers.entry(server).or_default() += 1;
            servers
        });
    // Most populated servers first.
    let mut servers: Vec<_> = servers.into_iter().collect();
    servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));

    Ok(max
        .into_iter()
        .chain(servers.into_iter().map(|(name, _)| name))
        .take(3)
        .map(ToOwned::to_owned)
        .collect())
}
3287
3288 pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3295 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3296 return Ok(alias.matrix_to_uri());
3297 }
3298
3299 let via = self.route().await?;
3300 Ok(self.room_id().matrix_to_uri_via(via))
3301 }
3302
3303 pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3314 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3315 return Ok(alias.matrix_uri(join));
3316 }
3317
3318 let via = self.route().await?;
3319 Ok(self.room_id().matrix_uri_via(via, join))
3320 }
3321
3322 pub async fn matrix_to_event_permalink(
3336 &self,
3337 event_id: impl Into<OwnedEventId>,
3338 ) -> Result<MatrixToUri> {
3339 let via = self.route().await?;
3342 Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3343 }
3344
3345 pub async fn matrix_event_permalink(
3359 &self,
3360 event_id: impl Into<OwnedEventId>,
3361 ) -> Result<MatrixUri> {
3362 let via = self.route().await?;
3365 Ok(self.room_id().matrix_event_uri_via(event_id, via))
3366 }
3367
3368 pub async fn load_user_receipt(
3381 &self,
3382 receipt_type: ReceiptType,
3383 thread: ReceiptThread,
3384 user_id: &UserId,
3385 ) -> Result<Option<(OwnedEventId, Receipt)>> {
3386 self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3387 }
3388
3389 pub async fn load_event_receipts(
3402 &self,
3403 receipt_type: ReceiptType,
3404 thread: ReceiptThread,
3405 event_id: &EventId,
3406 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3407 self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3408 }
3409
3410 pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3415 self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
3416 }
3417
3418 pub(crate) async fn push_condition_room_ctx_internal(
3425 &self,
3426 with_threads_subscriptions: bool,
3427 ) -> Result<Option<PushConditionRoomCtx>> {
3428 let room_id = self.room_id();
3429 let user_id = self.own_user_id();
3430 let room_info = self.clone_info();
3431 let member_count = room_info.active_members_count();
3432
3433 let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3434 member.name().to_owned()
3435 } else {
3436 return Ok(None);
3437 };
3438
3439 let power_levels = match self.power_levels().await {
3440 Ok(power_levels) => Some(power_levels.into()),
3441 Err(error) => {
3442 if matches!(room_info.state(), RoomState::Joined) {
3443 error!("Could not compute power levels for push conditions: {error}");
3446 }
3447 None
3448 }
3449 };
3450
3451 let mut ctx = assign!(PushConditionRoomCtx::new(
3452 room_id.to_owned(),
3453 UInt::new(member_count).unwrap_or(UInt::MAX),
3454 user_id.to_owned(),
3455 user_display_name,
3456 ),
3457 {
3458 power_levels,
3459 });
3460
3461 if with_threads_subscriptions {
3462 let this = self.clone();
3463 ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3464 let room = this.clone();
3465 Box::pin(async move {
3466 if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3467 maybe_sub.is_some()
3468 } else {
3469 false
3470 }
3471 })
3472 });
3473 }
3474
3475 Ok(Some(ctx))
3476 }
3477
3478 pub async fn push_context(&self) -> Result<Option<PushContext>> {
3481 self.push_context_internal(self.client.enabled_thread_subscriptions()).await
3482 }
3483
3484 #[instrument(skip(self))]
3488 pub(crate) async fn push_context_internal(
3489 &self,
3490 with_threads_subscriptions: bool,
3491 ) -> Result<Option<PushContext>> {
3492 let Some(push_condition_room_ctx) =
3493 self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3494 else {
3495 debug!("Could not aggregate push context");
3496 return Ok(None);
3497 };
3498 let push_rules = self.client().account().push_rules().await?;
3499 Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3500 }
3501
3502 pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3507 if let Some(ctx) = self.push_context().await? {
3508 Ok(Some(ctx.for_event(event).await))
3509 } else {
3510 Ok(None)
3511 }
3512 }
3513
3514 pub async fn invite_details(&self) -> Result<Invite> {
3517 let state = self.state();
3518
3519 if state != RoomState::Invited {
3520 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3521 }
3522
3523 let invitee = self
3524 .get_member_no_sync(self.own_user_id())
3525 .await?
3526 .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3527 let event = invitee.event();
3528 let inviter_id = event.sender();
3529 let inviter = self.get_member_no_sync(inviter_id).await?;
3530 Ok(Invite { invitee, inviter })
3531 }
3532
3533 pub async fn member_with_sender_info(
3541 &self,
3542 user_id: &UserId,
3543 ) -> Result<RoomMemberWithSenderInfo> {
3544 let Some(member) = self.get_member_no_sync(user_id).await? else {
3545 return Err(Error::InsufficientData);
3546 };
3547
3548 let sender_member =
3549 if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3550 Some(member)
3552 } else if self.are_members_synced() {
3553 None
3555 } else if self.sync_members().await.is_ok() {
3556 self.get_member_no_sync(member.event().sender()).await?
3558 } else {
3559 None
3560 };
3561
3562 Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3563 }
3564
3565 pub async fn forget(&self) -> Result<()> {
3571 let state = self.state();
3572 match state {
3573 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3574 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3575 "Left / Banned",
3576 state,
3577 ))));
3578 }
3579 RoomState::Left | RoomState::Banned => {}
3580 }
3581
3582 let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3583 let _response = self.client.send(request).await?;
3584
3585 if self.inner.direct_targets_length() != 0 {
3587 if let Err(e) = self.set_is_direct(false).await {
3588 warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3591 }
3592 }
3593
3594 self.client.base_client().forget_room(self.inner.room_id()).await?;
3595
3596 Ok(())
3597 }
3598
3599 fn ensure_room_joined(&self) -> Result<()> {
3600 let state = self.state();
3601 if state == RoomState::Joined {
3602 Ok(())
3603 } else {
3604 Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3605 }
3606 }
3607
3608 pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3610 if !matches!(self.state(), RoomState::Joined) {
3611 return None;
3612 }
3613
3614 let notification_settings = self.client().notification_settings().await;
3615
3616 let notification_mode =
3618 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3619
3620 if notification_mode.is_some() {
3621 notification_mode
3622 } else if let Ok(is_encrypted) =
3623 self.latest_encryption_state().await.map(|state| state.is_encrypted())
3624 {
3625 let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3630 let default_mode = notification_settings
3631 .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3632 .await;
3633 Some(default_mode)
3634 } else {
3635 None
3636 }
3637 }
3638
3639 pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3650 if !matches!(self.state(), RoomState::Joined) {
3651 return None;
3652 }
3653
3654 let notification_settings = self.client().notification_settings().await;
3655
3656 let mode =
3658 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3659
3660 if let Some(mode) = mode {
3661 self.update_cached_user_defined_notification_mode(mode);
3662 }
3663
3664 mode
3665 }
3666
3667 pub async fn report_content(
3680 &self,
3681 event_id: OwnedEventId,
3682 score: Option<ReportedContentScore>,
3683 reason: Option<String>,
3684 ) -> Result<report_content::v3::Response> {
3685 let state = self.state();
3686 if state != RoomState::Joined {
3687 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3688 }
3689
3690 let request = report_content::v3::Request::new(
3691 self.inner.room_id().to_owned(),
3692 event_id,
3693 score.map(Into::into),
3694 reason,
3695 );
3696 Ok(self.client.send(request).await?)
3697 }
3698
3699 pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3710 let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3711
3712 Ok(self.client.send(request).await?)
3713 }
3714
3715 pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3721 if self.is_marked_unread() == unread {
3722 return Ok(());
3724 }
3725
3726 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3727
3728 let content = MarkedUnreadEventContent::new(unread);
3729
3730 let request = set_room_account_data::v3::Request::new(
3731 user_id.to_owned(),
3732 self.inner.room_id().to_owned(),
3733 &content,
3734 )?;
3735
3736 self.client.send(request).await?;
3737 Ok(())
3738 }
3739
3740 pub async fn event_cache(
3743 &self,
3744 ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3745 self.client.event_cache().for_room(self.room_id()).await
3746 }
3747
3748 pub(crate) async fn get_user_beacon_info(
3755 &self,
3756 user_id: &UserId,
3757 ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3758 let raw_event = self
3759 .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3760 .await?
3761 .ok_or(BeaconError::NotFound)?;
3762
3763 match raw_event.deserialize()? {
3764 SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3765 SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3766 SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3767 }
3768 }
3769
3770 pub async fn start_live_location_share(
3783 &self,
3784 duration_millis: u64,
3785 description: Option<String>,
3786 ) -> Result<send_state_event::v3::Response> {
3787 self.ensure_room_joined()?;
3788
3789 self.send_state_event_for_key(
3790 self.own_user_id(),
3791 BeaconInfoEventContent::new(
3792 description,
3793 Duration::from_millis(duration_millis),
3794 true,
3795 None,
3796 ),
3797 )
3798 .await
3799 }
3800
3801 pub async fn stop_live_location_share(
3808 &self,
3809 ) -> Result<send_state_event::v3::Response, BeaconError> {
3810 self.ensure_room_joined()?;
3811
3812 let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3813 beacon_info_event.content.stop();
3814 Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3815 }
3816
3817 pub async fn send_location_beacon(
3829 &self,
3830 geo_uri: String,
3831 ) -> Result<send_message_event::v3::Response, BeaconError> {
3832 self.ensure_room_joined()?;
3833
3834 let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3835
3836 if beacon_info_event.content.is_live() {
3837 let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3838 Ok(self.send(content).await?)
3839 } else {
3840 Err(BeaconError::NotLive)
3841 }
3842 }
3843
3844 pub async fn save_composer_draft(
3847 &self,
3848 draft: ComposerDraft,
3849 thread_root: Option<&EventId>,
3850 ) -> Result<()> {
3851 self.client
3852 .state_store()
3853 .set_kv_data(
3854 StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
3855 StateStoreDataValue::ComposerDraft(draft),
3856 )
3857 .await?;
3858 Ok(())
3859 }
3860
3861 pub async fn load_composer_draft(
3864 &self,
3865 thread_root: Option<&EventId>,
3866 ) -> Result<Option<ComposerDraft>> {
3867 let data = self
3868 .client
3869 .state_store()
3870 .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3871 .await?;
3872 Ok(data.and_then(|d| d.into_composer_draft()))
3873 }
3874
3875 pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
3878 self.client
3879 .state_store()
3880 .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3881 .await?;
3882 Ok(())
3883 }
3884
3885 pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
3888 let response = self
3889 .client
3890 .send(get_state_event_for_key::v3::Request::new(
3891 self.room_id().to_owned(),
3892 StateEventType::RoomPinnedEvents,
3893 "".to_owned(),
3894 ))
3895 .await;
3896
3897 match response {
3898 Ok(response) => Ok(Some(
3899 response
3900 .into_content()
3901 .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
3902 .pinned,
3903 )),
3904 Err(http_error) => match http_error.as_client_api_error() {
3905 Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
3906 _ => Err(http_error.into()),
3907 },
3908 }
3909 }
3910
3911 pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3919 ObservableLiveLocation::new(&self.client, self.room_id())
3920 }
3921
3922 pub async fn subscribe_to_knock_requests(
3936 &self,
3937 ) -> Result<(impl Stream<Item = Vec<KnockRequest>>, JoinHandle<()>)> {
3938 let this = Arc::new(self.clone());
3939
3940 let room_member_events_observer =
3941 self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
3942
3943 let current_seen_ids = self.get_seen_knock_request_ids().await?;
3944 let mut seen_request_ids_stream = self
3945 .seen_knock_request_ids_map
3946 .subscribe()
3947 .await
3948 .map(|values| values.unwrap_or_default());
3949
3950 let mut room_info_stream = self.subscribe_info();
3951
3952 let clear_seen_ids_handle = spawn({
3955 let this = self.clone();
3956 async move {
3957 let mut member_updates_stream = this.room_member_updates_sender.subscribe();
3958 while member_updates_stream.recv().await.is_ok() {
3959 if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
3961 warn!("Failed to remove seen knock requests: {err}")
3962 }
3963 }
3964 }
3965 });
3966
3967 let combined_stream = stream! {
3968 match this.get_current_join_requests(¤t_seen_ids).await {
3970 Ok(initial_requests) => yield initial_requests,
3971 Err(err) => warn!("Failed to get initial requests to join: {err}")
3972 }
3973
3974 let mut requests_stream = room_member_events_observer.subscribe();
3975 let mut seen_ids = current_seen_ids.clone();
3976
3977 loop {
3978 tokio::select! {
3981 Some((event, _)) = requests_stream.next() => {
3982 if let Some(event) = event.as_original() {
3983 let emit = if event.prev_content().is_some() {
3985 matches!(event.membership_change(),
3986 MembershipChange::Banned |
3987 MembershipChange::Knocked |
3988 MembershipChange::KnockAccepted |
3989 MembershipChange::KnockDenied |
3990 MembershipChange::KnockRetracted
3991 )
3992 } else {
3993 true
3996 };
3997
3998 if emit {
3999 match this.get_current_join_requests(&seen_ids).await {
4000 Ok(requests) => yield requests,
4001 Err(err) => {
4002 warn!("Failed to get updated knock requests on new member event: {err}")
4003 }
4004 }
4005 }
4006 }
4007 }
4008
4009 Some(new_seen_ids) = seen_request_ids_stream.next() => {
4010 seen_ids = new_seen_ids;
4012
4013 match this.get_current_join_requests(&seen_ids).await {
4016 Ok(requests) => yield requests,
4017 Err(err) => {
4018 warn!("Failed to get updated knock requests on seen ids changed: {err}")
4019 }
4020 }
4021 }
4022
4023 Some(room_info) = room_info_stream.next() => {
4024 if !room_info.are_members_synced() {
4027 match this.get_current_join_requests(&seen_ids).await {
4028 Ok(requests) => yield requests,
4029 Err(err) => {
4030 warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4031 }
4032 }
4033 }
4034 }
4035 else => break,
4037 }
4038 }
4039 };
4040
4041 Ok((combined_stream, clear_seen_ids_handle))
4042 }
4043
4044 async fn get_current_join_requests(
4045 &self,
4046 seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4047 ) -> Result<Vec<KnockRequest>> {
4048 Ok(self
4049 .members(RoomMemberships::KNOCK)
4050 .await?
4051 .into_iter()
4052 .filter_map(|member| {
4053 let event_id = member.event().event_id()?;
4054 Some(KnockRequest::new(
4055 self,
4056 event_id,
4057 member.event().timestamp(),
4058 KnockRequestMemberInfo::from_member(&member),
4059 seen_request_ids.contains_key(event_id),
4060 ))
4061 })
4062 .collect())
4063 }
4064
4065 pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4067 RoomPrivacySettings::new(&self.inner, &self.client)
4068 }
4069
4070 pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4078 let request = opts.into_request(self.room_id());
4079
4080 let response = self.client.send(request).await?;
4081
4082 let push_ctx = self.push_context().await?;
4083 let chunk = join_all(
4084 response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4085 )
4086 .await;
4087
4088 Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4089 }
4090
4091 pub async fn relations(
4105 &self,
4106 event_id: OwnedEventId,
4107 opts: RelationsOptions,
4108 ) -> Result<Relations> {
4109 opts.send(self, event_id).await
4110 }
4111
4112 #[cfg(feature = "experimental-search")]
4115 pub async fn search(
4116 &self,
4117 query: &str,
4118 max_number_of_results: usize,
4119 ) -> Option<Vec<OwnedEventId>> {
4120 let mut search_index_guard = self.client.search_index().lock().await;
4121 search_index_guard.commit_and_reload(self.room_id());
4122 search_index_guard.search(query, max_number_of_results, self.room_id())
4123 }
4124
4125 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4147 pub async fn subscribe_thread(
4148 &self,
4149 thread_root: OwnedEventId,
4150 automatic: Option<OwnedEventId>,
4151 ) -> Result<()> {
4152 let is_automatic = automatic.is_some();
4153
4154 match self
4155 .client
4156 .send(subscribe_thread::unstable::Request::new(
4157 self.room_id().to_owned(),
4158 thread_root.clone(),
4159 automatic,
4160 ))
4161 .await
4162 {
4163 Ok(_response) => {
4164 trace!("Server acknowledged the thread subscription; saving in db");
4165 self.client
4167 .state_store()
4168 .upsert_thread_subscription(
4169 self.room_id(),
4170 &thread_root,
4171 ThreadSubscription { automatic: is_automatic },
4172 )
4173 .await?;
4174
4175 Ok(())
4176 }
4177
4178 Err(err) => {
4179 if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4180 trace!("Thread subscription skipped: {err}");
4185 Ok(())
4186 } else {
4187 Err(err.into())
4189 }
4190 }
4191 }
4192 }
4193
4194 pub async fn subscribe_thread_if_needed(
4200 &self,
4201 thread_root: &EventId,
4202 automatic: Option<OwnedEventId>,
4203 ) -> Result<()> {
4204 if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4205 if !prev_sub.automatic || automatic.is_some() {
4208 return Ok(());
4209 }
4210 }
4211 self.subscribe_thread(thread_root.to_owned(), automatic).await
4212 }
4213
4214 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4226 pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4227 self.client
4228 .send(unsubscribe_thread::unstable::Request::new(
4229 self.room_id().to_owned(),
4230 thread_root.clone(),
4231 ))
4232 .await?;
4233
4234 trace!("Server acknowledged the thread subscription removal; removed it from db too");
4235
4236 self.client.state_store().remove_thread_subscription(self.room_id(), &thread_root).await?;
4238
4239 Ok(())
4240 }
4241
4242 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4259 pub async fn fetch_thread_subscription(
4260 &self,
4261 thread_root: OwnedEventId,
4262 ) -> Result<Option<ThreadSubscription>> {
4263 let result = self
4264 .client
4265 .send(get_thread_subscription::unstable::Request::new(
4266 self.room_id().to_owned(),
4267 thread_root.clone(),
4268 ))
4269 .await;
4270
4271 let subscription = match result {
4272 Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4273 Err(http_error) => match http_error.as_client_api_error() {
4274 Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4275 _ => return Err(http_error.into()),
4276 },
4277 };
4278
4279 if let Some(sub) = &subscription {
4281 self.client
4282 .state_store()
4283 .upsert_thread_subscription(self.room_id(), &thread_root, *sub)
4284 .await?;
4285 } else {
4286 self.client
4288 .state_store()
4289 .remove_thread_subscription(self.room_id(), &thread_root)
4290 .await?;
4291 }
4292
4293 Ok(subscription)
4294 }
4295
4296 pub async fn load_or_fetch_thread_subscription(
4303 &self,
4304 thread_root: &EventId,
4305 ) -> Result<Option<ThreadSubscription>> {
4306 self.fetch_thread_subscription(thread_root.to_owned()).await
4308 }
4309}
4310
#[cfg(feature = "e2e-encryption")]
impl RoomIdentityProvider for Room {
    /// Whether `user_id` is a member of this room. Any error while getting
    /// the member counts as "not a member".
    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
        Box::pin(async { matches!(self.get_member(user_id).await, Ok(Some(_))) })
    }

    /// The identities of the joined and invited members of this room.
    ///
    /// Members without a known identity are left out; if the members cannot
    /// be loaded, the list is empty.
    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
        Box::pin(async {
            let memberships = RoomMemberships::JOIN | RoomMemberships::INVITE;
            let members = self.members(memberships).await.unwrap_or_default();

            let mut identities: Vec<UserIdentity> = Vec::new();
            for member in members {
                if let Some(identity) = self.user_identity(member.user_id()).await {
                    identities.push(identity);
                }
            }
            identities
        })
    }

    /// The identity of `user_id`, if there is one. Any error while getting
    /// it yields `None`.
    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
        Box::pin(async {
            let identity = self.client.encryption().get_user_identity(user_id).await;
            identity.ok().flatten().map(|u| u.underlying_identity())
        })
    }
}
4345
4346#[derive(Clone, Debug)]
4349pub(crate) struct WeakRoom {
4350 client: WeakClient,
4351 room_id: OwnedRoomId,
4352}
4353
4354impl WeakRoom {
4355 pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4357 Self { client, room_id }
4358 }
4359
4360 pub fn get(&self) -> Option<Room> {
4362 self.client.get().and_then(|client| client.get_room(&self.room_id))
4363 }
4364
4365 pub fn room_id(&self) -> &RoomId {
4367 &self.room_id
4368 }
4369}
4370
4371#[derive(Debug, Clone)]
4373pub struct Invite {
4374 pub invitee: RoomMember,
4376 pub inviter: Option<RoomMember>,
4378}
4379
4380#[derive(Error, Debug)]
4381enum InvitationError {
4382 #[error("No membership event found")]
4383 EventMissing,
4384}
4385
4386#[derive(Debug, Clone, Default)]
4388#[non_exhaustive]
4389pub struct Receipts {
4390 pub fully_read: Option<OwnedEventId>,
4392 pub public_read_receipt: Option<OwnedEventId>,
4394 pub private_read_receipt: Option<OwnedEventId>,
4396}
4397
4398impl Receipts {
4399 pub fn new() -> Self {
4401 Self::default()
4402 }
4403
4404 pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4413 self.fully_read = event_id.into();
4414 self
4415 }
4416
4417 pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4423 self.public_read_receipt = event_id.into();
4424 self
4425 }
4426
4427 pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4431 self.private_read_receipt = event_id.into();
4432 self
4433 }
4434
4435 pub fn is_empty(&self) -> bool {
4437 self.fully_read.is_none()
4438 && self.public_read_receipt.is_none()
4439 && self.private_read_receipt.is_none()
4440 }
4441}
4442
4443#[derive(Debug)]
4446pub enum ParentSpace {
4447 Reciprocal(Room),
4450 WithPowerlevel(Room),
4455 Illegitimate(Room),
4458 Unverifiable(OwnedRoomId),
4461}
4462
/// The score used to rate a reported content.
///
/// The checked constructors only produce values between
/// [`ReportedContentScore::MIN`] and [`ReportedContentScore::MAX`],
/// inclusive.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReportedContentScore(i8);
4468
4469impl ReportedContentScore {
4470 pub const MIN: Self = Self(-100);
4474
4475 pub const MAX: Self = Self(0);
4479
4480 pub fn new(value: i8) -> Option<Self> {
4489 value.try_into().ok()
4490 }
4491
4492 pub fn new_saturating(value: i8) -> Self {
4498 if value > Self::MAX {
4499 Self::MAX
4500 } else if value < Self::MIN {
4501 Self::MIN
4502 } else {
4503 Self(value)
4504 }
4505 }
4506
4507 pub fn value(&self) -> i8 {
4509 self.0
4510 }
4511}
4512
4513impl PartialEq<i8> for ReportedContentScore {
4514 fn eq(&self, other: &i8) -> bool {
4515 self.0.eq(other)
4516 }
4517}
4518
4519impl PartialEq<ReportedContentScore> for i8 {
4520 fn eq(&self, other: &ReportedContentScore) -> bool {
4521 self.eq(&other.0)
4522 }
4523}
4524
4525impl PartialOrd<i8> for ReportedContentScore {
4526 fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
4527 self.0.partial_cmp(other)
4528 }
4529}
4530
4531impl PartialOrd<ReportedContentScore> for i8 {
4532 fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
4533 self.partial_cmp(&other.0)
4534 }
4535}
4536
4537impl From<ReportedContentScore> for Int {
4538 fn from(value: ReportedContentScore) -> Self {
4539 value.0.into()
4540 }
4541}
4542
4543impl TryFrom<i8> for ReportedContentScore {
4544 type Error = TryFromReportedContentScoreError;
4545
4546 fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
4547 if value > Self::MAX || value < Self::MIN {
4548 Err(TryFromReportedContentScoreError(()))
4549 } else {
4550 Ok(Self(value))
4551 }
4552 }
4553}
4554
4555impl TryFrom<i16> for ReportedContentScore {
4556 type Error = TryFromReportedContentScoreError;
4557
4558 fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
4559 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4560 value.try_into()
4561 }
4562}
4563
4564impl TryFrom<i32> for ReportedContentScore {
4565 type Error = TryFromReportedContentScoreError;
4566
4567 fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
4568 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4569 value.try_into()
4570 }
4571}
4572
4573impl TryFrom<i64> for ReportedContentScore {
4574 type Error = TryFromReportedContentScoreError;
4575
4576 fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
4577 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4578 value.try_into()
4579 }
4580}
4581
4582impl TryFrom<Int> for ReportedContentScore {
4583 type Error = TryFromReportedContentScoreError;
4584
4585 fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
4586 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4587 value.try_into()
4588 }
4589}
4590
4591trait EventSource {
4592 fn get_event(
4593 &self,
4594 event_id: &EventId,
4595 ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
4596}
4597
4598impl EventSource for &Room {
4599 async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4600 self.load_or_fetch_event(event_id, None).await
4601 }
4602}
4603
4604#[derive(Debug, Clone, Error)]
4607#[error("out of range conversion attempted")]
4608pub struct TryFromReportedContentScoreError(());
4609
4610#[derive(Debug)]
4613pub struct RoomMemberWithSenderInfo {
4614 pub room_member: RoomMember,
4616 pub sender_info: Option<RoomMember>,
4619}
4620
4621#[cfg(all(test, not(target_family = "wasm")))]
4622mod tests {
4623 use std::collections::BTreeMap;
4624
4625 use matrix_sdk_base::{store::ComposerDraftType, ComposerDraft};
4626 use matrix_sdk_test::{
4627 async_test, event_factory::EventFactory, test_json, JoinedRoomBuilder, StateTestEvent,
4628 SyncResponseBuilder,
4629 };
4630 use ruma::{
4631 event_id,
4632 events::{relation::RelationType, room::member::MembershipState},
4633 int, owned_event_id, room_id, user_id, RoomVersionId,
4634 };
4635 use wiremock::{
4636 matchers::{header, method, path_regex},
4637 Mock, MockServer, ResponseTemplate,
4638 };
4639
4640 use super::ReportedContentScore;
4641 use crate::{
4642 config::RequestConfig,
4643 room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4644 test_utils::{
4645 client::mock_matrix_session,
4646 logged_in_client,
4647 mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4648 },
4649 Client,
4650 };
4651
4652 #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
4653 #[async_test]
4654 async fn test_cache_invalidation_while_encrypt() {
4655 use matrix_sdk_base::store::RoomLoadSettings;
4656 use matrix_sdk_test::{message_like_event_content, DEFAULT_TEST_ROOM_ID};
4657
4658 let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
4659 let session = mock_matrix_session();
4660
4661 let client = Client::builder()
4662 .homeserver_url("http://localhost:1234")
4663 .request_config(RequestConfig::new().disable_retry())
4664 .sqlite_store(&sqlite_path, None)
4665 .build()
4666 .await
4667 .unwrap();
4668 client
4669 .matrix_auth()
4670 .restore_session(session.clone(), RoomLoadSettings::default())
4671 .await
4672 .unwrap();
4673
4674 client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
4675
4676 let server = MockServer::start().await;
4678 {
4679 Mock::given(method("GET"))
4680 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
4681 .and(header("authorization", "Bearer 1234"))
4682 .respond_with(
4683 ResponseTemplate::new(200)
4684 .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
4685 )
4686 .mount(&server)
4687 .await;
4688 let response = SyncResponseBuilder::default()
4689 .add_joined_room(
4690 JoinedRoomBuilder::default()
4691 .add_state_event(StateTestEvent::Member)
4692 .add_state_event(StateTestEvent::PowerLevels)
4693 .add_state_event(StateTestEvent::Encryption),
4694 )
4695 .build_sync_response();
4696 client.base_client().receive_sync_response(response).await.unwrap();
4697 }
4698
4699 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4700
4701 room.preshare_room_key().await.unwrap();
4703
4704 {
4707 let client = Client::builder()
4708 .homeserver_url("http://localhost:1234")
4709 .request_config(RequestConfig::new().disable_retry())
4710 .sqlite_store(&sqlite_path, None)
4711 .build()
4712 .await
4713 .unwrap();
4714 client
4715 .matrix_auth()
4716 .restore_session(session.clone(), RoomLoadSettings::default())
4717 .await
4718 .unwrap();
4719 client
4720 .encryption()
4721 .enable_cross_process_store_lock("client2".to_owned())
4722 .await
4723 .unwrap();
4724
4725 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4726 assert!(guard.is_some());
4727 }
4728
4729 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4731 assert!(guard.is_some());
4732
4733 let olm = client.olm_machine().await;
4735 let olm = olm.as_ref().expect("Olm machine wasn't started");
4736
4737 let _encrypted_content = olm
4740 .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
4741 .await
4742 .unwrap();
4743 }
4744
4745 #[test]
4746 fn reported_content_score() {
4747 let score = ReportedContentScore::new(0).unwrap();
4749 assert_eq!(score.value(), 0);
4750 let score = ReportedContentScore::new(-50).unwrap();
4751 assert_eq!(score.value(), -50);
4752 let score = ReportedContentScore::new(-100).unwrap();
4753 assert_eq!(score.value(), -100);
4754 assert_eq!(ReportedContentScore::new(10), None);
4755 assert_eq!(ReportedContentScore::new(-110), None);
4756
4757 let score = ReportedContentScore::new_saturating(0);
4758 assert_eq!(score.value(), 0);
4759 let score = ReportedContentScore::new_saturating(-50);
4760 assert_eq!(score.value(), -50);
4761 let score = ReportedContentScore::new_saturating(-100);
4762 assert_eq!(score.value(), -100);
4763 let score = ReportedContentScore::new_saturating(10);
4764 assert_eq!(score, ReportedContentScore::MAX);
4765 let score = ReportedContentScore::new_saturating(-110);
4766 assert_eq!(score, ReportedContentScore::MIN);
4767
4768 let score = ReportedContentScore::try_from(0i16).unwrap();
4770 assert_eq!(score.value(), 0);
4771 let score = ReportedContentScore::try_from(-100i16).unwrap();
4772 assert_eq!(score.value(), -100);
4773 ReportedContentScore::try_from(10i16).unwrap_err();
4774 ReportedContentScore::try_from(-110i16).unwrap_err();
4775
4776 let score = ReportedContentScore::try_from(0i32).unwrap();
4778 assert_eq!(score.value(), 0);
4779 let score = ReportedContentScore::try_from(-100i32).unwrap();
4780 assert_eq!(score.value(), -100);
4781 ReportedContentScore::try_from(10i32).unwrap_err();
4782 ReportedContentScore::try_from(-110i32).unwrap_err();
4783
4784 let score = ReportedContentScore::try_from(0i64).unwrap();
4786 assert_eq!(score.value(), 0);
4787 let score = ReportedContentScore::try_from(-100i64).unwrap();
4788 assert_eq!(score.value(), -100);
4789 ReportedContentScore::try_from(10i64).unwrap_err();
4790 ReportedContentScore::try_from(-110i64).unwrap_err();
4791
4792 let score = ReportedContentScore::try_from(int!(0)).unwrap();
4794 assert_eq!(score.value(), 0);
4795 let score = ReportedContentScore::try_from(int!(-100)).unwrap();
4796 assert_eq!(score.value(), -100);
4797 ReportedContentScore::try_from(int!(10)).unwrap_err();
4798 ReportedContentScore::try_from(int!(-110)).unwrap_err();
4799 }
4800
4801 #[async_test]
4802 async fn test_composer_draft() {
4803 use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4804
4805 let client = logged_in_client(None).await;
4806
4807 let response = SyncResponseBuilder::default()
4808 .add_joined_room(JoinedRoomBuilder::default())
4809 .build_sync_response();
4810 client.base_client().receive_sync_response(response).await.unwrap();
4811 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4812
4813 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4814
4815 let draft = ComposerDraft {
4818 plain_text: "Hello, world!".to_owned(),
4819 html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4820 draft_type: ComposerDraftType::NewMessage,
4821 };
4822
4823 room.save_composer_draft(draft.clone(), None).await.unwrap();
4824
4825 let thread_root = owned_event_id!("$thread_root:b.c");
4826 let thread_draft = ComposerDraft {
4827 plain_text: "Hello, thread!".to_owned(),
4828 html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4829 draft_type: ComposerDraftType::NewMessage,
4830 };
4831
4832 room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4833
4834 assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4836
4837 assert_eq!(
4839 room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4840 Some(thread_draft.clone())
4841 );
4842
4843 room.clear_composer_draft(None).await.unwrap();
4845 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4846
4847 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4849
4850 room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4852 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4853 }
4854
4855 #[async_test]
4856 async fn test_mark_join_requests_as_seen() {
4857 let server = MatrixMockServer::new().await;
4858 let client = server.client_builder().build().await;
4859 let event_id = event_id!("$a:b.c");
4860 let room_id = room_id!("!a:b.c");
4861 let user_id = user_id!("@alice:b.c");
4862
4863 let f = EventFactory::new().room(room_id);
4864 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f
4865 .member(user_id)
4866 .membership(MembershipState::Knock)
4867 .event_id(event_id)
4868 .into_raw()]);
4869 let room = server.sync_room(&client, joined_room_builder).await;
4870
4871 let seen_ids =
4873 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4874 assert!(seen_ids.is_empty());
4875
4876 room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4878 .await
4879 .expect("Couldn't mark join request as seen");
4880
4881 let seen_ids =
4883 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4884 assert_eq!(seen_ids.len(), 1);
4885 assert_eq!(
4886 seen_ids.into_iter().next().expect("No next value"),
4887 (event_id.to_owned(), user_id.to_owned())
4888 )
4889 }
4890
4891 #[async_test]
4892 async fn test_own_room_membership_with_no_own_member_event() {
4893 let server = MatrixMockServer::new().await;
4894 let client = server.client_builder().build().await;
4895 let room_id = room_id!("!a:b.c");
4896
4897 let room = server.sync_joined_room(&client, room_id).await;
4898
4899 let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
4902 assert!(error.is_some());
4903 }
4904
4905 #[async_test]
4906 async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
4907 let server = MatrixMockServer::new().await;
4908 let client = server.client_builder().build().await;
4909 let room_id = room_id!("!a:b.c");
4910 let user_id = user_id!("@example:localhost");
4911
4912 let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
4913 let joined_room_builder =
4914 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into_raw()]);
4915 let room = server.sync_room(&client, joined_room_builder).await;
4916
4917 let ret = room
4919 .member_with_sender_info(client.user_id().unwrap())
4920 .await
4921 .expect("Room member info should be available");
4922
4923 assert_eq!(ret.room_member.event().user_id(), user_id);
4925
4926 assert!(ret.sender_info.is_none());
4928 }
4929
4930 #[async_test]
4931 async fn test_own_room_membership_with_own_member_event_and_own_sender() {
4932 let server = MatrixMockServer::new().await;
4933 let client = server.client_builder().build().await;
4934 let room_id = room_id!("!a:b.c");
4935 let user_id = user_id!("@example:localhost");
4936
4937 let f = EventFactory::new().room(room_id).sender(user_id);
4938 let joined_room_builder =
4939 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into_raw()]);
4940 let room = server.sync_room(&client, joined_room_builder).await;
4941
4942 let ret = room
4944 .member_with_sender_info(client.user_id().unwrap())
4945 .await
4946 .expect("Room member info should be available");
4947
4948 assert_eq!(ret.room_member.event().user_id(), user_id);
4950
4951 assert!(ret.sender_info.is_some());
4953 assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
4954 }
4955
4956 #[async_test]
4957 async fn test_own_room_membership_with_own_member_event_and_known_sender() {
4958 let server = MatrixMockServer::new().await;
4959 let client = server.client_builder().build().await;
4960 let room_id = room_id!("!a:b.c");
4961 let user_id = user_id!("@example:localhost");
4962 let sender_id = user_id!("@alice:b.c");
4963
4964 let f = EventFactory::new().room(room_id).sender(sender_id);
4965 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4966 f.member(user_id).into_raw(),
4967 f.member(sender_id).into_raw(),
4969 ]);
4970 let room = server.sync_room(&client, joined_room_builder).await;
4971
4972 let ret = room
4974 .member_with_sender_info(client.user_id().unwrap())
4975 .await
4976 .expect("Room member info should be available");
4977
4978 assert_eq!(ret.room_member.event().user_id(), user_id);
4980
4981 assert!(ret.sender_info.is_some());
4983 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
4984 }
4985
4986 #[async_test]
4987 async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
4988 let server = MatrixMockServer::new().await;
4989 let client = server.client_builder().build().await;
4990 let room_id = room_id!("!a:b.c");
4991 let user_id = user_id!("@example:localhost");
4992 let sender_id = user_id!("@alice:b.c");
4993
4994 let f = EventFactory::new().room(room_id).sender(sender_id);
4995 let joined_room_builder =
4996 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into_raw()]);
4997 let room = server.sync_room(&client, joined_room_builder).await;
4998
4999 server
5001 .mock_get_members()
5002 .ok(vec![f.member(sender_id).into_raw()])
5003 .mock_once()
5004 .mount()
5005 .await;
5006
5007 let ret = room
5009 .member_with_sender_info(client.user_id().unwrap())
5010 .await
5011 .expect("Room member info should be available");
5012
5013 assert_eq!(ret.room_member.event().user_id(), user_id);
5015
5016 assert!(ret.sender_info.is_some());
5018 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5019 }
5020
5021 #[async_test]
5022 async fn test_list_threads() {
5023 let server = MatrixMockServer::new().await;
5024 let client = server.client_builder().build().await;
5025
5026 let room_id = room_id!("!a:b.c");
5027 let sender_id = user_id!("@alice:b.c");
5028 let f = EventFactory::new().room(room_id).sender(sender_id);
5029
5030 let eid1 = event_id!("$1");
5031 let eid2 = event_id!("$2");
5032 let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5033 let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5034
5035 server
5036 .mock_room_threads()
5037 .ok(batch1.clone(), Some("prev_batch".to_owned()))
5038 .mock_once()
5039 .mount()
5040 .await;
5041 server
5042 .mock_room_threads()
5043 .match_from("prev_batch")
5044 .ok(batch2, None)
5045 .mock_once()
5046 .mount()
5047 .await;
5048
5049 let room = server.sync_joined_room(&client, room_id).await;
5050 let result =
5051 room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5052 assert_eq!(result.chunk.len(), 1);
5053 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5054 assert!(result.prev_batch_token.is_some());
5055
5056 let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5057 let result = room.list_threads(opts).await.expect("Failed to list threads");
5058 assert_eq!(result.chunk.len(), 1);
5059 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5060 assert!(result.prev_batch_token.is_none());
5061 }
5062
5063 #[async_test]
5064 async fn test_relations() {
5065 let server = MatrixMockServer::new().await;
5066 let client = server.client_builder().build().await;
5067
5068 let room_id = room_id!("!a:b.c");
5069 let sender_id = user_id!("@alice:b.c");
5070 let f = EventFactory::new().room(room_id).sender(sender_id);
5071
5072 let target_event_id = owned_event_id!("$target");
5073 let eid1 = event_id!("$1");
5074 let eid2 = event_id!("$2");
5075 let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5076 let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5077
5078 server
5079 .mock_room_relations()
5080 .match_target_event(target_event_id.clone())
5081 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5082 .mock_once()
5083 .mount()
5084 .await;
5085
5086 server
5087 .mock_room_relations()
5088 .match_target_event(target_event_id.clone())
5089 .match_from("next_batch")
5090 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5091 .mock_once()
5092 .mount()
5093 .await;
5094
5095 let room = server.sync_joined_room(&client, room_id).await;
5096
5097 let mut opts = RelationsOptions {
5099 include_relations: IncludeRelations::AllRelations,
5100 ..Default::default()
5101 };
5102 let result = room
5103 .relations(target_event_id.clone(), opts.clone())
5104 .await
5105 .expect("Failed to list relations the first time");
5106 assert_eq!(result.chunk.len(), 1);
5107 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5108 assert!(result.prev_batch_token.is_none());
5109 assert!(result.next_batch_token.is_some());
5110 assert!(result.recursion_depth.is_none());
5111
5112 opts.from = result.next_batch_token;
5113 let result = room
5114 .relations(target_event_id, opts)
5115 .await
5116 .expect("Failed to list relations the second time");
5117 assert_eq!(result.chunk.len(), 1);
5118 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5119 assert!(result.prev_batch_token.is_none());
5120 assert!(result.next_batch_token.is_none());
5121 assert!(result.recursion_depth.is_none());
5122 }
5123
5124 #[async_test]
5125 async fn test_relations_with_reltype() {
5126 let server = MatrixMockServer::new().await;
5127 let client = server.client_builder().build().await;
5128
5129 let room_id = room_id!("!a:b.c");
5130 let sender_id = user_id!("@alice:b.c");
5131 let f = EventFactory::new().room(room_id).sender(sender_id);
5132
5133 let target_event_id = owned_event_id!("$target");
5134 let eid1 = event_id!("$1");
5135 let eid2 = event_id!("$2");
5136 let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5137 let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5138
5139 server
5140 .mock_room_relations()
5141 .match_target_event(target_event_id.clone())
5142 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5143 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5144 .mock_once()
5145 .mount()
5146 .await;
5147
5148 server
5149 .mock_room_relations()
5150 .match_target_event(target_event_id.clone())
5151 .match_from("next_batch")
5152 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5153 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5154 .mock_once()
5155 .mount()
5156 .await;
5157
5158 let room = server.sync_joined_room(&client, room_id).await;
5159
5160 let mut opts = RelationsOptions {
5162 include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5163 ..Default::default()
5164 };
5165 let result = room
5166 .relations(target_event_id.clone(), opts.clone())
5167 .await
5168 .expect("Failed to list relations the first time");
5169 assert_eq!(result.chunk.len(), 1);
5170 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5171 assert!(result.prev_batch_token.is_none());
5172 assert!(result.next_batch_token.is_some());
5173 assert!(result.recursion_depth.is_none());
5174
5175 opts.from = result.next_batch_token;
5176 let result = room
5177 .relations(target_event_id, opts)
5178 .await
5179 .expect("Failed to list relations the second time");
5180 assert_eq!(result.chunk.len(), 1);
5181 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5182 assert!(result.prev_batch_token.is_none());
5183 assert!(result.next_batch_token.is_none());
5184 assert!(result.recursion_depth.is_none());
5185 }
5186
5187 #[async_test]
5188 async fn test_power_levels_computation() {
5189 let server = MatrixMockServer::new().await;
5190 let client = server.client_builder().build().await;
5191
5192 let room_id = room_id!("!a:b.c");
5193 let sender_id = client.user_id().expect("No session id");
5194 let f = EventFactory::new().room(room_id).sender(sender_id);
5195 let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5196
5197 let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into_raw();
5199 let power_levels_event = f.power_levels(&mut user_map).state_key("").into_raw();
5200 let room_member_event = f.member(sender_id).into_raw();
5201
5202 let room = server
5204 .sync_room(
5205 &client,
5206 JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event.clone()]),
5207 )
5208 .await;
5209 let ctx = room
5210 .push_condition_room_ctx()
5211 .await
5212 .expect("Failed to get push condition context")
5213 .expect("Could not get push condition context");
5214
5215 assert!(ctx.power_levels.is_none());
5217
5218 let room = server
5220 .sync_room(
5221 &client,
5222 JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event.clone()]),
5223 )
5224 .await;
5225 let ctx = room
5226 .push_condition_room_ctx()
5227 .await
5228 .expect("Failed to get push condition context")
5229 .expect("Could not get push condition context");
5230
5231 assert!(ctx.power_levels.is_none());
5233
5234 let room = server
5236 .sync_room(
5237 &client,
5238 JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5239 )
5240 .await;
5241 let ctx = room
5242 .push_condition_room_ctx()
5243 .await
5244 .expect("Failed to get push condition context")
5245 .expect("Could not get push condition context");
5246
5247 assert!(ctx.power_levels.is_some());
5249 }
5250}