1use std::{
18 borrow::Borrow,
19 collections::{BTreeMap, HashMap},
20 future::Future,
21 ops::Deref,
22 sync::Arc,
23 time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30 StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{IdentityStatusChange, RoomIdentityProvider, UserIdentity};
39pub use matrix_sdk_base::store::StoredThreadSubscription;
40use matrix_sdk_base::{
41 ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
42 StateChanges, StateStoreDataKey, StateStoreDataValue,
43 deserialized_responses::{
44 RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
45 },
46 media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
47 store::{StateStoreExt, ThreadSubscriptionStatus},
48};
49#[cfg(feature = "e2e-encryption")]
50use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
51#[cfg(feature = "e2e-encryption")]
52use matrix_sdk_common::BoxFuture;
53use matrix_sdk_common::{
54 deserialized_responses::TimelineEvent,
55 executor::{JoinHandle, spawn},
56 timeout::timeout,
57};
58#[cfg(feature = "experimental-search")]
59use matrix_sdk_search::error::IndexError;
60#[cfg(feature = "experimental-search")]
61#[cfg(doc)]
62use matrix_sdk_search::index::RoomIndex;
63use mime::Mime;
64use reply::Reply;
65#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
66use ruma::events::AnySyncMessageLikeEvent;
67#[cfg(feature = "experimental-encrypted-state-events")]
68use ruma::events::AnySyncStateEvent;
69#[cfg(feature = "unstable-msc4274")]
70use ruma::events::room::message::GalleryItemType;
71#[cfg(feature = "e2e-encryption")]
72use ruma::events::{
73 AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
74};
75use ruma::{
76 EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
77 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
78 api::client::{
79 config::{set_global_account_data, set_room_account_data},
80 context,
81 error::ErrorKind,
82 filter::LazyLoadOptions,
83 membership::{
84 Invite3pid, ban_user, forget_room, get_member_events,
85 invite_user::{self, v3::InvitationRecipient},
86 kick_user, leave_room, unban_user,
87 },
88 message::send_message_event,
89 read_marker::set_read_marker,
90 receipt::create_receipt,
91 redact::redact_event,
92 room::{get_room_event, report_content, report_room},
93 state::{get_state_event_for_key, send_state_event},
94 tag::{create_tag, delete_tag},
95 threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
96 typing::create_typing_event::{self, v3::Typing},
97 },
98 assign,
99 events::{
100 AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
101 Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
102 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
103 RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
104 StaticStateEventContent, SyncStateEvent,
105 beacon::BeaconEventContent,
106 beacon_info::BeaconInfoEventContent,
107 direct::DirectEventContent,
108 marked_unread::MarkedUnreadEventContent,
109 receipt::{Receipt, ReceiptThread, ReceiptType},
110 room::{
111 ImageInfo, MediaSource, ThumbnailInfo,
112 avatar::{self, RoomAvatarEventContent},
113 encryption::RoomEncryptionEventContent,
114 history_visibility::HistoryVisibility,
115 member::{MembershipChange, SyncRoomMemberEvent},
116 message::{
117 AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
118 ImageMessageEventContent, MessageType, RoomMessageEventContent,
119 TextMessageEventContent, UnstableAmplitude, UnstableAudioDetailsContentBlock,
120 UnstableVoiceContentBlock, VideoInfo, VideoMessageEventContent,
121 },
122 name::RoomNameEventContent,
123 pinned_events::RoomPinnedEventsEventContent,
124 power_levels::{
125 RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
126 },
127 server_acl::RoomServerAclEventContent,
128 topic::RoomTopicEventContent,
129 },
130 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
131 tag::{TagInfo, TagName},
132 typing::SyncTypingEvent,
133 },
134 int,
135 push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
136 serde::Raw,
137 time::Instant,
138};
139#[cfg(feature = "experimental-encrypted-state-events")]
140use ruma::{
141 events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
142 serde::JsonCastable,
143};
144use serde::de::DeserializeOwned;
145use thiserror::Error;
146use tokio::{join, sync::broadcast};
147use tracing::{debug, error, info, instrument, trace, warn};
148
149use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
150pub use self::{
151 member::{RoomMember, RoomMemberRole},
152 messages::{
153 EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
154 Relations, RelationsOptions, ThreadRoots,
155 },
156};
157#[cfg(doc)]
158use crate::event_cache::EventCache;
159#[cfg(feature = "experimental-encrypted-state-events")]
160use crate::room::futures::{SendRawStateEvent, SendStateEvent};
161use crate::{
162 BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
163 attachment::{AttachmentConfig, AttachmentInfo},
164 client::WeakClient,
165 config::RequestConfig,
166 error::{BeaconError, WrongRoomState},
167 event_cache::{self, EventCacheDropHandles, RoomEventCache},
168 event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
169 live_location_share::ObservableLiveLocation,
170 media::{MediaFormat, MediaRequestParameters},
171 notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
172 room::{
173 knock_requests::{KnockRequest, KnockRequestMemberInfo},
174 power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
175 privacy_settings::RoomPrivacySettings,
176 },
177 sync::RoomUpdate,
178 utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
179};
180#[cfg(feature = "e2e-encryption")]
181use crate::{crypto::types::events::CryptoContextInfo, encryption::backups::BackupState};
182
183pub mod edit;
184pub mod futures;
185pub mod identity_status_changes;
186pub mod knock_requests;
188mod member;
189mod messages;
190pub mod power_levels;
191pub mod reply;
192
193pub mod calls;
194
195pub mod privacy_settings;
197
198#[cfg(feature = "e2e-encryption")]
199pub(crate) mod shared_room_history;
200
201#[derive(Debug, Clone)]
204pub struct Room {
205 inner: BaseRoom,
206 pub(crate) client: Client,
207}
208
209impl Deref for Room {
210 type Target = BaseRoom;
211
212 fn deref(&self) -> &Self::Target {
213 &self.inner
214 }
215}
216
217const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
218const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
219
220#[derive(Debug, Clone, Copy, PartialEq, Eq)]
222pub struct ThreadSubscription {
223 pub automatic: bool,
226}
227
228#[derive(Debug)]
230pub struct PushContext {
231 push_condition_room_ctx: PushConditionRoomCtx,
233
234 push_rules: Ruleset,
237}
238
239impl PushContext {
240 pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
242 Self { push_condition_room_ctx, push_rules }
243 }
244
245 pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
247 self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
248 }
249
250 #[doc(hidden)]
253 #[instrument(skip_all)]
254 pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
255 let rules = self
256 .push_rules
257 .iter()
258 .filter_map(|r| {
259 if !r.enabled() {
260 return None;
261 }
262
263 let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
264
265 let conditions = match r {
266 AnyPushRuleRef::Override(r) => {
267 format!("{:?}", r.conditions)
268 }
269 AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
270 AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
271 AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
272 AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
273 _ => "<unknown push rule kind>".to_owned(),
274 };
275
276 Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
277 })
278 .collect::<Vec<_>>()
279 .join("\n");
280 trace!("rules:\n\n{rules}\n\n");
281
282 let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
283
284 if let Some(found) = found {
285 trace!("rule {} matched", found.rule_id());
286 found.actions().to_owned()
287 } else {
288 trace!("no match");
289 Vec::new()
290 }
291 }
292}
293
294macro_rules! make_media_type {
295 ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $info: ident, $thumbnail: ident) => {{
296 let (body, formatted, filename) = match $caption {
300 Some(TextMessageEventContent { body, formatted, .. }) => (body, formatted, Some($filename)),
301 None => ($filename, None, None),
302 };
303
304 let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();
305
306 match $content_type.type_() {
307 mime::IMAGE => {
308 let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
309 mimetype: Some($content_type.as_ref().to_owned()),
310 thumbnail_source,
311 thumbnail_info
312 });
313 let content = assign!(ImageMessageEventContent::new(body, $source), {
314 info: Some(Box::new(info)),
315 formatted,
316 filename
317 });
318 <$t>::Image(content)
319 }
320
321 mime::AUDIO => {
322 let mut content = assign!(AudioMessageEventContent::new(body, $source), {
323 formatted,
324 filename
325 });
326
327 if let Some(AttachmentInfo::Audio(audio_info) | AttachmentInfo::Voice(audio_info)) = &$info &&
328 let Some(duration) = audio_info.duration && let Some(waveform_vec) = &audio_info.waveform {
329 let waveform = waveform_vec
330 .iter()
331 .map(|v| ((*v).clamp(0.0, 1.0) * UnstableAmplitude::MAX as f32) as u16)
332 .map(Into::into)
333 .collect();
334 content.audio =
335 Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
336 }
337
338 if matches!($info, Some(AttachmentInfo::Voice(_))) {
339 content.voice = Some(UnstableVoiceContentBlock::new());
340 }
341
342 let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
343 audio_info.mimetype = Some($content_type.as_ref().to_owned());
344 let content = content.info(Box::new(audio_info));
345
346 <$t>::Audio(content)
347 }
348
349 mime::VIDEO => {
350 let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
351 mimetype: Some($content_type.as_ref().to_owned()),
352 thumbnail_source,
353 thumbnail_info
354 });
355 let content = assign!(VideoMessageEventContent::new(body, $source), {
356 info: Some(Box::new(info)),
357 formatted,
358 filename
359 });
360 <$t>::Video(content)
361 }
362
363 _ => {
364 let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
365 mimetype: Some($content_type.as_ref().to_owned()),
366 thumbnail_source,
367 thumbnail_info
368 });
369 let content = assign!(FileMessageEventContent::new(body, $source), {
370 info: Some(Box::new(info)),
371 formatted,
372 filename,
373 });
374 <$t>::File(content)
375 }
376 }
377 }};
378}
379
380impl Room {
381 pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
388 Self { inner: room, client }
389 }
390
391 #[doc(alias = "reject_invitation")]
397 #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
398 async fn leave_impl(&self) -> (Result<()>, &Room) {
399 let state = self.state();
400 if state == RoomState::Left {
401 return (
402 Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
403 "Joined or Invited",
404 state,
405 )))),
406 self,
407 );
408 }
409
410 let should_forget = matches!(self.state(), RoomState::Invited);
413
414 let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
415 let response = self.client.send(request).await;
416
417 if let Err(error) = response {
420 #[allow(clippy::collapsible_match)]
421 let ignore_error = if let Some(error) = error.client_api_error_kind() {
422 match error {
423 ErrorKind::Forbidden { .. } => true,
426 _ => false,
427 }
428 } else {
429 false
430 };
431
432 error!(?error, ignore_error, should_forget, "Failed to leave the room");
433
434 if !ignore_error {
435 return (Err(error.into()), self);
436 }
437 }
438
439 if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
440 return (Err(e.into()), self);
441 }
442
443 if should_forget {
444 trace!("Trying to forget the room");
445
446 if let Err(error) = self.forget().await {
447 error!(?error, "Failed to forget the room");
448 }
449 }
450
451 (Ok(()), self)
452 }
453
454 pub async fn leave(&self) -> Result<()> {
462 let mut rooms: Vec<Room> = vec![self.clone()];
463 let mut current_room = self;
464
465 while let Some(predecessor) = current_room.predecessor_room() {
466 let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
467
468 if let Some(predecessor_room) = maybe_predecessor_room {
469 rooms.push(predecessor_room.clone());
470 current_room = rooms.last().expect("Room just pushed so can't be empty");
471 } else {
472 warn!("Cannot find predecessor room");
473 break;
474 }
475 }
476
477 let batch_size = 5;
478
479 let rooms_futures: Vec<_> = rooms
480 .iter()
481 .filter_map(|room| match room.state() {
482 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
483 Some(room.leave_impl())
484 }
485 RoomState::Banned | RoomState::Left => None,
486 })
487 .collect();
488
489 let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
490
491 let mut maybe_this_room_failed_with: Option<Error> = None;
492
493 while let Some(result) = futures_stream.next().await {
494 if let (Err(e), room) = result {
495 if room.room_id() == self.room_id() {
496 maybe_this_room_failed_with = Some(e);
497 } else {
498 warn!("Failure while attempting to leave predecessor room: {e:?}");
499 }
500 }
501 }
502
503 maybe_this_room_failed_with.map_or(Ok(()), Err)
504 }
505
506 #[doc(alias = "accept_invitation")]
510 pub async fn join(&self) -> Result<()> {
511 let prev_room_state = self.inner.state();
512
513 if prev_room_state == RoomState::Joined {
514 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
515 "Invited or Left",
516 prev_room_state,
517 ))));
518 }
519
520 self.client.join_room_by_id(self.room_id()).await?;
521
522 Ok(())
523 }
524
525 pub fn client(&self) -> Client {
529 self.client.clone()
530 }
531
532 pub fn is_synced(&self) -> bool {
535 self.inner.is_state_fully_synced()
536 }
537
538 pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
568 let Some(url) = self.avatar_url() else { return Ok(None) };
569 let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
570 Ok(Some(self.client.media().get_media_content(&request, true).await?))
571 }
572
573 #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
602 pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
603 let room_id = self.inner.room_id();
604 let request = options.into_request(room_id);
605 let http_response = self.client.send(request).await?;
606
607 let push_ctx = self.push_context().await?;
608 let chunk = join_all(
609 http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
610 )
611 .await;
612
613 Ok(Messages {
614 start: http_response.start,
615 end: http_response.end,
616 chunk,
617 state: http_response.state,
618 })
619 }
620
621 pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
631 where
632 Ev: SyncEvent + DeserializeOwned + Send + 'static,
633 H: EventHandler<Ev, Ctx>,
634 {
635 self.client.add_room_event_handler(self.room_id(), handler)
636 }
637
638 pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
643 self.client.subscribe_to_room_updates(self.room_id())
644 }
645
646 pub fn subscribe_to_typing_notifications(
652 &self,
653 ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
654 let (sender, receiver) = broadcast::channel(16);
655 let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
656 let own_user_id = self.own_user_id().to_owned();
657 move |event: SyncTypingEvent| async move {
658 let typing_user_ids = event
660 .content
661 .user_ids
662 .into_iter()
663 .filter(|user_id| *user_id != own_user_id)
664 .collect();
665 let _ = sender.send(typing_user_ids);
667 }
668 });
669 let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
670 (drop_guard, receiver)
671 }
672
673 #[cfg(feature = "e2e-encryption")]
696 pub async fn subscribe_to_identity_status_changes(
697 &self,
698 ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
699 IdentityStatusChanges::create_stream(self.clone()).await
700 }
701
702 #[cfg(not(feature = "experimental-encrypted-state-events"))]
707 #[allow(clippy::unused_async)] async fn try_decrypt_event(
709 &self,
710 event: Raw<AnyTimelineEvent>,
711 push_ctx: Option<&PushContext>,
712 ) -> TimelineEvent {
713 #[cfg(feature = "e2e-encryption")]
714 if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
715 SyncMessageLikeEvent::Original(_),
716 ))) = event.deserialize_as::<AnySyncTimelineEvent>()
717 && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
718 {
719 return event;
720 }
721
722 let mut event = TimelineEvent::from_plaintext(event.cast());
723 if let Some(push_ctx) = push_ctx {
724 event.set_push_actions(push_ctx.for_event(event.raw()).await);
725 }
726
727 event
728 }
729
730 #[cfg(feature = "experimental-encrypted-state-events")]
735 #[allow(clippy::unused_async)] async fn try_decrypt_event(
737 &self,
738 event: Raw<AnyTimelineEvent>,
739 push_ctx: Option<&PushContext>,
740 ) -> TimelineEvent {
741 match event.deserialize_as::<AnySyncTimelineEvent>() {
743 Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
744 SyncMessageLikeEvent::Original(_),
745 ))) => {
746 if let Ok(event) = self
747 .decrypt_event(
748 event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
749 push_ctx,
750 )
751 .await
752 {
753 return event;
754 }
755 }
756 Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
757 SyncStateEvent::Original(_),
758 ))) => {
759 if let Ok(event) = self
760 .decrypt_event(
761 event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
762 push_ctx,
763 )
764 .await
765 {
766 return event;
767 }
768 }
769 _ => {}
770 }
771
772 let mut event = TimelineEvent::from_plaintext(event.cast());
773 if let Some(push_ctx) = push_ctx {
774 event.set_push_actions(push_ctx.for_event(event.raw()).await);
775 }
776
777 event
778 }
779
780 pub async fn event(
785 &self,
786 event_id: &EventId,
787 request_config: Option<RequestConfig>,
788 ) -> Result<TimelineEvent> {
789 let request =
790 get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
791
792 let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
793 let push_ctx = self.push_context().await?;
794 let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
795
796 if let Ok((cache, _handles)) = self.event_cache().await {
798 cache.save_events([event.clone()]).await;
799 }
800
801 Ok(event)
802 }
803
804 pub async fn load_or_fetch_event(
811 &self,
812 event_id: &EventId,
813 request_config: Option<RequestConfig>,
814 ) -> Result<TimelineEvent> {
815 match self.event_cache().await {
816 Ok((event_cache, _drop_handles)) => {
817 if let Some(event) = event_cache.find_event(event_id).await {
818 return Ok(event);
819 }
820 }
822 Err(err) => {
823 debug!("error when getting the event cache: {err}");
824 }
825 }
826 self.event(event_id, request_config).await
827 }
828
829 pub async fn event_with_context(
832 &self,
833 event_id: &EventId,
834 lazy_load_members: bool,
835 context_size: UInt,
836 request_config: Option<RequestConfig>,
837 ) -> Result<EventWithContextResponse> {
838 let mut request =
839 context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
840
841 request.limit = context_size;
842
843 if lazy_load_members {
844 request.filter.lazy_load_options =
845 LazyLoadOptions::Enabled { include_redundant_members: false };
846 }
847
848 let response = self.client.send(request).with_request_config(request_config).await?;
849
850 let push_ctx = self.push_context().await?;
851 let push_ctx = push_ctx.as_ref();
852 let target_event = if let Some(event) = response.event {
853 Some(self.try_decrypt_event(event, push_ctx).await)
854 } else {
855 None
856 };
857
858 let (events_before, events_after) = join!(
862 join_all(
863 response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
864 ),
865 join_all(
866 response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
867 ),
868 );
869
870 if let Ok((cache, _handles)) = self.event_cache().await {
872 let mut events_to_save: Vec<TimelineEvent> = Vec::new();
873 if let Some(event) = &target_event {
874 events_to_save.push(event.clone());
875 }
876
877 for event in &events_before {
878 events_to_save.push(event.clone());
879 }
880
881 for event in &events_after {
882 events_to_save.push(event.clone());
883 }
884
885 cache.save_events(events_to_save).await;
886 }
887
888 Ok(EventWithContextResponse {
889 event: target_event,
890 events_before,
891 events_after,
892 state: response.state,
893 prev_batch_token: response.start,
894 next_batch_token: response.end,
895 })
896 }
897
898 pub(crate) async fn request_members(&self) -> Result<()> {
899 self.client
900 .locks()
901 .members_request_deduplicated_handler
902 .run(self.room_id().to_owned(), async move {
903 let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
904 let response = self
905 .client
906 .send(request.clone())
907 .with_request_config(
908 RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
911 )
912 .await?;
913
914 Box::pin(self.client.base_client().receive_all_members(
916 self.room_id(),
917 &request,
918 &response,
919 ))
920 .await?;
921
922 Ok(())
923 })
924 .await
925 }
926
927 pub async fn request_encryption_state(&self) -> Result<()> {
932 if !self.inner.encryption_state().is_unknown() {
933 return Ok(());
934 }
935
936 self.client
937 .locks()
938 .encryption_state_deduplicated_handler
939 .run(self.room_id().to_owned(), async move {
940 let request = get_state_event_for_key::v3::Request::new(
942 self.room_id().to_owned(),
943 StateEventType::RoomEncryption,
944 "".to_owned(),
945 );
946 let response = match self.client.send(request).await {
947 Ok(response) => Some(
948 response
949 .into_content()
950 .deserialize_as_unchecked::<RoomEncryptionEventContent>()?,
951 ),
952 Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
953 Err(err) => return Err(err.into()),
954 };
955
956 let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
957
958 let mut room_info = self.clone_info();
961 room_info.mark_encryption_state_synced();
962 room_info.set_encryption_event(response.clone());
963 let mut changes = StateChanges::default();
964 changes.add_room(room_info.clone());
965
966 self.client.state_store().save_changes(&changes).await?;
967 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
968
969 Ok(())
970 })
971 .await
972 }
973
974 pub fn encryption_state(&self) -> EncryptionState {
979 self.inner.encryption_state()
980 }
981
982 pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
988 self.request_encryption_state().await?;
989
990 Ok(self.encryption_state())
991 }
992
993 #[cfg(feature = "e2e-encryption")]
995 pub async fn crypto_context_info(&self) -> CryptoContextInfo {
996 let encryption = self.client.encryption();
997
998 let this_device_is_verified = match encryption.get_own_device().await {
999 Ok(Some(device)) => device.is_verified_with_cross_signing(),
1000
1001 _ => true,
1003 };
1004
1005 let backup_exists_on_server =
1006 encryption.backups().exists_on_server().await.unwrap_or(false);
1007
1008 CryptoContextInfo {
1009 device_creation_ts: encryption.device_creation_timestamp().await,
1010 this_device_is_verified,
1011 is_backup_configured: encryption.backups().state() == BackupState::Enabled,
1012 backup_exists_on_server,
1013 }
1014 }
1015
1016 fn are_events_visible(&self) -> bool {
1017 if let RoomState::Invited = self.inner.state() {
1018 return matches!(
1019 self.inner.history_visibility_or_default(),
1020 HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1021 );
1022 }
1023
1024 true
1025 }
1026
1027 pub async fn sync_members(&self) -> Result<()> {
1033 if !self.are_events_visible() {
1034 return Ok(());
1035 }
1036
1037 if !self.are_members_synced() { self.request_members().await } else { Ok(()) }
1038 }
1039
1040 pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1054 self.sync_members().await?;
1055 self.get_member_no_sync(user_id).await
1056 }
1057
1058 pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1072 Ok(self
1073 .inner
1074 .get_member(user_id)
1075 .await?
1076 .map(|member| RoomMember::new(self.client.clone(), member)))
1077 }
1078
1079 pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1088 self.sync_members().await?;
1089 self.members_no_sync(memberships).await
1090 }
1091
1092 pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1101 Ok(self
1102 .inner
1103 .members(memberships)
1104 .await?
1105 .into_iter()
1106 .map(|member| RoomMember::new(self.client.clone(), member))
1107 .collect())
1108 }
1109
1110 pub async fn get_state_events(
1112 &self,
1113 event_type: StateEventType,
1114 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1115 self.client
1116 .state_store()
1117 .get_state_events(self.room_id(), event_type)
1118 .await
1119 .map_err(Into::into)
1120 }
1121
1122 pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1139 where
1140 C: StaticEventContent<IsPrefix = ruma::events::False>
1141 + StaticStateEventContent
1142 + RedactContent,
1143 C::Redacted: RedactedStateEventContent,
1144 {
1145 Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1146 }
1147
1148 pub async fn get_state_events_for_keys(
1151 &self,
1152 event_type: StateEventType,
1153 state_keys: &[&str],
1154 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1155 self.client
1156 .state_store()
1157 .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1158 .await
1159 .map_err(Into::into)
1160 }
1161
1162 pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1182 &self,
1183 state_keys: I,
1184 ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1185 where
1186 C: StaticEventContent<IsPrefix = ruma::events::False>
1187 + StaticStateEventContent
1188 + RedactContent,
1189 C::StateKey: Borrow<K>,
1190 C::Redacted: RedactedStateEventContent,
1191 K: AsRef<str> + Sized + Sync + 'a,
1192 I: IntoIterator<Item = &'a K> + Send,
1193 I::IntoIter: Send,
1194 {
1195 Ok(self
1196 .client
1197 .state_store()
1198 .get_state_events_for_keys_static(self.room_id(), state_keys)
1199 .await?)
1200 }
1201
1202 pub async fn get_state_event(
1204 &self,
1205 event_type: StateEventType,
1206 state_key: &str,
1207 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1208 self.client
1209 .state_store()
1210 .get_state_event(self.room_id(), event_type, state_key)
1211 .await
1212 .map_err(Into::into)
1213 }
1214
1215 pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
1234 where
1235 C: StaticEventContent<IsPrefix = ruma::events::False>
1236 + StaticStateEventContent<StateKey = EmptyStateKey>
1237 + RedactContent,
1238 C::Redacted: RedactedStateEventContent,
1239 {
1240 self.get_state_event_static_for_key(&EmptyStateKey).await
1241 }
1242
1243 pub async fn get_state_event_static_for_key<C, K>(
1263 &self,
1264 state_key: &K,
1265 ) -> Result<Option<RawSyncOrStrippedState<C>>>
1266 where
1267 C: StaticEventContent<IsPrefix = ruma::events::False>
1268 + StaticStateEventContent
1269 + RedactContent,
1270 C::StateKey: Borrow<K>,
1271 C::Redacted: RedactedStateEventContent,
1272 K: AsRef<str> + ?Sized + Sync,
1273 {
1274 Ok(self
1275 .client
1276 .state_store()
1277 .get_state_event_static_for_key(self.room_id(), state_key)
1278 .await?)
1279 }
1280
1281 pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
1285 Ok(self
1290 .get_state_events_static::<SpaceParentEventContent>()
1291 .await?
1292 .into_iter()
1293 .filter_map(|parent_event| match parent_event.deserialize() {
1295 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
1296 Some((e.state_key.to_owned(), e.sender))
1297 }
1298 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
1299 Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
1300 Err(e) => {
1301 info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
1302 None
1303 }
1304 })
1305 .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
1307 let Some(parent_room) = self.client.get_room(&state_key) else {
1308 return Ok(ParentSpace::Unverifiable(state_key));
1311 };
1312 if let Some(child_event) = parent_room
1315 .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
1316 .await?
1317 {
1318 match child_event.deserialize() {
1319 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
1320 return Ok(ParentSpace::Reciprocal(parent_room));
1323 }
1324 Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
1325 Ok(SyncOrStrippedState::Stripped(_)) => {}
1326 Err(e) => {
1327 info!(
1328 room_id = ?self.room_id(), parent_room_id = ?state_key,
1329 "Could not deserialize m.space.child: {e}"
1330 );
1331 }
1332 }
1333 }
1338
1339 let Some(member) = parent_room.get_member(&sender).await? else {
1342 return Ok(ParentSpace::Illegitimate(parent_room));
1344 };
1345
1346 if member.can_send_state(StateEventType::SpaceChild) {
1347 Ok(ParentSpace::WithPowerlevel(parent_room))
1349 } else {
1350 Ok(ParentSpace::Illegitimate(parent_room))
1351 }
1352 })
1353 .collect::<FuturesUnordered<_>>())
1354 }
1355
1356 pub async fn account_data(
1358 &self,
1359 data_type: RoomAccountDataEventType,
1360 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1361 self.client
1362 .state_store()
1363 .get_room_account_data_event(self.room_id(), data_type)
1364 .await
1365 .map_err(Into::into)
1366 }
1367
1368 pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1387 where
1388 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1389 {
1390 Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1391 }
1392
1393 #[cfg(feature = "e2e-encryption")]
1398 pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1399 let user_ids = self
1400 .client
1401 .state_store()
1402 .get_user_ids(self.room_id(), RoomMemberships::empty())
1403 .await?;
1404
1405 for user_id in user_ids {
1406 let devices = self.client.encryption().get_user_devices(&user_id).await?;
1407 let any_unverified = devices.devices().any(|d| !d.is_verified());
1408
1409 if any_unverified {
1410 return Ok(false);
1411 }
1412 }
1413
1414 Ok(true)
1415 }
1416
1417 pub async fn set_account_data<T>(
1432 &self,
1433 content: T,
1434 ) -> Result<set_room_account_data::v3::Response>
1435 where
1436 T: RoomAccountDataEventContent,
1437 {
1438 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1439
1440 let request = set_room_account_data::v3::Request::new(
1441 own_user.to_owned(),
1442 self.room_id().to_owned(),
1443 &content,
1444 )?;
1445
1446 Ok(self.client.send(request).await?)
1447 }
1448
1449 pub async fn set_account_data_raw(
1474 &self,
1475 event_type: RoomAccountDataEventType,
1476 content: Raw<AnyRoomAccountDataEventContent>,
1477 ) -> Result<set_room_account_data::v3::Response> {
1478 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1479
1480 let request = set_room_account_data::v3::Request::new_raw(
1481 own_user.to_owned(),
1482 self.room_id().to_owned(),
1483 event_type,
1484 content,
1485 );
1486
1487 Ok(self.client.send(request).await?)
1488 }
1489
1490 pub async fn set_tag(
1521 &self,
1522 tag: TagName,
1523 tag_info: TagInfo,
1524 ) -> Result<create_tag::v3::Response> {
1525 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1526 let request = create_tag::v3::Request::new(
1527 user_id.to_owned(),
1528 self.inner.room_id().to_owned(),
1529 tag.to_string(),
1530 tag_info,
1531 );
1532 Ok(self.client.send(request).await?)
1533 }
1534
1535 pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1542 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1543 let request = delete_tag::v3::Request::new(
1544 user_id.to_owned(),
1545 self.inner.room_id().to_owned(),
1546 tag.to_string(),
1547 );
1548 Ok(self.client.send(request).await?)
1549 }
1550
1551 pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1561 if is_favourite {
1562 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1563
1564 self.set_tag(TagName::Favorite, tag_info).await?;
1565
1566 if self.is_low_priority() {
1567 self.remove_tag(TagName::LowPriority).await?;
1568 }
1569 } else {
1570 self.remove_tag(TagName::Favorite).await?;
1571 }
1572 Ok(())
1573 }
1574
1575 pub async fn set_is_low_priority(
1585 &self,
1586 is_low_priority: bool,
1587 tag_order: Option<f64>,
1588 ) -> Result<()> {
1589 if is_low_priority {
1590 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1591
1592 self.set_tag(TagName::LowPriority, tag_info).await?;
1593
1594 if self.is_favourite() {
1595 self.remove_tag(TagName::Favorite).await?;
1596 }
1597 } else {
1598 self.remove_tag(TagName::LowPriority).await?;
1599 }
1600 Ok(())
1601 }
1602
1603 pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1612 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1613
1614 let mut content = self
1615 .client
1616 .account()
1617 .account_data::<DirectEventContent>()
1618 .await?
1619 .map(|c| c.deserialize())
1620 .transpose()?
1621 .unwrap_or_default();
1622
1623 let this_room_id = self.inner.room_id();
1624
1625 if is_direct {
1626 let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1627 room_members.retain(|member| member.user_id() != self.own_user_id());
1628
1629 for member in room_members {
1630 let entry = content.entry(member.user_id().into()).or_default();
1631 if !entry.iter().any(|room_id| room_id == this_room_id) {
1632 entry.push(this_room_id.to_owned());
1633 }
1634 }
1635 } else {
1636 for (_, list) in content.iter_mut() {
1637 list.retain(|room_id| *room_id != this_room_id);
1638 }
1639
1640 content.retain(|_, list| !list.is_empty());
1642 }
1643
1644 let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1645
1646 self.client.send(request).await?;
1647 Ok(())
1648 }
1649
1650 #[cfg(feature = "e2e-encryption")]
1658 #[cfg(not(feature = "experimental-encrypted-state-events"))]
1659 pub async fn decrypt_event(
1660 &self,
1661 event: &Raw<OriginalSyncRoomEncryptedEvent>,
1662 push_ctx: Option<&PushContext>,
1663 ) -> Result<TimelineEvent> {
1664 let machine = self.client.olm_machine().await;
1665 let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1666
1667 match machine
1668 .try_decrypt_room_event(
1669 event.cast_ref(),
1670 self.inner.room_id(),
1671 self.client.decryption_settings(),
1672 )
1673 .await?
1674 {
1675 RoomEventDecryptionResult::Decrypted(decrypted) => {
1676 let push_actions = if let Some(push_ctx) = push_ctx {
1677 Some(push_ctx.for_event(&decrypted.event).await)
1678 } else {
1679 None
1680 };
1681 Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1682 }
1683 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1684 self.client
1685 .encryption()
1686 .backups()
1687 .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1688 Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
1689 }
1690 }
1691 }
1692
1693 #[cfg(feature = "experimental-encrypted-state-events")]
1701 pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
1702 &self,
1703 event: &Raw<T>,
1704 push_ctx: Option<&PushContext>,
1705 ) -> Result<TimelineEvent> {
1706 let machine = self.client.olm_machine().await;
1707 let machine = machine.as_ref().ok_or(Error::NoOlmMachine)?;
1708
1709 match machine
1710 .try_decrypt_room_event(
1711 event.cast_ref(),
1712 self.inner.room_id(),
1713 self.client.decryption_settings(),
1714 )
1715 .await?
1716 {
1717 RoomEventDecryptionResult::Decrypted(decrypted) => {
1718 let push_actions = if let Some(push_ctx) = push_ctx {
1719 Some(push_ctx.for_event(&decrypted.event).await)
1720 } else {
1721 None
1722 };
1723 Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
1724 }
1725 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
1726 self.client
1727 .encryption()
1728 .backups()
1729 .maybe_download_room_key(self.room_id().to_owned(), event.clone());
1730 Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
1733 }
1734 }
1735 }
1736
1737 #[cfg(feature = "e2e-encryption")]
1750 pub async fn get_encryption_info(
1751 &self,
1752 session_id: &str,
1753 sender: &UserId,
1754 ) -> Option<Arc<EncryptionInfo>> {
1755 let machine = self.client.olm_machine().await;
1756 let machine = machine.as_ref()?;
1757 machine.get_session_encryption_info(self.room_id(), session_id, sender).await.ok()
1758 }
1759
1760 #[cfg(feature = "e2e-encryption")]
1773 pub async fn discard_room_key(&self) -> Result<()> {
1774 let machine = self.client.olm_machine().await;
1775 if let Some(machine) = machine.as_ref() {
1776 machine.discard_room_key(self.inner.room_id()).await?;
1777 Ok(())
1778 } else {
1779 Err(Error::NoOlmMachine)
1780 }
1781 }
1782
1783 #[instrument(skip_all)]
1791 pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1792 let request = assign!(
1793 ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1794 { reason: reason.map(ToOwned::to_owned) }
1795 );
1796 self.client.send(request).await?;
1797 Ok(())
1798 }
1799
1800 #[instrument(skip_all)]
1808 pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1809 let request = assign!(
1810 unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1811 { reason: reason.map(ToOwned::to_owned) }
1812 );
1813 self.client.send(request).await?;
1814 Ok(())
1815 }
1816
1817 #[instrument(skip_all)]
1826 pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1827 let request = assign!(
1828 kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1829 { reason: reason.map(ToOwned::to_owned) }
1830 );
1831 self.client.send(request).await?;
1832 Ok(())
1833 }
1834
1835 #[instrument(skip_all)]
1841 pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1842 #[cfg(feature = "e2e-encryption")]
1843 if self.client.inner.enable_share_history_on_invite {
1844 shared_room_history::share_room_history(self, user_id.to_owned()).await?;
1845 }
1846
1847 let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
1848 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1849 self.client.send(request).await?;
1850
1851 self.mark_members_missing();
1855
1856 Ok(())
1857 }
1858
1859 #[instrument(skip_all)]
1865 pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
1866 let recipient = InvitationRecipient::ThirdPartyId(invite_id);
1867 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1868 self.client.send(request).await?;
1869
1870 self.mark_members_missing();
1874
1875 Ok(())
1876 }
1877
1878 pub async fn typing_notice(&self, typing: bool) -> Result<()> {
1913 self.ensure_room_joined()?;
1914
1915 let send = if let Some(typing_time) =
1918 self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
1919 {
1920 if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
1921 typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
1925 } else {
1926 !typing
1928 }
1929 } else {
1930 typing
1933 };
1934
1935 if send {
1936 self.send_typing_notice(typing).await?;
1937 }
1938
1939 Ok(())
1940 }
1941
1942 #[instrument(name = "typing_notice", skip(self))]
1943 async fn send_typing_notice(&self, typing: bool) -> Result<()> {
1944 let typing = if typing {
1945 self.client
1946 .inner
1947 .typing_notice_times
1948 .write()
1949 .unwrap()
1950 .insert(self.room_id().to_owned(), Instant::now());
1951 Typing::Yes(TYPING_NOTICE_TIMEOUT)
1952 } else {
1953 self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
1954 Typing::No
1955 };
1956
1957 let request = create_typing_event::v3::Request::new(
1958 self.own_user_id().to_owned(),
1959 self.room_id().to_owned(),
1960 typing,
1961 );
1962
1963 self.client.send(request).await?;
1964
1965 Ok(())
1966 }
1967
1968 #[instrument(skip_all)]
1985 pub async fn send_single_receipt(
1986 &self,
1987 receipt_type: create_receipt::v3::ReceiptType,
1988 thread: ReceiptThread,
1989 event_id: OwnedEventId,
1990 ) -> Result<()> {
1991 let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
1994
1995 self.client
1996 .inner
1997 .locks
1998 .read_receipt_deduplicated_handler
1999 .run((request_key, event_id.clone()), async {
2000 let is_unthreaded = thread == ReceiptThread::Unthreaded;
2002
2003 let mut request = create_receipt::v3::Request::new(
2004 self.room_id().to_owned(),
2005 receipt_type,
2006 event_id,
2007 );
2008 request.thread = thread;
2009
2010 self.client.send(request).await?;
2011
2012 if is_unthreaded {
2013 self.set_unread_flag(false).await?;
2014 }
2015
2016 Ok(())
2017 })
2018 .await
2019 }
2020
2021 #[instrument(skip_all)]
2031 pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2032 if receipts.is_empty() {
2033 return Ok(());
2034 }
2035
2036 let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2037 let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2038 fully_read,
2039 read_receipt: public_read_receipt,
2040 private_read_receipt,
2041 });
2042
2043 self.client.send(request).await?;
2044
2045 self.set_unread_flag(false).await?;
2046
2047 Ok(())
2048 }
2049
2050 #[allow(unused_variables, unused_mut)]
2054 async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
2055 use ruma::{
2056 EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
2057 };
2058 const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
2059
2060 if !self.latest_encryption_state().await?.is_encrypted() {
2061 let mut content =
2062 RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
2063 #[cfg(feature = "experimental-encrypted-state-events")]
2064 if encrypted_state_events {
2065 content = content.with_encrypted_state();
2066 }
2067 self.send_state_event(content).await?;
2068
2069 let res = timeout(
2076 async {
2077 loop {
2078 self.client.inner.sync_beat.listen().await;
2080 let _state_store_lock =
2081 self.client.base_client().state_store_lock().lock().await;
2082
2083 if !self.inner.encryption_state().is_unknown() {
2084 break;
2085 }
2086 }
2087 },
2088 SYNC_WAIT_TIME,
2089 )
2090 .await;
2091
2092 let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
2093
2094 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2096 if res.is_ok() && self.inner.encryption_state().is_encrypted() {
2097 debug!("room successfully marked as encrypted");
2098 return Ok(());
2099 }
2100
2101 #[cfg(feature = "experimental-encrypted-state-events")]
2103 if res.is_ok() && {
2104 if encrypted_state_events {
2105 self.inner.encryption_state().is_state_encrypted()
2106 } else {
2107 self.inner.encryption_state().is_encrypted()
2108 }
2109 } {
2110 debug!("room successfully marked as encrypted");
2111 return Ok(());
2112 }
2113
2114 debug!("still not marked as encrypted, marking encryption state as missing");
2119
2120 let mut room_info = self.clone_info();
2121 room_info.mark_encryption_state_missing();
2122 let mut changes = StateChanges::default();
2123 changes.add_room(room_info.clone());
2124
2125 self.client.state_store().save_changes(&changes).await?;
2126 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
2127 }
2128
2129 Ok(())
2130 }
2131
2132 #[instrument(skip_all)]
2164 pub async fn enable_encryption(&self) -> Result<()> {
2165 self.enable_encryption_inner(false).await
2166 }
2167
2168 #[instrument(skip_all)]
2201 #[cfg(feature = "experimental-encrypted-state-events")]
2202 pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
2203 self.enable_encryption_inner(true).await
2204 }
2205
2206 #[cfg(feature = "e2e-encryption")]
2215 #[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
2216 async fn preshare_room_key(&self) -> Result<()> {
2217 self.ensure_room_joined()?;
2218
2219 let guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
2221 tracing::Span::current().record("store_generation", guard.map(|guard| guard.generation()));
2222
2223 self.client
2224 .locks()
2225 .group_session_deduplicated_handler
2226 .run(self.room_id().to_owned(), async move {
2227 {
2228 let members = self
2229 .client
2230 .state_store()
2231 .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
2232 .await?;
2233 self.client.claim_one_time_keys(members.iter().map(Deref::deref)).await?;
2234 };
2235
2236 let response = self.share_room_key().await;
2237
2238 if let Err(r) = response {
2242 let machine = self.client.olm_machine().await;
2243 if let Some(machine) = machine.as_ref() {
2244 machine.discard_room_key(self.room_id()).await?;
2245 }
2246 return Err(r);
2247 }
2248
2249 Ok(())
2250 })
2251 .await
2252 }
2253
2254 #[cfg(feature = "e2e-encryption")]
2260 #[instrument(skip_all)]
2261 async fn share_room_key(&self) -> Result<()> {
2262 self.ensure_room_joined()?;
2263
2264 let requests = self.client.base_client().share_room_key(self.room_id()).await?;
2265
2266 for request in requests {
2267 let response = self.client.send_to_device(&request).await?;
2268 self.client.mark_request_as_sent(&request.txn_id, &response).await?;
2269 }
2270
2271 Ok(())
2272 }
2273
2274 #[instrument(skip_all)]
2283 pub async fn sync_up(&self) {
2284 while !self.is_synced() && self.state() == RoomState::Joined {
2285 let wait_for_beat = self.client.inner.sync_beat.listen();
2286 let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2288 }
2289 }
2290
2291 pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2362 SendMessageLikeEvent::new(self, content)
2363 }
2364
2365 #[cfg(feature = "e2e-encryption")]
2368 async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
2369 let olm = self.client.olm_machine().await;
2370 let olm = olm.as_ref().expect("Olm machine wasn't started");
2371
2372 let members =
2373 self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;
2374
2375 let tracked: HashMap<_, _> = olm
2376 .store()
2377 .load_tracked_users()
2378 .await?
2379 .into_iter()
2380 .map(|tracked| (tracked.user_id, tracked.dirty))
2381 .collect();
2382
2383 let members_with_unknown_devices =
2386 members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));
2387
2388 let (req_id, request) =
2389 olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));
2390
2391 if !request.device_keys.is_empty() {
2392 self.client.keys_query(&req_id, request.device_keys).await?;
2393 }
2394
2395 Ok(())
2396 }
2397
2398 #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2442 pub fn send_raw<'a>(
2443 &'a self,
2444 event_type: &'a str,
2445 content: impl IntoRawMessageLikeEventContent,
2446 ) -> SendRawMessageLikeEvent<'a> {
2447 SendRawMessageLikeEvent::new(self, event_type, content)
2450 }
2451
2452 #[instrument(skip_all)]
2500 pub fn send_attachment<'a>(
2501 &'a self,
2502 filename: impl Into<String>,
2503 content_type: &'a Mime,
2504 data: Vec<u8>,
2505 config: AttachmentConfig,
2506 ) -> SendAttachment<'a> {
2507 SendAttachment::new(self, filename.into(), content_type, data, config)
2508 }
2509
2510 #[instrument(skip_all)]
2538 pub(super) async fn prepare_and_send_attachment<'a>(
2539 &'a self,
2540 filename: String,
2541 content_type: &'a Mime,
2542 data: Vec<u8>,
2543 mut config: AttachmentConfig,
2544 send_progress: SharedObservable<TransmissionProgress>,
2545 store_in_cache: bool,
2546 ) -> Result<send_message_event::v3::Response> {
2547 self.ensure_room_joined()?;
2548
2549 let txn_id = config.txn_id.take();
2550 let mentions = config.mentions.take();
2551
2552 let thumbnail = config.thumbnail.take();
2553
2554 let thumbnail_cache_info = if store_in_cache {
2556 thumbnail
2557 .as_ref()
2558 .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2559 } else {
2560 None
2561 };
2562
2563 #[cfg(feature = "e2e-encryption")]
2564 let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2565 self.client
2566 .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2567 .await?
2568 } else {
2569 self.client
2570 .media()
2571 .upload_plain_media_and_thumbnail(
2572 content_type,
2573 data.clone(),
2576 thumbnail,
2577 send_progress,
2578 )
2579 .await?
2580 };
2581
2582 #[cfg(not(feature = "e2e-encryption"))]
2583 let (media_source, thumbnail) = self
2584 .client
2585 .media()
2586 .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2587 .await?;
2588
2589 if store_in_cache {
2590 let media_store_lock_guard = self.client.media_store().lock().await?;
2591
2592 debug!("caching the media");
2596 let request =
2597 MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2598
2599 if let Err(err) = media_store_lock_guard
2600 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2601 .await
2602 {
2603 warn!("unable to cache the media after uploading it: {err}");
2604 }
2605
2606 if let Some(((data, height, width), source)) =
2607 thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2608 {
2609 debug!("caching the thumbnail");
2610
2611 let request = MediaRequestParameters {
2612 source: source.clone(),
2613 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2614 };
2615
2616 if let Err(err) = media_store_lock_guard
2617 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2618 .await
2619 {
2620 warn!("unable to cache the media after uploading it: {err}");
2621 }
2622 }
2623 }
2624
2625 let content = self
2626 .make_media_event(
2627 Room::make_attachment_type(
2628 content_type,
2629 filename,
2630 media_source,
2631 config.caption,
2632 config.info,
2633 thumbnail,
2634 ),
2635 mentions,
2636 config.reply,
2637 )
2638 .await?;
2639
2640 let mut fut = self.send(content);
2641 if let Some(txn_id) = txn_id {
2642 fut = fut.with_transaction_id(txn_id);
2643 }
2644 fut.await
2645 }
2646
2647 #[allow(clippy::too_many_arguments)]
2650 pub(crate) fn make_attachment_type(
2651 content_type: &Mime,
2652 filename: String,
2653 source: MediaSource,
2654 caption: Option<TextMessageEventContent>,
2655 info: Option<AttachmentInfo>,
2656 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2657 ) -> MessageType {
2658 make_media_type!(MessageType, content_type, filename, source, caption, info, thumbnail)
2659 }
2660
2661 pub(crate) async fn make_media_event(
2664 &self,
2665 msg_type: MessageType,
2666 mentions: Option<Mentions>,
2667 reply: Option<Reply>,
2668 ) -> Result<RoomMessageEventContent> {
2669 let mut content = RoomMessageEventContent::new(msg_type);
2670 if let Some(mentions) = mentions {
2671 content = content.add_mentions(mentions);
2672 }
2673 if let Some(reply) = reply {
2674 content = self.make_reply_event(content.into(), reply).await?;
2677 }
2678 Ok(content)
2679 }
2680
2681 #[cfg(feature = "unstable-msc4274")]
2684 #[allow(clippy::too_many_arguments)]
2685 pub(crate) fn make_gallery_item_type(
2686 content_type: &Mime,
2687 filename: String,
2688 source: MediaSource,
2689 caption: Option<TextMessageEventContent>,
2690 info: Option<AttachmentInfo>,
2691 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2692 ) -> GalleryItemType {
2693 make_media_type!(GalleryItemType, content_type, filename, source, caption, info, thumbnail)
2694 }
2695
2696 pub async fn update_power_levels(
2705 &self,
2706 updates: Vec<(&UserId, Int)>,
2707 ) -> Result<send_state_event::v3::Response> {
2708 let mut power_levels = self.power_levels().await?;
2709
2710 for (user_id, new_level) in updates {
2711 if new_level == power_levels.users_default {
2712 power_levels.users.remove(user_id);
2713 } else {
2714 power_levels.users.insert(user_id.to_owned(), new_level);
2715 }
2716 }
2717
2718 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2719 }
2720
2721 pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2726 let mut power_levels = self.power_levels().await?;
2727 power_levels.apply(changes)?;
2728 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2729 Ok(())
2730 }
2731
2732 pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2736 let creators = self.creators().unwrap_or_default();
2737 let rules = self.clone_info().room_version_rules_or_default();
2738
2739 let default_power_levels =
2740 RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2741 let changes = RoomPowerLevelChanges::from(default_power_levels);
2742 self.apply_power_level_changes(changes).await?;
2743 Ok(self.power_levels().await?)
2744 }
2745
2746 pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2751 let power_level = self.get_user_power_level(user_id).await?;
2752 Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2753 }
2754
2755 pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2760 let event = self.power_levels().await?;
2761 Ok(event.for_user(user_id))
2762 }
2763
2764 pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2767 let power_levels = self.power_levels().await.ok();
2768 let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2769 if let Some(power_levels) = power_levels {
2770 for (id, level) in power_levels.users.into_iter() {
2771 user_power_levels.insert(id, level.into());
2772 }
2773 }
2774 user_power_levels
2775 }
2776
2777 pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2779 self.send_state_event(RoomNameEventContent::new(name)).await
2780 }
2781
2782 pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2784 self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2785 }
2786
2787 pub async fn set_avatar_url(
2793 &self,
2794 url: &MxcUri,
2795 info: Option<avatar::ImageInfo>,
2796 ) -> Result<send_state_event::v3::Response> {
2797 self.ensure_room_joined()?;
2798
2799 let mut room_avatar_event = RoomAvatarEventContent::new();
2800 room_avatar_event.url = Some(url.to_owned());
2801 room_avatar_event.info = info.map(Box::new);
2802
2803 self.send_state_event(room_avatar_event).await
2804 }
2805
2806 pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2808 self.send_state_event(RoomAvatarEventContent::new()).await
2809 }
2810
2811 pub async fn upload_avatar(
2819 &self,
2820 mime: &Mime,
2821 data: Vec<u8>,
2822 info: Option<avatar::ImageInfo>,
2823 ) -> Result<send_state_event::v3::Response> {
2824 self.ensure_room_joined()?;
2825
2826 let upload_response = self.client.media().upload(mime, data, None).await?;
2827 let mut info = info.unwrap_or_default();
2828 info.blurhash = upload_response.blurhash;
2829 info.mimetype = Some(mime.to_string());
2830
2831 self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2832 }
2833
2834 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2878 #[instrument(skip_all)]
2879 pub async fn send_state_event(
2880 &self,
2881 content: impl StateEventContent<StateKey = EmptyStateKey>,
2882 ) -> Result<send_state_event::v3::Response> {
2883 self.send_state_event_for_key(&EmptyStateKey, content).await
2884 }
2885
2886 #[cfg(feature = "experimental-encrypted-state-events")]
2937 #[instrument(skip_all)]
2938 pub fn send_state_event<'a>(
2939 &'a self,
2940 content: impl StateEventContent<StateKey = EmptyStateKey>,
2941 ) -> SendStateEvent<'a> {
2942 self.send_state_event_for_key(&EmptyStateKey, content)
2943 }
2944
2945 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2986 pub async fn send_state_event_for_key<C, K>(
2987 &self,
2988 state_key: &K,
2989 content: C,
2990 ) -> Result<send_state_event::v3::Response>
2991 where
2992 C: StateEventContent,
2993 C::StateKey: Borrow<K>,
2994 K: AsRef<str> + ?Sized,
2995 {
2996 self.ensure_room_joined()?;
2997 let request =
2998 send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
2999 let response = self.client.send(request).await?;
3000 Ok(response)
3001 }
3002
3003 #[cfg(feature = "experimental-encrypted-state-events")]
3052 pub fn send_state_event_for_key<'a, C, K>(
3053 &'a self,
3054 state_key: &K,
3055 content: C,
3056 ) -> SendStateEvent<'a>
3057 where
3058 C: StateEventContent,
3059 C::StateKey: Borrow<K>,
3060 K: AsRef<str> + ?Sized,
3061 {
3062 SendStateEvent::new(self, state_key, content)
3063 }
3064
3065 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3100 #[instrument(skip_all)]
3101 pub async fn send_state_event_raw(
3102 &self,
3103 event_type: &str,
3104 state_key: &str,
3105 content: impl IntoRawStateEventContent,
3106 ) -> Result<send_state_event::v3::Response> {
3107 self.ensure_room_joined()?;
3108
3109 let request = send_state_event::v3::Request::new_raw(
3110 self.room_id().to_owned(),
3111 event_type.into(),
3112 state_key.to_owned(),
3113 content.into_raw_state_event_content(),
3114 );
3115
3116 Ok(self.client.send(request).await?)
3117 }
3118
3119 #[cfg(feature = "experimental-encrypted-state-events")]
3161 #[instrument(skip_all)]
3162 pub fn send_state_event_raw<'a>(
3163 &'a self,
3164 event_type: &'a str,
3165 state_key: &'a str,
3166 content: impl IntoRawStateEventContent,
3167 ) -> SendRawStateEvent<'a> {
3168 SendRawStateEvent::new(self, event_type, state_key, content)
3169 }
3170
3171 #[instrument(skip_all)]
3206 pub async fn redact(
3207 &self,
3208 event_id: &EventId,
3209 reason: Option<&str>,
3210 txn_id: Option<OwnedTransactionId>,
3211 ) -> HttpResult<redact_event::v3::Response> {
3212 let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3213 let request = assign!(
3214 redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3215 { reason: reason.map(ToOwned::to_owned) }
3216 );
3217
3218 self.client.send(request).await
3219 }
3220
3221 pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
3230 let acl_ev = self
3231 .get_state_event_static::<RoomServerAclEventContent>()
3232 .await?
3233 .and_then(|ev| ev.deserialize().ok());
3234 let acl = acl_ev.as_ref().and_then(|ev| match ev {
3235 SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
3236 SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
3237 });
3238
3239 let members: Vec<_> = self
3243 .members_no_sync(RoomMemberships::JOIN)
3244 .await?
3245 .into_iter()
3246 .filter(|member| {
3247 let server = member.user_id().server_name();
3248 acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
3249 })
3250 .collect();
3251
3252 let max = members
3255 .iter()
3256 .max_by_key(|member| member.power_level())
3257 .filter(|max| max.power_level() >= int!(50))
3258 .map(|member| member.user_id().server_name());
3259
3260 let servers = members
3262 .iter()
3263 .map(|member| member.user_id().server_name())
3264 .filter(|server| max.filter(|max| max == server).is_none())
3265 .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
3266 *servers.entry(server).or_default() += 1;
3267 servers
3268 });
3269 let mut servers: Vec<_> = servers.into_iter().collect();
3270 servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
3271
3272 Ok(max
3273 .into_iter()
3274 .chain(servers.into_iter().map(|(name, _)| name))
3275 .take(3)
3276 .map(ToOwned::to_owned)
3277 .collect())
3278 }
3279
3280 pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3287 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3288 return Ok(alias.matrix_to_uri());
3289 }
3290
3291 let via = self.route().await?;
3292 Ok(self.room_id().matrix_to_uri_via(via))
3293 }
3294
3295 pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3306 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3307 return Ok(alias.matrix_uri(join));
3308 }
3309
3310 let via = self.route().await?;
3311 Ok(self.room_id().matrix_uri_via(via, join))
3312 }
3313
3314 pub async fn matrix_to_event_permalink(
3328 &self,
3329 event_id: impl Into<OwnedEventId>,
3330 ) -> Result<MatrixToUri> {
3331 let via = self.route().await?;
3334 Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3335 }
3336
3337 pub async fn matrix_event_permalink(
3351 &self,
3352 event_id: impl Into<OwnedEventId>,
3353 ) -> Result<MatrixUri> {
3354 let via = self.route().await?;
3357 Ok(self.room_id().matrix_event_uri_via(event_id, via))
3358 }
3359
3360 pub async fn load_user_receipt(
3373 &self,
3374 receipt_type: ReceiptType,
3375 thread: ReceiptThread,
3376 user_id: &UserId,
3377 ) -> Result<Option<(OwnedEventId, Receipt)>> {
3378 self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3379 }
3380
3381 pub async fn load_event_receipts(
3394 &self,
3395 receipt_type: ReceiptType,
3396 thread: ReceiptThread,
3397 event_id: &EventId,
3398 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3399 self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3400 }
3401
3402 pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3407 self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
3408 }
3409
3410 pub(crate) async fn push_condition_room_ctx_internal(
3417 &self,
3418 with_threads_subscriptions: bool,
3419 ) -> Result<Option<PushConditionRoomCtx>> {
3420 let room_id = self.room_id();
3421 let user_id = self.own_user_id();
3422 let room_info = self.clone_info();
3423 let member_count = room_info.active_members_count();
3424
3425 let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3426 member.name().to_owned()
3427 } else {
3428 return Ok(None);
3429 };
3430
3431 let power_levels = match self.power_levels().await {
3432 Ok(power_levels) => Some(power_levels.into()),
3433 Err(error) => {
3434 if matches!(room_info.state(), RoomState::Joined) {
3435 error!("Could not compute power levels for push conditions: {error}");
3438 }
3439 None
3440 }
3441 };
3442
3443 let mut ctx = assign!(PushConditionRoomCtx::new(
3444 room_id.to_owned(),
3445 UInt::new(member_count).unwrap_or(UInt::MAX),
3446 user_id.to_owned(),
3447 user_display_name,
3448 ),
3449 {
3450 power_levels,
3451 });
3452
3453 if with_threads_subscriptions {
3454 let this = self.clone();
3455 ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3456 let room = this.clone();
3457 Box::pin(async move {
3458 if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3459 maybe_sub.is_some()
3460 } else {
3461 false
3462 }
3463 })
3464 });
3465 }
3466
3467 Ok(Some(ctx))
3468 }
3469
3470 pub async fn push_context(&self) -> Result<Option<PushContext>> {
3473 self.push_context_internal(self.client.enabled_thread_subscriptions()).await
3474 }
3475
3476 #[instrument(skip(self))]
3480 pub(crate) async fn push_context_internal(
3481 &self,
3482 with_threads_subscriptions: bool,
3483 ) -> Result<Option<PushContext>> {
3484 let Some(push_condition_room_ctx) =
3485 self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3486 else {
3487 debug!("Could not aggregate push context");
3488 return Ok(None);
3489 };
3490 let push_rules = self.client().account().push_rules().await?;
3491 Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3492 }
3493
3494 pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3499 if let Some(ctx) = self.push_context().await? {
3500 Ok(Some(ctx.for_event(event).await))
3501 } else {
3502 Ok(None)
3503 }
3504 }
3505
3506 pub async fn invite_details(&self) -> Result<Invite> {
3509 let state = self.state();
3510
3511 if state != RoomState::Invited {
3512 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3513 }
3514
3515 let invitee = self
3516 .get_member_no_sync(self.own_user_id())
3517 .await?
3518 .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3519 let event = invitee.event();
3520 let inviter_id = event.sender();
3521 let inviter = self.get_member_no_sync(inviter_id).await?;
3522 Ok(Invite { invitee, inviter })
3523 }
3524
3525 pub async fn member_with_sender_info(
3533 &self,
3534 user_id: &UserId,
3535 ) -> Result<RoomMemberWithSenderInfo> {
3536 let Some(member) = self.get_member_no_sync(user_id).await? else {
3537 return Err(Error::InsufficientData);
3538 };
3539
3540 let sender_member =
3541 if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3542 Some(member)
3544 } else if self.are_members_synced() {
3545 None
3547 } else if self.sync_members().await.is_ok() {
3548 self.get_member_no_sync(member.event().sender()).await?
3550 } else {
3551 None
3552 };
3553
3554 Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3555 }
3556
3557 pub async fn forget(&self) -> Result<()> {
3563 let state = self.state();
3564 match state {
3565 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3566 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3567 "Left / Banned",
3568 state,
3569 ))));
3570 }
3571 RoomState::Left | RoomState::Banned => {}
3572 }
3573
3574 let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3575 let _response = self.client.send(request).await?;
3576
3577 if self.inner.direct_targets_length() != 0
3579 && let Err(e) = self.set_is_direct(false).await
3580 {
3581 warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3584 }
3585
3586 self.client.base_client().forget_room(self.inner.room_id()).await?;
3587
3588 Ok(())
3589 }
3590
3591 fn ensure_room_joined(&self) -> Result<()> {
3592 let state = self.state();
3593 if state == RoomState::Joined {
3594 Ok(())
3595 } else {
3596 Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3597 }
3598 }
3599
3600 pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3602 if !matches!(self.state(), RoomState::Joined) {
3603 return None;
3604 }
3605
3606 let notification_settings = self.client().notification_settings().await;
3607
3608 let notification_mode =
3610 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3611
3612 if notification_mode.is_some() {
3613 notification_mode
3614 } else if let Ok(is_encrypted) =
3615 self.latest_encryption_state().await.map(|state| state.is_encrypted())
3616 {
3617 let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3622 let default_mode = notification_settings
3623 .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3624 .await;
3625 Some(default_mode)
3626 } else {
3627 None
3628 }
3629 }
3630
3631 pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3642 if !matches!(self.state(), RoomState::Joined) {
3643 return None;
3644 }
3645
3646 let notification_settings = self.client().notification_settings().await;
3647
3648 let mode =
3650 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3651
3652 if let Some(mode) = mode {
3653 self.update_cached_user_defined_notification_mode(mode);
3654 }
3655
3656 mode
3657 }
3658
3659 pub async fn report_content(
3672 &self,
3673 event_id: OwnedEventId,
3674 score: Option<ReportedContentScore>,
3675 reason: Option<String>,
3676 ) -> Result<report_content::v3::Response> {
3677 let state = self.state();
3678 if state != RoomState::Joined {
3679 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3680 }
3681
3682 let request = report_content::v3::Request::new(
3683 self.inner.room_id().to_owned(),
3684 event_id,
3685 score.map(Into::into),
3686 reason,
3687 );
3688 Ok(self.client.send(request).await?)
3689 }
3690
3691 pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3702 let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3703
3704 Ok(self.client.send(request).await?)
3705 }
3706
3707 pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3713 if self.is_marked_unread() == unread {
3714 return Ok(());
3716 }
3717
3718 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3719
3720 let content = MarkedUnreadEventContent::new(unread);
3721
3722 let request = set_room_account_data::v3::Request::new(
3723 user_id.to_owned(),
3724 self.inner.room_id().to_owned(),
3725 &content,
3726 )?;
3727
3728 self.client.send(request).await?;
3729 Ok(())
3730 }
3731
3732 pub async fn event_cache(
3735 &self,
3736 ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3737 self.client.event_cache().for_room(self.room_id()).await
3738 }
3739
3740 pub(crate) async fn get_user_beacon_info(
3747 &self,
3748 user_id: &UserId,
3749 ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3750 let raw_event = self
3751 .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3752 .await?
3753 .ok_or(BeaconError::NotFound)?;
3754
3755 match raw_event.deserialize()? {
3756 SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3757 SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3758 SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3759 }
3760 }
3761
3762 pub async fn start_live_location_share(
3775 &self,
3776 duration_millis: u64,
3777 description: Option<String>,
3778 ) -> Result<send_state_event::v3::Response> {
3779 self.ensure_room_joined()?;
3780
3781 self.send_state_event_for_key(
3782 self.own_user_id(),
3783 BeaconInfoEventContent::new(
3784 description,
3785 Duration::from_millis(duration_millis),
3786 true,
3787 None,
3788 ),
3789 )
3790 .await
3791 }
3792
3793 pub async fn stop_live_location_share(
3800 &self,
3801 ) -> Result<send_state_event::v3::Response, BeaconError> {
3802 self.ensure_room_joined()?;
3803
3804 let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3805 beacon_info_event.content.stop();
3806 Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3807 }
3808
3809 pub async fn send_location_beacon(
3821 &self,
3822 geo_uri: String,
3823 ) -> Result<send_message_event::v3::Response, BeaconError> {
3824 self.ensure_room_joined()?;
3825
3826 let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3827
3828 if beacon_info_event.content.is_live() {
3829 let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3830 Ok(self.send(content).await?)
3831 } else {
3832 Err(BeaconError::NotLive)
3833 }
3834 }
3835
3836 pub async fn save_composer_draft(
3839 &self,
3840 draft: ComposerDraft,
3841 thread_root: Option<&EventId>,
3842 ) -> Result<()> {
3843 self.client
3844 .state_store()
3845 .set_kv_data(
3846 StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
3847 StateStoreDataValue::ComposerDraft(draft),
3848 )
3849 .await?;
3850 Ok(())
3851 }
3852
3853 pub async fn load_composer_draft(
3856 &self,
3857 thread_root: Option<&EventId>,
3858 ) -> Result<Option<ComposerDraft>> {
3859 let data = self
3860 .client
3861 .state_store()
3862 .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3863 .await?;
3864 Ok(data.and_then(|d| d.into_composer_draft()))
3865 }
3866
3867 pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
3870 self.client
3871 .state_store()
3872 .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3873 .await?;
3874 Ok(())
3875 }
3876
3877 pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
3880 let response = self
3881 .client
3882 .send(get_state_event_for_key::v3::Request::new(
3883 self.room_id().to_owned(),
3884 StateEventType::RoomPinnedEvents,
3885 "".to_owned(),
3886 ))
3887 .await;
3888
3889 match response {
3890 Ok(response) => Ok(Some(
3891 response
3892 .into_content()
3893 .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
3894 .pinned,
3895 )),
3896 Err(http_error) => match http_error.as_client_api_error() {
3897 Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
3898 _ => Err(http_error.into()),
3899 },
3900 }
3901 }
3902
3903 pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3911 ObservableLiveLocation::new(&self.client, self.room_id())
3912 }
3913
3914 pub async fn subscribe_to_knock_requests(
3928 &self,
3929 ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
3930 let this = Arc::new(self.clone());
3931
3932 let room_member_events_observer =
3933 self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
3934
3935 let current_seen_ids = self.get_seen_knock_request_ids().await?;
3936 let mut seen_request_ids_stream = self
3937 .seen_knock_request_ids_map
3938 .subscribe()
3939 .await
3940 .map(|values| values.unwrap_or_default());
3941
3942 let mut room_info_stream = self.subscribe_info();
3943
3944 let clear_seen_ids_handle = spawn({
3947 let this = self.clone();
3948 async move {
3949 let mut member_updates_stream = this.room_member_updates_sender.subscribe();
3950 while member_updates_stream.recv().await.is_ok() {
3951 if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
3953 warn!("Failed to remove seen knock requests: {err}")
3954 }
3955 }
3956 }
3957 });
3958
3959 let combined_stream = stream! {
3960 match this.get_current_join_requests(¤t_seen_ids).await {
3962 Ok(initial_requests) => yield initial_requests,
3963 Err(err) => warn!("Failed to get initial requests to join: {err}")
3964 }
3965
3966 let mut requests_stream = room_member_events_observer.subscribe();
3967 let mut seen_ids = current_seen_ids.clone();
3968
3969 loop {
3970 tokio::select! {
3973 Some((event, _)) = requests_stream.next() => {
3974 if let Some(event) = event.as_original() {
3975 let emit = if event.prev_content().is_some() {
3977 matches!(event.membership_change(),
3978 MembershipChange::Banned |
3979 MembershipChange::Knocked |
3980 MembershipChange::KnockAccepted |
3981 MembershipChange::KnockDenied |
3982 MembershipChange::KnockRetracted
3983 )
3984 } else {
3985 true
3988 };
3989
3990 if emit {
3991 match this.get_current_join_requests(&seen_ids).await {
3992 Ok(requests) => yield requests,
3993 Err(err) => {
3994 warn!("Failed to get updated knock requests on new member event: {err}")
3995 }
3996 }
3997 }
3998 }
3999 }
4000
4001 Some(new_seen_ids) = seen_request_ids_stream.next() => {
4002 seen_ids = new_seen_ids;
4004
4005 match this.get_current_join_requests(&seen_ids).await {
4008 Ok(requests) => yield requests,
4009 Err(err) => {
4010 warn!("Failed to get updated knock requests on seen ids changed: {err}")
4011 }
4012 }
4013 }
4014
4015 Some(room_info) = room_info_stream.next() => {
4016 if !room_info.are_members_synced() {
4019 match this.get_current_join_requests(&seen_ids).await {
4020 Ok(requests) => yield requests,
4021 Err(err) => {
4022 warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4023 }
4024 }
4025 }
4026 }
4027 else => break,
4029 }
4030 }
4031 };
4032
4033 Ok((combined_stream, clear_seen_ids_handle))
4034 }
4035
4036 async fn get_current_join_requests(
4037 &self,
4038 seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4039 ) -> Result<Vec<KnockRequest>> {
4040 Ok(self
4041 .members(RoomMemberships::KNOCK)
4042 .await?
4043 .into_iter()
4044 .filter_map(|member| {
4045 let event_id = member.event().event_id()?;
4046 Some(KnockRequest::new(
4047 self,
4048 event_id,
4049 member.event().timestamp(),
4050 KnockRequestMemberInfo::from_member(&member),
4051 seen_request_ids.contains_key(event_id),
4052 ))
4053 })
4054 .collect())
4055 }
4056
4057 pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4059 RoomPrivacySettings::new(&self.inner, &self.client)
4060 }
4061
4062 pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4070 let request = opts.into_request(self.room_id());
4071
4072 let response = self.client.send(request).await?;
4073
4074 let push_ctx = self.push_context().await?;
4075 let chunk = join_all(
4076 response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4077 )
4078 .await;
4079
4080 Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4081 }
4082
4083 pub async fn relations(
4097 &self,
4098 event_id: OwnedEventId,
4099 opts: RelationsOptions,
4100 ) -> Result<Relations> {
4101 opts.send(self, event_id).await
4102 }
4103
4104 #[cfg(feature = "experimental-search")]
4107 pub async fn search(
4108 &self,
4109 query: &str,
4110 max_number_of_results: usize,
4111 pagination_offset: Option<usize>,
4112 ) -> Result<Vec<OwnedEventId>, IndexError> {
4113 let mut search_index_guard = self.client.search_index().lock().await;
4114 search_index_guard.search(query, max_number_of_results, pagination_offset, self.room_id())
4115 }
4116
4117 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4139 pub async fn subscribe_thread(
4140 &self,
4141 thread_root: OwnedEventId,
4142 automatic: Option<OwnedEventId>,
4143 ) -> Result<()> {
4144 let is_automatic = automatic.is_some();
4145
4146 match self
4147 .client
4148 .send(subscribe_thread::unstable::Request::new(
4149 self.room_id().to_owned(),
4150 thread_root.clone(),
4151 automatic,
4152 ))
4153 .await
4154 {
4155 Ok(_response) => {
4156 trace!("Server acknowledged the thread subscription; saving in db");
4157
4158 self.client
4160 .state_store()
4161 .upsert_thread_subscription(
4162 self.room_id(),
4163 &thread_root,
4164 StoredThreadSubscription {
4165 status: ThreadSubscriptionStatus::Subscribed {
4166 automatic: is_automatic,
4167 },
4168 bump_stamp: None,
4169 },
4170 )
4171 .await?;
4172
4173 Ok(())
4174 }
4175
4176 Err(err) => {
4177 if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4178 trace!("Thread subscription skipped: {err}");
4183 Ok(())
4184 } else {
4185 Err(err.into())
4187 }
4188 }
4189 }
4190 }
4191
4192 pub async fn subscribe_thread_if_needed(
4198 &self,
4199 thread_root: &EventId,
4200 automatic: Option<OwnedEventId>,
4201 ) -> Result<()> {
4202 if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4203 if !prev_sub.automatic || automatic.is_some() {
4206 return Ok(());
4209 }
4210 }
4211 self.subscribe_thread(thread_root.to_owned(), automatic).await
4212 }
4213
4214 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4226 pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4227 self.client
4228 .send(unsubscribe_thread::unstable::Request::new(
4229 self.room_id().to_owned(),
4230 thread_root.clone(),
4231 ))
4232 .await?;
4233
4234 trace!("Server acknowledged the thread subscription removal; removed it from db too");
4235
4236 self.client
4238 .state_store()
4239 .upsert_thread_subscription(
4240 self.room_id(),
4241 &thread_root,
4242 StoredThreadSubscription {
4243 status: ThreadSubscriptionStatus::Unsubscribed,
4244 bump_stamp: None,
4245 },
4246 )
4247 .await?;
4248
4249 Ok(())
4250 }
4251
4252 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4269 pub async fn fetch_thread_subscription(
4270 &self,
4271 thread_root: OwnedEventId,
4272 ) -> Result<Option<ThreadSubscription>> {
4273 let result = self
4274 .client
4275 .send(get_thread_subscription::unstable::Request::new(
4276 self.room_id().to_owned(),
4277 thread_root.clone(),
4278 ))
4279 .await;
4280
4281 let subscription = match result {
4282 Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4283 Err(http_error) => match http_error.as_client_api_error() {
4284 Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4285 _ => return Err(http_error.into()),
4286 },
4287 };
4288
4289 if let Some(sub) = &subscription {
4291 self.client
4292 .state_store()
4293 .upsert_thread_subscription(
4294 self.room_id(),
4295 &thread_root,
4296 StoredThreadSubscription {
4297 status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4298 bump_stamp: None,
4299 },
4300 )
4301 .await?;
4302 } else {
4303 self.client
4305 .state_store()
4306 .remove_thread_subscription(self.room_id(), &thread_root)
4307 .await?;
4308 }
4309
4310 Ok(subscription)
4311 }
4312
4313 pub async fn load_or_fetch_thread_subscription(
4320 &self,
4321 thread_root: &EventId,
4322 ) -> Result<Option<ThreadSubscription>> {
4323 if self.client.thread_subscription_catchup().is_outdated() {
4325 return self.fetch_thread_subscription(thread_root.to_owned()).await;
4326 }
4327
4328 Ok(self
4330 .client
4331 .state_store()
4332 .load_thread_subscription(self.room_id(), thread_root)
4333 .await
4334 .map(|maybe_sub| {
4335 maybe_sub.and_then(|stored| match stored.status {
4336 ThreadSubscriptionStatus::Unsubscribed => None,
4337 ThreadSubscriptionStatus::Subscribed { automatic } => {
4338 Some(ThreadSubscription { automatic })
4339 }
4340 })
4341 })?)
4342 }
4343}
4344
4345#[cfg(feature = "e2e-encryption")]
4346impl RoomIdentityProvider for Room {
4347 fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
4348 Box::pin(async { self.get_member(user_id).await.unwrap_or(None).is_some() })
4349 }
4350
4351 fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
4352 Box::pin(async {
4353 let members = self
4354 .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
4355 .await
4356 .unwrap_or_else(|_| Default::default());
4357
4358 let mut ret: Vec<UserIdentity> = Vec::new();
4359 for member in members {
4360 if let Some(i) = self.user_identity(member.user_id()).await {
4361 ret.push(i);
4362 }
4363 }
4364 ret
4365 })
4366 }
4367
4368 fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
4369 Box::pin(async {
4370 self.client
4371 .encryption()
4372 .get_user_identity(user_id)
4373 .await
4374 .unwrap_or(None)
4375 .map(|u| u.underlying_identity())
4376 })
4377 }
4378}
4379
4380#[derive(Clone, Debug)]
4383pub(crate) struct WeakRoom {
4384 client: WeakClient,
4385 room_id: OwnedRoomId,
4386}
4387
4388impl WeakRoom {
4389 pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4391 Self { client, room_id }
4392 }
4393
4394 pub fn get(&self) -> Option<Room> {
4396 self.client.get().and_then(|client| client.get_room(&self.room_id))
4397 }
4398
4399 pub fn room_id(&self) -> &RoomId {
4401 &self.room_id
4402 }
4403}
4404
4405#[derive(Debug, Clone)]
4407pub struct Invite {
4408 pub invitee: RoomMember,
4410 pub inviter: Option<RoomMember>,
4412}
4413
4414#[derive(Error, Debug)]
4415enum InvitationError {
4416 #[error("No membership event found")]
4417 EventMissing,
4418}
4419
4420#[derive(Debug, Clone, Default)]
4422#[non_exhaustive]
4423pub struct Receipts {
4424 pub fully_read: Option<OwnedEventId>,
4426 pub public_read_receipt: Option<OwnedEventId>,
4428 pub private_read_receipt: Option<OwnedEventId>,
4430}
4431
4432impl Receipts {
4433 pub fn new() -> Self {
4435 Self::default()
4436 }
4437
4438 pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4447 self.fully_read = event_id.into();
4448 self
4449 }
4450
4451 pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4457 self.public_read_receipt = event_id.into();
4458 self
4459 }
4460
4461 pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4465 self.private_read_receipt = event_id.into();
4466 self
4467 }
4468
4469 pub fn is_empty(&self) -> bool {
4471 self.fully_read.is_none()
4472 && self.public_read_receipt.is_none()
4473 && self.private_read_receipt.is_none()
4474 }
4475}
4476
4477#[derive(Debug)]
4480pub enum ParentSpace {
4481 Reciprocal(Room),
4484 WithPowerlevel(Room),
4489 Illegitimate(Room),
4492 Unverifiable(OwnedRoomId),
4495}
4496
4497#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
4501pub struct ReportedContentScore(i8);
4502
4503impl ReportedContentScore {
4504 pub const MIN: Self = Self(-100);
4508
4509 pub const MAX: Self = Self(0);
4513
4514 pub fn new(value: i8) -> Option<Self> {
4523 value.try_into().ok()
4524 }
4525
4526 pub fn new_saturating(value: i8) -> Self {
4532 if value > Self::MAX {
4533 Self::MAX
4534 } else if value < Self::MIN {
4535 Self::MIN
4536 } else {
4537 Self(value)
4538 }
4539 }
4540
4541 pub fn value(&self) -> i8 {
4543 self.0
4544 }
4545}
4546
4547impl PartialEq<i8> for ReportedContentScore {
4548 fn eq(&self, other: &i8) -> bool {
4549 self.0.eq(other)
4550 }
4551}
4552
4553impl PartialEq<ReportedContentScore> for i8 {
4554 fn eq(&self, other: &ReportedContentScore) -> bool {
4555 self.eq(&other.0)
4556 }
4557}
4558
4559impl PartialOrd<i8> for ReportedContentScore {
4560 fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
4561 self.0.partial_cmp(other)
4562 }
4563}
4564
4565impl PartialOrd<ReportedContentScore> for i8 {
4566 fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
4567 self.partial_cmp(&other.0)
4568 }
4569}
4570
4571impl From<ReportedContentScore> for Int {
4572 fn from(value: ReportedContentScore) -> Self {
4573 value.0.into()
4574 }
4575}
4576
4577impl TryFrom<i8> for ReportedContentScore {
4578 type Error = TryFromReportedContentScoreError;
4579
4580 fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
4581 if value > Self::MAX || value < Self::MIN {
4582 Err(TryFromReportedContentScoreError(()))
4583 } else {
4584 Ok(Self(value))
4585 }
4586 }
4587}
4588
4589impl TryFrom<i16> for ReportedContentScore {
4590 type Error = TryFromReportedContentScoreError;
4591
4592 fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
4593 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4594 value.try_into()
4595 }
4596}
4597
4598impl TryFrom<i32> for ReportedContentScore {
4599 type Error = TryFromReportedContentScoreError;
4600
4601 fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
4602 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4603 value.try_into()
4604 }
4605}
4606
4607impl TryFrom<i64> for ReportedContentScore {
4608 type Error = TryFromReportedContentScoreError;
4609
4610 fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
4611 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4612 value.try_into()
4613 }
4614}
4615
4616impl TryFrom<Int> for ReportedContentScore {
4617 type Error = TryFromReportedContentScoreError;
4618
4619 fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
4620 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4621 value.try_into()
4622 }
4623}
4624
4625trait EventSource {
4626 fn get_event(
4627 &self,
4628 event_id: &EventId,
4629 ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
4630}
4631
4632impl EventSource for &Room {
4633 async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4634 self.load_or_fetch_event(event_id, None).await
4635 }
4636}
4637
4638#[derive(Debug, Clone, Error)]
4641#[error("out of range conversion attempted")]
4642pub struct TryFromReportedContentScoreError(());
4643
4644#[derive(Debug)]
4647pub struct RoomMemberWithSenderInfo {
4648 pub room_member: RoomMember,
4650 pub sender_info: Option<RoomMember>,
4653}
4654
4655#[cfg(all(test, not(target_family = "wasm")))]
4656mod tests {
4657 use std::collections::BTreeMap;
4658
4659 use matrix_sdk_base::{ComposerDraft, store::ComposerDraftType};
4660 use matrix_sdk_test::{
4661 JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
4662 event_factory::EventFactory, test_json,
4663 };
4664 use ruma::{
4665 RoomVersionId, event_id,
4666 events::{relation::RelationType, room::member::MembershipState},
4667 int, owned_event_id, room_id, user_id,
4668 };
4669 use wiremock::{
4670 Mock, MockServer, ResponseTemplate,
4671 matchers::{header, method, path_regex},
4672 };
4673
4674 use super::ReportedContentScore;
4675 use crate::{
4676 Client,
4677 config::RequestConfig,
4678 room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4679 test_utils::{
4680 client::mock_matrix_session,
4681 logged_in_client,
4682 mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4683 },
4684 };
4685
4686 #[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
4687 #[async_test]
4688 async fn test_cache_invalidation_while_encrypt() {
4689 use matrix_sdk_base::store::RoomLoadSettings;
4690 use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};
4691
4692 let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
4693 let session = mock_matrix_session();
4694
4695 let client = Client::builder()
4696 .homeserver_url("http://localhost:1234")
4697 .request_config(RequestConfig::new().disable_retry())
4698 .sqlite_store(&sqlite_path, None)
4699 .build()
4700 .await
4701 .unwrap();
4702 client
4703 .matrix_auth()
4704 .restore_session(session.clone(), RoomLoadSettings::default())
4705 .await
4706 .unwrap();
4707
4708 client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();
4709
4710 let server = MockServer::start().await;
4712 {
4713 Mock::given(method("GET"))
4714 .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
4715 .and(header("authorization", "Bearer 1234"))
4716 .respond_with(
4717 ResponseTemplate::new(200)
4718 .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
4719 )
4720 .mount(&server)
4721 .await;
4722 let response = SyncResponseBuilder::default()
4723 .add_joined_room(
4724 JoinedRoomBuilder::default()
4725 .add_state_event(StateTestEvent::Member)
4726 .add_state_event(StateTestEvent::PowerLevels)
4727 .add_state_event(StateTestEvent::Encryption),
4728 )
4729 .build_sync_response();
4730 client.base_client().receive_sync_response(response).await.unwrap();
4731 }
4732
4733 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4734
4735 room.preshare_room_key().await.unwrap();
4737
4738 {
4741 let client = Client::builder()
4742 .homeserver_url("http://localhost:1234")
4743 .request_config(RequestConfig::new().disable_retry())
4744 .sqlite_store(&sqlite_path, None)
4745 .build()
4746 .await
4747 .unwrap();
4748 client
4749 .matrix_auth()
4750 .restore_session(session.clone(), RoomLoadSettings::default())
4751 .await
4752 .unwrap();
4753 client
4754 .encryption()
4755 .enable_cross_process_store_lock("client2".to_owned())
4756 .await
4757 .unwrap();
4758
4759 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4760 assert!(guard.is_some());
4761 }
4762
4763 let guard = client.encryption().spin_lock_store(None).await.unwrap();
4765 assert!(guard.is_some());
4766
4767 let olm = client.olm_machine().await;
4769 let olm = olm.as_ref().expect("Olm machine wasn't started");
4770
4771 let _encrypted_content = olm
4774 .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
4775 .await
4776 .unwrap();
4777 }
4778
4779 #[test]
4780 fn reported_content_score() {
4781 let score = ReportedContentScore::new(0).unwrap();
4783 assert_eq!(score.value(), 0);
4784 let score = ReportedContentScore::new(-50).unwrap();
4785 assert_eq!(score.value(), -50);
4786 let score = ReportedContentScore::new(-100).unwrap();
4787 assert_eq!(score.value(), -100);
4788 assert_eq!(ReportedContentScore::new(10), None);
4789 assert_eq!(ReportedContentScore::new(-110), None);
4790
4791 let score = ReportedContentScore::new_saturating(0);
4792 assert_eq!(score.value(), 0);
4793 let score = ReportedContentScore::new_saturating(-50);
4794 assert_eq!(score.value(), -50);
4795 let score = ReportedContentScore::new_saturating(-100);
4796 assert_eq!(score.value(), -100);
4797 let score = ReportedContentScore::new_saturating(10);
4798 assert_eq!(score, ReportedContentScore::MAX);
4799 let score = ReportedContentScore::new_saturating(-110);
4800 assert_eq!(score, ReportedContentScore::MIN);
4801
4802 let score = ReportedContentScore::try_from(0i16).unwrap();
4804 assert_eq!(score.value(), 0);
4805 let score = ReportedContentScore::try_from(-100i16).unwrap();
4806 assert_eq!(score.value(), -100);
4807 ReportedContentScore::try_from(10i16).unwrap_err();
4808 ReportedContentScore::try_from(-110i16).unwrap_err();
4809
4810 let score = ReportedContentScore::try_from(0i32).unwrap();
4812 assert_eq!(score.value(), 0);
4813 let score = ReportedContentScore::try_from(-100i32).unwrap();
4814 assert_eq!(score.value(), -100);
4815 ReportedContentScore::try_from(10i32).unwrap_err();
4816 ReportedContentScore::try_from(-110i32).unwrap_err();
4817
4818 let score = ReportedContentScore::try_from(0i64).unwrap();
4820 assert_eq!(score.value(), 0);
4821 let score = ReportedContentScore::try_from(-100i64).unwrap();
4822 assert_eq!(score.value(), -100);
4823 ReportedContentScore::try_from(10i64).unwrap_err();
4824 ReportedContentScore::try_from(-110i64).unwrap_err();
4825
4826 let score = ReportedContentScore::try_from(int!(0)).unwrap();
4828 assert_eq!(score.value(), 0);
4829 let score = ReportedContentScore::try_from(int!(-100)).unwrap();
4830 assert_eq!(score.value(), -100);
4831 ReportedContentScore::try_from(int!(10)).unwrap_err();
4832 ReportedContentScore::try_from(int!(-110)).unwrap_err();
4833 }
4834
4835 #[async_test]
4836 async fn test_composer_draft() {
4837 use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;
4838
4839 let client = logged_in_client(None).await;
4840
4841 let response = SyncResponseBuilder::default()
4842 .add_joined_room(JoinedRoomBuilder::default())
4843 .build_sync_response();
4844 client.base_client().receive_sync_response(response).await.unwrap();
4845 let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");
4846
4847 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4848
4849 let draft = ComposerDraft {
4852 plain_text: "Hello, world!".to_owned(),
4853 html_text: Some("<strong>Hello</strong>, world!".to_owned()),
4854 draft_type: ComposerDraftType::NewMessage,
4855 };
4856
4857 room.save_composer_draft(draft.clone(), None).await.unwrap();
4858
4859 let thread_root = owned_event_id!("$thread_root:b.c");
4860 let thread_draft = ComposerDraft {
4861 plain_text: "Hello, thread!".to_owned(),
4862 html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
4863 draft_type: ComposerDraftType::NewMessage,
4864 };
4865
4866 room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();
4867
4868 assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));
4870
4871 assert_eq!(
4873 room.load_composer_draft(Some(&thread_root)).await.unwrap(),
4874 Some(thread_draft.clone())
4875 );
4876
4877 room.clear_composer_draft(None).await.unwrap();
4879 assert_eq!(room.load_composer_draft(None).await.unwrap(), None);
4880
4881 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));
4883
4884 room.clear_composer_draft(Some(&thread_root)).await.unwrap();
4886 assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
4887 }
4888
4889 #[async_test]
4890 async fn test_mark_join_requests_as_seen() {
4891 let server = MatrixMockServer::new().await;
4892 let client = server.client_builder().build().await;
4893 let event_id = event_id!("$a:b.c");
4894 let room_id = room_id!("!a:b.c");
4895 let user_id = user_id!("@alice:b.c");
4896
4897 let f = EventFactory::new().room(room_id);
4898 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4899 f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into(),
4900 ]);
4901 let room = server.sync_room(&client, joined_room_builder).await;
4902
4903 let seen_ids =
4905 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4906 assert!(seen_ids.is_empty());
4907
4908 room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4910 .await
4911 .expect("Couldn't mark join request as seen");
4912
4913 let seen_ids =
4915 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4916 assert_eq!(seen_ids.len(), 1);
4917 assert_eq!(
4918 seen_ids.into_iter().next().expect("No next value"),
4919 (event_id.to_owned(), user_id.to_owned())
4920 )
4921 }
4922
4923 #[async_test]
4924 async fn test_own_room_membership_with_no_own_member_event() {
4925 let server = MatrixMockServer::new().await;
4926 let client = server.client_builder().build().await;
4927 let room_id = room_id!("!a:b.c");
4928
4929 let room = server.sync_joined_room(&client, room_id).await;
4930
4931 let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
4934 assert!(error.is_some());
4935 }
4936
4937 #[async_test]
4938 async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
4939 let server = MatrixMockServer::new().await;
4940 let client = server.client_builder().build().await;
4941 let room_id = room_id!("!a:b.c");
4942 let user_id = user_id!("@example:localhost");
4943
4944 let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
4945 let joined_room_builder =
4946 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
4947 let room = server.sync_room(&client, joined_room_builder).await;
4948
4949 let ret = room
4951 .member_with_sender_info(client.user_id().unwrap())
4952 .await
4953 .expect("Room member info should be available");
4954
4955 assert_eq!(ret.room_member.event().user_id(), user_id);
4957
4958 assert!(ret.sender_info.is_none());
4960 }
4961
4962 #[async_test]
4963 async fn test_own_room_membership_with_own_member_event_and_own_sender() {
4964 let server = MatrixMockServer::new().await;
4965 let client = server.client_builder().build().await;
4966 let room_id = room_id!("!a:b.c");
4967 let user_id = user_id!("@example:localhost");
4968
4969 let f = EventFactory::new().room(room_id).sender(user_id);
4970 let joined_room_builder =
4971 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
4972 let room = server.sync_room(&client, joined_room_builder).await;
4973
4974 let ret = room
4976 .member_with_sender_info(client.user_id().unwrap())
4977 .await
4978 .expect("Room member info should be available");
4979
4980 assert_eq!(ret.room_member.event().user_id(), user_id);
4982
4983 assert!(ret.sender_info.is_some());
4985 assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
4986 }
4987
4988 #[async_test]
4989 async fn test_own_room_membership_with_own_member_event_and_known_sender() {
4990 let server = MatrixMockServer::new().await;
4991 let client = server.client_builder().build().await;
4992 let room_id = room_id!("!a:b.c");
4993 let user_id = user_id!("@example:localhost");
4994 let sender_id = user_id!("@alice:b.c");
4995
4996 let f = EventFactory::new().room(room_id).sender(sender_id);
4997 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4998 f.member(user_id).into(),
4999 f.member(sender_id).into(),
5001 ]);
5002 let room = server.sync_room(&client, joined_room_builder).await;
5003
5004 let ret = room
5006 .member_with_sender_info(client.user_id().unwrap())
5007 .await
5008 .expect("Room member info should be available");
5009
5010 assert_eq!(ret.room_member.event().user_id(), user_id);
5012
5013 assert!(ret.sender_info.is_some());
5015 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5016 }
5017
5018 #[async_test]
5019 async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5020 let server = MatrixMockServer::new().await;
5021 let client = server.client_builder().build().await;
5022 let room_id = room_id!("!a:b.c");
5023 let user_id = user_id!("@example:localhost");
5024 let sender_id = user_id!("@alice:b.c");
5025
5026 let f = EventFactory::new().room(room_id).sender(sender_id);
5027 let joined_room_builder =
5028 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into()]);
5029 let room = server.sync_room(&client, joined_room_builder).await;
5030
5031 server
5033 .mock_get_members()
5034 .ok(vec![f.member(sender_id).into_raw()])
5035 .mock_once()
5036 .mount()
5037 .await;
5038
5039 let ret = room
5041 .member_with_sender_info(client.user_id().unwrap())
5042 .await
5043 .expect("Room member info should be available");
5044
5045 assert_eq!(ret.room_member.event().user_id(), user_id);
5047
5048 assert!(ret.sender_info.is_some());
5050 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5051 }
5052
5053 #[async_test]
5054 async fn test_list_threads() {
5055 let server = MatrixMockServer::new().await;
5056 let client = server.client_builder().build().await;
5057
5058 let room_id = room_id!("!a:b.c");
5059 let sender_id = user_id!("@alice:b.c");
5060 let f = EventFactory::new().room(room_id).sender(sender_id);
5061
5062 let eid1 = event_id!("$1");
5063 let eid2 = event_id!("$2");
5064 let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5065 let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5066
5067 server
5068 .mock_room_threads()
5069 .ok(batch1.clone(), Some("prev_batch".to_owned()))
5070 .mock_once()
5071 .mount()
5072 .await;
5073 server
5074 .mock_room_threads()
5075 .match_from("prev_batch")
5076 .ok(batch2, None)
5077 .mock_once()
5078 .mount()
5079 .await;
5080
5081 let room = server.sync_joined_room(&client, room_id).await;
5082 let result =
5083 room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5084 assert_eq!(result.chunk.len(), 1);
5085 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5086 assert!(result.prev_batch_token.is_some());
5087
5088 let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5089 let result = room.list_threads(opts).await.expect("Failed to list threads");
5090 assert_eq!(result.chunk.len(), 1);
5091 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5092 assert!(result.prev_batch_token.is_none());
5093 }
5094
5095 #[async_test]
5096 async fn test_relations() {
5097 let server = MatrixMockServer::new().await;
5098 let client = server.client_builder().build().await;
5099
5100 let room_id = room_id!("!a:b.c");
5101 let sender_id = user_id!("@alice:b.c");
5102 let f = EventFactory::new().room(room_id).sender(sender_id);
5103
5104 let target_event_id = owned_event_id!("$target");
5105 let eid1 = event_id!("$1");
5106 let eid2 = event_id!("$2");
5107 let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5108 let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5109
5110 server
5111 .mock_room_relations()
5112 .match_target_event(target_event_id.clone())
5113 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5114 .mock_once()
5115 .mount()
5116 .await;
5117
5118 server
5119 .mock_room_relations()
5120 .match_target_event(target_event_id.clone())
5121 .match_from("next_batch")
5122 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5123 .mock_once()
5124 .mount()
5125 .await;
5126
5127 let room = server.sync_joined_room(&client, room_id).await;
5128
5129 let mut opts = RelationsOptions {
5131 include_relations: IncludeRelations::AllRelations,
5132 ..Default::default()
5133 };
5134 let result = room
5135 .relations(target_event_id.clone(), opts.clone())
5136 .await
5137 .expect("Failed to list relations the first time");
5138 assert_eq!(result.chunk.len(), 1);
5139 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5140 assert!(result.prev_batch_token.is_none());
5141 assert!(result.next_batch_token.is_some());
5142 assert!(result.recursion_depth.is_none());
5143
5144 opts.from = result.next_batch_token;
5145 let result = room
5146 .relations(target_event_id, opts)
5147 .await
5148 .expect("Failed to list relations the second time");
5149 assert_eq!(result.chunk.len(), 1);
5150 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5151 assert!(result.prev_batch_token.is_none());
5152 assert!(result.next_batch_token.is_none());
5153 assert!(result.recursion_depth.is_none());
5154 }
5155
5156 #[async_test]
5157 async fn test_relations_with_reltype() {
5158 let server = MatrixMockServer::new().await;
5159 let client = server.client_builder().build().await;
5160
5161 let room_id = room_id!("!a:b.c");
5162 let sender_id = user_id!("@alice:b.c");
5163 let f = EventFactory::new().room(room_id).sender(sender_id);
5164
5165 let target_event_id = owned_event_id!("$target");
5166 let eid1 = event_id!("$1");
5167 let eid2 = event_id!("$2");
5168 let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5169 let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5170
5171 server
5172 .mock_room_relations()
5173 .match_target_event(target_event_id.clone())
5174 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5175 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5176 .mock_once()
5177 .mount()
5178 .await;
5179
5180 server
5181 .mock_room_relations()
5182 .match_target_event(target_event_id.clone())
5183 .match_from("next_batch")
5184 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5185 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5186 .mock_once()
5187 .mount()
5188 .await;
5189
5190 let room = server.sync_joined_room(&client, room_id).await;
5191
5192 let mut opts = RelationsOptions {
5194 include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5195 ..Default::default()
5196 };
5197 let result = room
5198 .relations(target_event_id.clone(), opts.clone())
5199 .await
5200 .expect("Failed to list relations the first time");
5201 assert_eq!(result.chunk.len(), 1);
5202 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5203 assert!(result.prev_batch_token.is_none());
5204 assert!(result.next_batch_token.is_some());
5205 assert!(result.recursion_depth.is_none());
5206
5207 opts.from = result.next_batch_token;
5208 let result = room
5209 .relations(target_event_id, opts)
5210 .await
5211 .expect("Failed to list relations the second time");
5212 assert_eq!(result.chunk.len(), 1);
5213 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5214 assert!(result.prev_batch_token.is_none());
5215 assert!(result.next_batch_token.is_none());
5216 assert!(result.recursion_depth.is_none());
5217 }
5218
5219 #[async_test]
5220 async fn test_power_levels_computation() {
5221 let server = MatrixMockServer::new().await;
5222 let client = server.client_builder().build().await;
5223
5224 let room_id = room_id!("!a:b.c");
5225 let sender_id = client.user_id().expect("No session id");
5226 let f = EventFactory::new().room(room_id).sender(sender_id);
5227 let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5228
5229 let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into();
5231 let power_levels_event = f.power_levels(&mut user_map).state_key("").into();
5232 let room_member_event = f.member(sender_id).into();
5233
5234 let room = server
5236 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event]))
5237 .await;
5238 let ctx = room
5239 .push_condition_room_ctx()
5240 .await
5241 .expect("Failed to get push condition context")
5242 .expect("Could not get push condition context");
5243
5244 assert!(ctx.power_levels.is_none());
5246
5247 let room = server
5249 .sync_room(&client, JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event]))
5250 .await;
5251 let ctx = room
5252 .push_condition_room_ctx()
5253 .await
5254 .expect("Failed to get push condition context")
5255 .expect("Could not get push condition context");
5256
5257 assert!(ctx.power_levels.is_none());
5259
5260 let room = server
5262 .sync_room(
5263 &client,
5264 JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5265 )
5266 .await;
5267 let ctx = room
5268 .push_condition_room_ctx()
5269 .await
5270 .expect("Failed to get push condition context")
5271 .expect("Could not get push condition context");
5272
5273 assert!(ctx.power_levels.is_some());
5275 }
5276}