1use std::{
18 borrow::Borrow,
19 collections::{BTreeMap, HashMap},
20 future::Future,
21 ops::Deref,
22 sync::Arc,
23 time::Duration,
24};
25
26use async_stream::stream;
27use eyeball::SharedObservable;
28use futures_core::Stream;
29use futures_util::{
30 StreamExt, future::join_all, stream as futures_stream, stream::FuturesUnordered,
31};
32use http::StatusCode;
33#[cfg(feature = "e2e-encryption")]
34pub use identity_status_changes::IdentityStatusChanges;
35#[cfg(feature = "experimental-encrypted-state-events")]
36use matrix_sdk_base::crypto::types::events::room::encrypted::EncryptedEvent;
37#[cfg(feature = "e2e-encryption")]
38use matrix_sdk_base::crypto::{IdentityStatusChange, RoomIdentityProvider, UserIdentity};
39pub use matrix_sdk_base::store::StoredThreadSubscription;
40use matrix_sdk_base::{
41 ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
42 StateChanges, StateStoreDataKey, StateStoreDataValue,
43 deserialized_responses::{
44 RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
45 },
46 media::{MediaThumbnailSettings, store::IgnoreMediaRetentionPolicy},
47 store::{StateStoreExt, ThreadSubscriptionStatus},
48};
49#[cfg(feature = "e2e-encryption")]
50use matrix_sdk_base::{crypto::RoomEventDecryptionResult, deserialized_responses::EncryptionInfo};
51#[cfg(feature = "e2e-encryption")]
52use matrix_sdk_common::BoxFuture;
53use matrix_sdk_common::{
54 deserialized_responses::TimelineEvent,
55 executor::{JoinHandle, spawn},
56 timeout::timeout,
57};
58#[cfg(feature = "experimental-search")]
59use matrix_sdk_search::error::IndexError;
60#[cfg(feature = "experimental-search")]
61#[cfg(doc)]
62use matrix_sdk_search::index::RoomIndex;
63use mime::Mime;
64use reply::Reply;
65#[cfg(any(feature = "experimental-search", feature = "e2e-encryption"))]
66use ruma::events::AnySyncMessageLikeEvent;
67#[cfg(feature = "experimental-encrypted-state-events")]
68use ruma::events::AnySyncStateEvent;
69#[cfg(feature = "unstable-msc4274")]
70use ruma::events::room::message::GalleryItemType;
71#[cfg(feature = "e2e-encryption")]
72use ruma::events::{
73 AnySyncTimelineEvent, SyncMessageLikeEvent, room::encrypted::OriginalSyncRoomEncryptedEvent,
74};
75use ruma::{
76 EventId, Int, MatrixToUri, MatrixUri, MxcUri, OwnedEventId, OwnedRoomId, OwnedServerName,
77 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
78 api::client::{
79 config::{set_global_account_data, set_room_account_data},
80 context,
81 error::ErrorKind,
82 filter::LazyLoadOptions,
83 membership::{
84 Invite3pid, ban_user, forget_room, get_member_events,
85 invite_user::{self, v3::InvitationRecipient},
86 kick_user, leave_room, unban_user,
87 },
88 message::send_message_event,
89 read_marker::set_read_marker,
90 receipt::create_receipt,
91 redact::redact_event,
92 room::{get_room_event, report_content, report_room},
93 state::{get_state_event_for_key, send_state_event},
94 tag::{create_tag, delete_tag},
95 threads::{get_thread_subscription, subscribe_thread, unsubscribe_thread},
96 typing::create_typing_event::{self, v3::Typing},
97 },
98 assign,
99 events::{
100 AnyRoomAccountDataEvent, AnyRoomAccountDataEventContent, AnyTimelineEvent, EmptyStateKey,
101 Mentions, MessageLikeEventContent, OriginalSyncStateEvent, RedactContent,
102 RedactedStateEventContent, RoomAccountDataEvent, RoomAccountDataEventContent,
103 RoomAccountDataEventType, StateEventContent, StateEventType, StaticEventContent,
104 StaticStateEventContent, SyncStateEvent,
105 beacon::BeaconEventContent,
106 beacon_info::BeaconInfoEventContent,
107 direct::DirectEventContent,
108 marked_unread::MarkedUnreadEventContent,
109 receipt::{Receipt, ReceiptThread, ReceiptType},
110 room::{
111 ImageInfo, MediaSource, ThumbnailInfo,
112 avatar::{self, RoomAvatarEventContent},
113 encryption::RoomEncryptionEventContent,
114 history_visibility::HistoryVisibility,
115 member::{MembershipChange, SyncRoomMemberEvent},
116 message::{
117 AudioInfo, AudioMessageEventContent, FileInfo, FileMessageEventContent,
118 FormattedBody, ImageMessageEventContent, MessageType, RoomMessageEventContent,
119 UnstableAudioDetailsContentBlock, UnstableVoiceContentBlock, VideoInfo,
120 VideoMessageEventContent,
121 },
122 name::RoomNameEventContent,
123 pinned_events::RoomPinnedEventsEventContent,
124 power_levels::{
125 RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource, UserPowerLevel,
126 },
127 server_acl::RoomServerAclEventContent,
128 topic::RoomTopicEventContent,
129 },
130 space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
131 tag::{TagInfo, TagName},
132 typing::SyncTypingEvent,
133 },
134 int,
135 push::{Action, AnyPushRuleRef, PushConditionRoomCtx, Ruleset},
136 serde::Raw,
137 time::Instant,
138};
139#[cfg(feature = "experimental-encrypted-state-events")]
140use ruma::{
141 events::room::encrypted::unstable_state::OriginalSyncStateRoomEncryptedEvent,
142 serde::JsonCastable,
143};
144use serde::de::DeserializeOwned;
145use thiserror::Error;
146use tokio::{join, sync::broadcast};
147use tracing::{debug, error, info, instrument, trace, warn};
148
149use self::futures::{SendAttachment, SendMessageLikeEvent, SendRawMessageLikeEvent};
150pub use self::{
151 member::{RoomMember, RoomMemberRole},
152 messages::{
153 EventWithContextResponse, IncludeRelations, ListThreadsOptions, Messages, MessagesOptions,
154 Relations, RelationsOptions, ThreadRoots,
155 },
156};
157#[cfg(doc)]
158use crate::event_cache::EventCache;
159#[cfg(feature = "experimental-encrypted-state-events")]
160use crate::room::futures::{SendRawStateEvent, SendStateEvent};
161use crate::{
162 BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
163 attachment::{AttachmentConfig, AttachmentInfo},
164 client::WeakClient,
165 config::RequestConfig,
166 error::{BeaconError, WrongRoomState},
167 event_cache::{self, EventCacheDropHandles, RoomEventCache},
168 event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
169 live_location_share::ObservableLiveLocation,
170 media::{MediaFormat, MediaRequestParameters},
171 notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
172 room::{
173 knock_requests::{KnockRequest, KnockRequestMemberInfo},
174 power_levels::{RoomPowerLevelChanges, RoomPowerLevelsExt},
175 privacy_settings::RoomPrivacySettings,
176 },
177 sync::RoomUpdate,
178 utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
179};
180#[cfg(feature = "e2e-encryption")]
181use crate::{crypto::types::events::CryptoContextInfo, encryption::backups::BackupState};
182
// Submodules of the room API. `member` and `messages` are private; selected
// items from them are re-exported through the `pub use self::{...}` above.
pub mod edit;
pub mod futures;
pub mod identity_status_changes;
pub mod knock_requests;
mod member;
mod messages;
pub mod power_levels;
pub mod reply;

pub mod calls;

pub mod privacy_settings;

// Crate-internal module, only compiled with the `e2e-encryption` feature.
#[cfg(feature = "e2e-encryption")]
pub(crate) mod shared_room_history;
200
/// A handle to a single room, pairing the room's base state with the
/// [`Client`] used to perform requests on its behalf.
///
/// `Room` dereferences to [`BaseRoom`], so the base room's methods can be
/// called directly on it.
#[derive(Debug, Clone)]
pub struct Room {
    /// The wrapped base room.
    inner: BaseRoom,
    /// The client used to send the requests issued by this room's methods.
    pub(crate) client: Client,
}
208
209impl Deref for Room {
210 type Target = BaseRoom;
211
212 fn deref(&self) -> &Self::Target {
213 &self.inner
214 }
215}
216
// Timeout used for typing notices (4 seconds).
// NOTE(review): presumably the validity period sent with a typing notice —
// the code using it is outside this view, confirm.
const TYPING_NOTICE_TIMEOUT: Duration = Duration::from_secs(4);
// Resend interval for typing notices (3 seconds), one second shorter than
// `TYPING_NOTICE_TIMEOUT`.
// NOTE(review): presumably the minimum age of the last notice before a new
// one is sent — confirm against the typing-notice sender.
const TYPING_NOTICE_RESEND_TIMEOUT: Duration = Duration::from_secs(3);
219
/// The state of a subscription to a thread.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ThreadSubscription {
    /// Whether the subscription is flagged as automatic.
    ///
    /// NOTE(review): presumably "created by the client rather than by an
    /// explicit user action" — confirm against the thread subscription API.
    pub automatic: bool,
}
227
/// Everything needed to evaluate push rules for events of one room: the
/// room-level condition context and the rule set to match against.
#[derive(Debug)]
pub struct PushContext {
    /// The room context that push rule conditions are evaluated against.
    push_condition_room_ctx: PushConditionRoomCtx,

    /// The push rules to evaluate.
    push_rules: Ruleset,
}
238
239impl PushContext {
240 pub fn new(push_condition_room_ctx: PushConditionRoomCtx, push_rules: Ruleset) -> Self {
242 Self { push_condition_room_ctx, push_rules }
243 }
244
245 pub async fn for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
247 self.push_rules.get_actions(event, &self.push_condition_room_ctx).await.to_owned()
248 }
249
250 #[doc(hidden)]
253 #[instrument(skip_all)]
254 pub async fn traced_for_event<T>(&self, event: &Raw<T>) -> Vec<Action> {
255 let rules = self
256 .push_rules
257 .iter()
258 .filter_map(|r| {
259 if !r.enabled() {
260 return None;
261 }
262
263 let simplified_action = if r.actions().is_empty() { "inhibit" } else { "notify" };
264
265 let conditions = match r {
266 AnyPushRuleRef::Override(r) => {
267 format!("{:?}", r.conditions)
268 }
269 AnyPushRuleRef::Content(r) => format!("content-body-match:{}", r.pattern),
270 AnyPushRuleRef::Room(r) => format!("room-match:{}", r.rule_id),
271 AnyPushRuleRef::Sender(r) => format!("sender-match:{}", r.rule_id),
272 AnyPushRuleRef::Underride(r) => format!("{:?}", r.conditions),
273 _ => "<unknown push rule kind>".to_owned(),
274 };
275
276 Some(format!("- {}: {conditions} => {simplified_action}", r.rule_id(),))
277 })
278 .collect::<Vec<_>>()
279 .join("\n");
280 trace!("rules:\n\n{rules}\n\n");
281
282 let found = self.push_rules.get_match(event, &self.push_condition_room_ctx).await;
283
284 if let Some(found) = found {
285 trace!("rule {} matched", found.rule_id());
286 found.actions().to_owned()
287 } else {
288 trace!("no match");
289 Vec::new()
290 }
291 }
292}
293
// Builds the media variant (`Image`, `Audio`, `Video` or `File`) of the enum
// type `$t` for an attachment.
//
// Inputs: the attachment's content type, its filename, the media source, an
// optional caption (plain and formatted), optional attachment info and an
// optional thumbnail (an `Option` of a pair, split with `unzip`).
//
// The variant is selected from the top-level MIME type of `$content_type`;
// anything that is not image, audio or video falls back to `File`.
macro_rules! make_media_type {
    ($t:ty, $content_type: ident, $filename: ident, $source: ident, $caption: ident, $formatted_caption: ident, $info: ident, $thumbnail: ident) => {{
        // With a caption, the caption becomes the body and the filename is
        // sent separately; without one, the filename is used as the body and
        // no separate filename is set.
        let (body, filename) = match $caption {
            Some(caption) => (caption, Some($filename)),
            None => ($filename, None),
        };

        // Split the optional thumbnail pair into two independent options.
        let (thumbnail_source, thumbnail_info) = $thumbnail.unzip();

        match $content_type.type_() {
            mime::IMAGE => {
                let info = assign!($info.map(ImageInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let content = assign!(ImageMessageEventContent::new(body, $source), {
                    info: Some(Box::new(info)),
                    formatted: $formatted_caption,
                    filename
                });
                <$t>::Image(content)
            }

            mime::AUDIO => {
                let mut content = assign!(AudioMessageEventContent::new(body, $source), {
                    formatted: $formatted_caption,
                    filename
                });

                // Voice attachments that carry a waveform are marked as voice
                // messages; the audio details block is only added when the
                // duration is known. `$info` is only borrowed here because it
                // is consumed below.
                if let Some(AttachmentInfo::Voice { audio_info, waveform: Some(waveform_vec) }) =
                    &$info
                {
                    if let Some(duration) = audio_info.duration {
                        let waveform = waveform_vec.iter().map(|v| (*v).into()).collect();
                        content.audio =
                            Some(UnstableAudioDetailsContentBlock::new(duration, waveform));
                    }
                    content.voice = Some(UnstableVoiceContentBlock::new());
                }

                let mut audio_info = $info.map(AudioInfo::from).unwrap_or_default();
                audio_info.mimetype = Some($content_type.as_ref().to_owned());
                let content = content.info(Box::new(audio_info));

                <$t>::Audio(content)
            }

            mime::VIDEO => {
                let info = assign!($info.map(VideoInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let content = assign!(VideoMessageEventContent::new(body, $source), {
                    info: Some(Box::new(info)),
                    formatted: $formatted_caption,
                    filename
                });
                <$t>::Video(content)
            }

            // Every other content type is sent as a generic file.
            _ => {
                let info = assign!($info.map(FileInfo::from).unwrap_or_default(), {
                    mimetype: Some($content_type.as_ref().to_owned()),
                    thumbnail_source,
                    thumbnail_info
                });
                let content = assign!(FileMessageEventContent::new(body, $source), {
                    info: Some(Box::new(info)),
                    formatted: $formatted_caption,
                    filename,
                });
                <$t>::File(content)
            }
        }
    }};
}
375
376impl Room {
377 pub(crate) fn new(client: Client, room: BaseRoom) -> Self {
384 Self { inner: room, client }
385 }
386
387 #[doc(alias = "reject_invitation")]
393 #[instrument(skip_all, fields(room_id = ?self.inner.room_id()))]
394 async fn leave_impl(&self) -> (Result<()>, &Room) {
395 let state = self.state();
396 if state == RoomState::Left {
397 return (
398 Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
399 "Joined or Invited",
400 state,
401 )))),
402 self,
403 );
404 }
405
406 let should_forget = matches!(self.state(), RoomState::Invited);
409
410 let request = leave_room::v3::Request::new(self.inner.room_id().to_owned());
411 let response = self.client.send(request).await;
412
413 if let Err(error) = response {
416 #[allow(clippy::collapsible_match)]
417 let ignore_error = if let Some(error) = error.client_api_error_kind() {
418 match error {
419 ErrorKind::Forbidden { .. } => true,
422 _ => false,
423 }
424 } else {
425 false
426 };
427
428 error!(?error, ignore_error, should_forget, "Failed to leave the room");
429
430 if !ignore_error {
431 return (Err(error.into()), self);
432 }
433 }
434
435 if let Err(e) = self.client.base_client().room_left(self.room_id()).await {
436 return (Err(e.into()), self);
437 }
438
439 if should_forget {
440 trace!("Trying to forget the room");
441
442 if let Err(error) = self.forget().await {
443 error!(?error, "Failed to forget the room");
444 }
445 }
446
447 (Ok(()), self)
448 }
449
450 pub async fn leave(&self) -> Result<()> {
458 let mut rooms: Vec<Room> = vec![self.clone()];
459 let mut current_room = self;
460
461 while let Some(predecessor) = current_room.predecessor_room() {
462 let maybe_predecessor_room = current_room.client.get_room(&predecessor.room_id);
463
464 if let Some(predecessor_room) = maybe_predecessor_room {
465 rooms.push(predecessor_room.clone());
466 current_room = rooms.last().expect("Room just pushed so can't be empty");
467 } else {
468 warn!("Cannot find predecessor room");
469 break;
470 }
471 }
472
473 let batch_size = 5;
474
475 let rooms_futures: Vec<_> = rooms
476 .iter()
477 .filter_map(|room| match room.state() {
478 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
479 Some(room.leave_impl())
480 }
481 RoomState::Banned | RoomState::Left => None,
482 })
483 .collect();
484
485 let mut futures_stream = futures_stream::iter(rooms_futures).buffer_unordered(batch_size);
486
487 let mut maybe_this_room_failed_with: Option<Error> = None;
488
489 while let Some(result) = futures_stream.next().await {
490 if let (Err(e), room) = result {
491 if room.room_id() == self.room_id() {
492 maybe_this_room_failed_with = Some(e);
493 } else {
494 warn!("Failure while attempting to leave predecessor room: {e:?}");
495 }
496 }
497 }
498
499 maybe_this_room_failed_with.map_or(Ok(()), Err)
500 }
501
502 #[doc(alias = "accept_invitation")]
506 pub async fn join(&self) -> Result<()> {
507 let prev_room_state = self.inner.state();
508
509 if prev_room_state == RoomState::Joined {
510 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
511 "Invited or Left",
512 prev_room_state,
513 ))));
514 }
515
516 self.client.join_room_by_id(self.room_id()).await?;
517
518 Ok(())
519 }
520
521 pub fn client(&self) -> Client {
525 self.client.clone()
526 }
527
    /// Returns whether the base room reports its state as fully synced.
    pub fn is_synced(&self) -> bool {
        self.inner.is_state_fully_synced()
    }
533
534 pub async fn avatar(&self, format: MediaFormat) -> Result<Option<Vec<u8>>> {
564 let Some(url) = self.avatar_url() else { return Ok(None) };
565 let request = MediaRequestParameters { source: MediaSource::Plain(url.to_owned()), format };
566 Ok(Some(self.client.media().get_media_content(&request, true).await?))
567 }
568
569 #[instrument(skip_all, fields(room_id = ?self.inner.room_id(), ?options))]
598 pub async fn messages(&self, options: MessagesOptions) -> Result<Messages> {
599 let room_id = self.inner.room_id();
600 let request = options.into_request(room_id);
601 let http_response = self.client.send(request).await?;
602
603 let push_ctx = self.push_context().await?;
604 let chunk = join_all(
605 http_response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
606 )
607 .await;
608
609 Ok(Messages {
610 start: http_response.start,
611 end: http_response.end,
612 chunk,
613 state: http_response.state,
614 })
615 }
616
    /// Registers `handler` as an event handler scoped to this room.
    ///
    /// Delegates to the client's `add_room_event_handler` with this room's
    /// ID and returns the handle it produces.
    pub fn add_event_handler<Ev, Ctx, H>(&self, handler: H) -> EventHandlerHandle
    where
        Ev: SyncEvent + DeserializeOwned + Send + 'static,
        H: EventHandler<Ev, Ctx>,
    {
        self.client.add_room_event_handler(self.room_id(), handler)
    }
633
    /// Subscribes to [`RoomUpdate`]s for this room.
    ///
    /// Returns the broadcast receiver obtained from the client for this
    /// room's ID.
    pub fn subscribe_to_updates(&self) -> broadcast::Receiver<RoomUpdate> {
        self.client.subscribe_to_room_updates(self.room_id())
    }
641
642 pub fn subscribe_to_typing_notifications(
648 &self,
649 ) -> (EventHandlerDropGuard, broadcast::Receiver<Vec<OwnedUserId>>) {
650 let (sender, receiver) = broadcast::channel(16);
651 let typing_event_handler_handle = self.client.add_room_event_handler(self.room_id(), {
652 let own_user_id = self.own_user_id().to_owned();
653 move |event: SyncTypingEvent| async move {
654 let typing_user_ids = event
656 .content
657 .user_ids
658 .into_iter()
659 .filter(|user_id| *user_id != own_user_id)
660 .collect();
661 let _ = sender.send(typing_user_ids);
663 }
664 });
665 let drop_guard = self.client().event_handler_drop_guard(typing_event_handler_handle);
666 (drop_guard, receiver)
667 }
668
    /// Creates a stream of [`IdentityStatusChange`] batches for this room.
    ///
    /// Delegates to [`IdentityStatusChanges::create_stream`] with a clone of
    /// this room. Only available with the `e2e-encryption` feature.
    #[cfg(feature = "e2e-encryption")]
    pub async fn subscribe_to_identity_status_changes(
        &self,
    ) -> Result<impl Stream<Item = Vec<IdentityStatusChange>> + use<>> {
        IdentityStatusChanges::create_stream(self.clone()).await
    }
697
    /// Turns a raw timeline event into a [`TimelineEvent`], decrypting it
    /// when possible.
    ///
    /// This is the variant compiled without the
    /// `experimental-encrypted-state-events` feature. With `e2e-encryption`
    /// enabled, an event that deserializes as an original encrypted
    /// message-like event is passed to `decrypt_event`; on success the
    /// decrypted event is returned.
    ///
    /// In every other case (not encrypted, decryption failed, or encryption
    /// support compiled out) the event is wrapped as plaintext and, if a
    /// push context is given, its push actions are computed and attached.
    #[cfg(not(feature = "experimental-encrypted-state-events"))]
    // NOTE(review): the reason for this lint allowance is not visible in this
    // part of the file.
    #[allow(clippy::unused_async)]
    async fn try_decrypt_event(
        &self,
        event: Raw<AnyTimelineEvent>,
        push_ctx: Option<&PushContext>,
    ) -> TimelineEvent {
        #[cfg(feature = "e2e-encryption")]
        if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
            SyncMessageLikeEvent::Original(_),
        ))) = event.deserialize_as::<AnySyncTimelineEvent>()
            && let Ok(event) = self.decrypt_event(event.cast_ref_unchecked(), push_ctx).await
        {
            return event;
        }

        // Fallback: keep the event as received.
        let mut event = TimelineEvent::from_plaintext(event.cast());
        if let Some(push_ctx) = push_ctx {
            event.set_push_actions(push_ctx.for_event(event.raw()).await);
        }

        event
    }
725
    /// Turns a raw timeline event into a [`TimelineEvent`], decrypting it
    /// when possible.
    ///
    /// This is the variant compiled with the
    /// `experimental-encrypted-state-events` feature. Besides original
    /// encrypted message-like events, it also attempts to decrypt original
    /// encrypted state events.
    ///
    /// If the event is of neither kind, or decryption fails, the event is
    /// wrapped as plaintext and, if a push context is given, its push actions
    /// are computed and attached.
    #[cfg(feature = "experimental-encrypted-state-events")]
    // NOTE(review): the reason for this lint allowance is not visible in this
    // part of the file.
    #[allow(clippy::unused_async)]
    async fn try_decrypt_event(
        &self,
        event: Raw<AnyTimelineEvent>,
        push_ctx: Option<&PushContext>,
    ) -> TimelineEvent {
        match event.deserialize_as::<AnySyncTimelineEvent>() {
            // Encrypted message-like event.
            Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
                SyncMessageLikeEvent::Original(_),
            ))) => {
                if let Ok(event) = self
                    .decrypt_event(
                        event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
                        push_ctx,
                    )
                    .await
                {
                    return event;
                }
            }
            // Encrypted state event.
            Ok(AnySyncTimelineEvent::State(AnySyncStateEvent::RoomEncrypted(
                SyncStateEvent::Original(_),
            ))) => {
                if let Ok(event) = self
                    .decrypt_event(
                        event.cast_ref_unchecked::<OriginalSyncStateRoomEncryptedEvent>(),
                        push_ctx,
                    )
                    .await
                {
                    return event;
                }
            }
            // Anything else, including events that fail to deserialize, is
            // handled by the plaintext fallback below.
            _ => {}
        }

        // Fallback: keep the event as received.
        let mut event = TimelineEvent::from_plaintext(event.cast());
        if let Some(push_ctx) = push_ctx {
            event.set_push_actions(push_ctx.for_event(event.raw()).await);
        }

        event
    }
775
776 pub async fn event(
781 &self,
782 event_id: &EventId,
783 request_config: Option<RequestConfig>,
784 ) -> Result<TimelineEvent> {
785 let request =
786 get_room_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
787
788 let raw_event = self.client.send(request).with_request_config(request_config).await?.event;
789 let push_ctx = self.push_context().await?;
790 let event = self.try_decrypt_event(raw_event, push_ctx.as_ref()).await;
791
792 if let Ok((cache, _handles)) = self.event_cache().await {
794 cache.save_events([event.clone()]).await;
795 }
796
797 Ok(event)
798 }
799
800 pub async fn load_or_fetch_event(
807 &self,
808 event_id: &EventId,
809 request_config: Option<RequestConfig>,
810 ) -> Result<TimelineEvent> {
811 match self.event_cache().await {
812 Ok((event_cache, _drop_handles)) => {
813 if let Some(event) = event_cache.find_event(event_id).await {
814 return Ok(event);
815 }
816 }
818 Err(err) => {
819 debug!("error when getting the event cache: {err}");
820 }
821 }
822 self.event(event_id, request_config).await
823 }
824
825 pub async fn event_with_context(
828 &self,
829 event_id: &EventId,
830 lazy_load_members: bool,
831 context_size: UInt,
832 request_config: Option<RequestConfig>,
833 ) -> Result<EventWithContextResponse> {
834 let mut request =
835 context::get_context::v3::Request::new(self.room_id().to_owned(), event_id.to_owned());
836
837 request.limit = context_size;
838
839 if lazy_load_members {
840 request.filter.lazy_load_options =
841 LazyLoadOptions::Enabled { include_redundant_members: false };
842 }
843
844 let response = self.client.send(request).with_request_config(request_config).await?;
845
846 let push_ctx = self.push_context().await?;
847 let push_ctx = push_ctx.as_ref();
848 let target_event = if let Some(event) = response.event {
849 Some(self.try_decrypt_event(event, push_ctx).await)
850 } else {
851 None
852 };
853
854 let (events_before, events_after) = join!(
858 join_all(
859 response.events_before.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
860 ),
861 join_all(
862 response.events_after.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx)),
863 ),
864 );
865
866 if let Ok((cache, _handles)) = self.event_cache().await {
868 let mut events_to_save: Vec<TimelineEvent> = Vec::new();
869 if let Some(event) = &target_event {
870 events_to_save.push(event.clone());
871 }
872
873 for event in &events_before {
874 events_to_save.push(event.clone());
875 }
876
877 for event in &events_after {
878 events_to_save.push(event.clone());
879 }
880
881 cache.save_events(events_to_save).await;
882 }
883
884 Ok(EventWithContextResponse {
885 event: target_event,
886 events_before,
887 events_after,
888 state: response.state,
889 prev_batch_token: response.start,
890 next_batch_token: response.end,
891 })
892 }
893
    /// Requests this room's member events from the server and hands the
    /// response to the base client.
    ///
    /// The work runs through the client's
    /// `members_request_deduplicated_handler`, keyed by this room's ID.
    /// NOTE(review): presumably this makes concurrent calls for the same
    /// room share one request — confirm against the handler.
    pub(crate) async fn request_members(&self) -> Result<()> {
        self.client
            .locks()
            .members_request_deduplicated_handler
            .run(self.room_id().to_owned(), async move {
                let request = get_member_events::v3::Request::new(self.inner.room_id().to_owned());
                let response = self
                    .client
                    // The request is cloned because it is needed again below.
                    .send(request.clone())
                    .with_request_config(
                        // 60 second timeout and a retry limit of 3.
                        RequestConfig::new().timeout(Duration::from_secs(60)).retry_limit(3),
                    )
                    .await?;

                // NOTE(review): the future is boxed before being awaited,
                // presumably to keep this future small — confirm.
                Box::pin(self.client.base_client().receive_all_members(
                    self.room_id(),
                    &request,
                    &response,
                ))
                .await?;

                Ok(())
            })
            .await
    }
922
923 pub async fn request_encryption_state(&self) -> Result<()> {
928 if !self.inner.encryption_state().is_unknown() {
929 return Ok(());
930 }
931
932 self.client
933 .locks()
934 .encryption_state_deduplicated_handler
935 .run(self.room_id().to_owned(), async move {
936 let request = get_state_event_for_key::v3::Request::new(
938 self.room_id().to_owned(),
939 StateEventType::RoomEncryption,
940 "".to_owned(),
941 );
942 let response = match self.client.send(request).await {
943 Ok(response) => Some(
944 response
945 .into_content()
946 .deserialize_as_unchecked::<RoomEncryptionEventContent>()?,
947 ),
948 Err(err) if err.client_api_error_kind() == Some(&ErrorKind::NotFound) => None,
949 Err(err) => return Err(err.into()),
950 };
951
952 let _sync_lock = self.client.base_client().sync_lock().lock().await;
953
954 let mut room_info = self.clone_info();
957 room_info.mark_encryption_state_synced();
958 room_info.set_encryption_event(response.clone());
959 let mut changes = StateChanges::default();
960 changes.add_room(room_info.clone());
961
962 self.client.state_store().save_changes(&changes).await?;
963 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
964
965 Ok(())
966 })
967 .await
968 }
969
    /// Returns the encryption state currently recorded for this room.
    ///
    /// This only reads local state; see [`Self::latest_encryption_state`]
    /// for a variant that first asks the server when the state is unknown.
    pub fn encryption_state(&self) -> EncryptionState {
        self.inner.encryption_state()
    }
977
978 pub async fn latest_encryption_state(&self) -> Result<EncryptionState> {
984 self.request_encryption_state().await?;
985
986 Ok(self.encryption_state())
987 }
988
989 #[cfg(feature = "e2e-encryption")]
991 pub async fn crypto_context_info(&self) -> CryptoContextInfo {
992 let encryption = self.client.encryption();
993
994 let this_device_is_verified = match encryption.get_own_device().await {
995 Ok(Some(device)) => device.is_verified_with_cross_signing(),
996
997 _ => true,
999 };
1000
1001 let backup_exists_on_server =
1002 encryption.backups().exists_on_server().await.unwrap_or(false);
1003
1004 CryptoContextInfo {
1005 device_creation_ts: encryption.device_creation_timestamp().await,
1006 this_device_is_verified,
1007 is_backup_configured: encryption.backups().state() == BackupState::Enabled,
1008 backup_exists_on_server,
1009 }
1010 }
1011
1012 fn are_events_visible(&self) -> bool {
1013 if let RoomState::Invited = self.inner.state() {
1014 return matches!(
1015 self.inner.history_visibility_or_default(),
1016 HistoryVisibility::WorldReadable | HistoryVisibility::Invited
1017 );
1018 }
1019
1020 true
1021 }
1022
1023 pub async fn sync_members(&self) -> Result<()> {
1029 if !self.are_events_visible() {
1030 return Ok(());
1031 }
1032
1033 if !self.are_members_synced() { self.request_members().await } else { Ok(()) }
1034 }
1035
    /// Returns the member with the given user ID, if any, after making sure
    /// the member list is synced (see [`Self::sync_members`]).
    ///
    /// Use [`Self::get_member_no_sync`] to skip the sync step.
    pub async fn get_member(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
        self.sync_members().await?;
        self.get_member_no_sync(user_id).await
    }
1053
1054 pub async fn get_member_no_sync(&self, user_id: &UserId) -> Result<Option<RoomMember>> {
1068 Ok(self
1069 .inner
1070 .get_member(user_id)
1071 .await?
1072 .map(|member| RoomMember::new(self.client.clone(), member)))
1073 }
1074
    /// Returns the members of this room matching `memberships`, after making
    /// sure the member list is synced (see [`Self::sync_members`]).
    ///
    /// Use [`Self::members_no_sync`] to skip the sync step.
    pub async fn members(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
        self.sync_members().await?;
        self.members_no_sync(memberships).await
    }
1087
1088 pub async fn members_no_sync(&self, memberships: RoomMemberships) -> Result<Vec<RoomMember>> {
1097 Ok(self
1098 .inner
1099 .members(memberships)
1100 .await?
1101 .into_iter()
1102 .map(|member| RoomMember::new(self.client.clone(), member))
1103 .collect())
1104 }
1105
1106 pub async fn get_state_events(
1108 &self,
1109 event_type: StateEventType,
1110 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1111 self.client
1112 .state_store()
1113 .get_state_events(self.room_id(), event_type)
1114 .await
1115 .map_err(Into::into)
1116 }
1117
1118 pub async fn get_state_events_static<C>(&self) -> Result<Vec<RawSyncOrStrippedState<C>>>
1135 where
1136 C: StaticEventContent<IsPrefix = ruma::events::False>
1137 + StaticStateEventContent
1138 + RedactContent,
1139 C::Redacted: RedactedStateEventContent,
1140 {
1141 Ok(self.client.state_store().get_state_events_static(self.room_id()).await?)
1142 }
1143
1144 pub async fn get_state_events_for_keys(
1147 &self,
1148 event_type: StateEventType,
1149 state_keys: &[&str],
1150 ) -> Result<Vec<RawAnySyncOrStrippedState>> {
1151 self.client
1152 .state_store()
1153 .get_state_events_for_keys(self.room_id(), event_type, state_keys)
1154 .await
1155 .map_err(Into::into)
1156 }
1157
1158 pub async fn get_state_events_for_keys_static<'a, C, K, I>(
1178 &self,
1179 state_keys: I,
1180 ) -> Result<Vec<RawSyncOrStrippedState<C>>>
1181 where
1182 C: StaticEventContent<IsPrefix = ruma::events::False>
1183 + StaticStateEventContent
1184 + RedactContent,
1185 C::StateKey: Borrow<K>,
1186 C::Redacted: RedactedStateEventContent,
1187 K: AsRef<str> + Sized + Sync + 'a,
1188 I: IntoIterator<Item = &'a K> + Send,
1189 I::IntoIter: Send,
1190 {
1191 Ok(self
1192 .client
1193 .state_store()
1194 .get_state_events_for_keys_static(self.room_id(), state_keys)
1195 .await?)
1196 }
1197
1198 pub async fn get_state_event(
1200 &self,
1201 event_type: StateEventType,
1202 state_key: &str,
1203 ) -> Result<Option<RawAnySyncOrStrippedState>> {
1204 self.client
1205 .state_store()
1206 .get_state_event(self.room_id(), event_type, state_key)
1207 .await
1208 .map_err(Into::into)
1209 }
1210
    /// Returns the state event stored for this room whose type is given
    /// statically by the content type `C`, for content types that use an
    /// empty state key.
    ///
    /// Shorthand for [`Self::get_state_event_static_for_key`] with
    /// [`EmptyStateKey`].
    pub async fn get_state_event_static<C>(&self) -> Result<Option<RawSyncOrStrippedState<C>>>
    where
        C: StaticEventContent<IsPrefix = ruma::events::False>
            + StaticStateEventContent<StateKey = EmptyStateKey>
            + RedactContent,
        C::Redacted: RedactedStateEventContent,
    {
        self.get_state_event_static_for_key(&EmptyStateKey).await
    }
1238
1239 pub async fn get_state_event_static_for_key<C, K>(
1259 &self,
1260 state_key: &K,
1261 ) -> Result<Option<RawSyncOrStrippedState<C>>>
1262 where
1263 C: StaticEventContent<IsPrefix = ruma::events::False>
1264 + StaticStateEventContent
1265 + RedactContent,
1266 C::StateKey: Borrow<K>,
1267 C::Redacted: RedactedStateEventContent,
1268 K: AsRef<str> + ?Sized + Sync,
1269 {
1270 Ok(self
1271 .client
1272 .state_store()
1273 .get_state_event_static_for_key(self.room_id(), state_key)
1274 .await?)
1275 }
1276
    /// Returns a stream classifying every parent space this room claims to
    /// have through its `m.space.parent` state events.
    ///
    /// For each parent event, the resulting [`ParentSpace`] is:
    ///
    /// * `Unverifiable` - the parent room is not known to the client.
    /// * `Reciprocal` - the parent has an original (non-redacted, non-stripped)
    ///   `m.space.child` event with this room's ID as state key.
    /// * `WithPowerlevel` - no such child event, but the sender of the parent
    ///   event is a member of the parent and may send `m.space.child` there.
    /// * `Illegitimate` - the sender is not a member of the parent, or lacks
    ///   the power to send `m.space.child`.
    ///
    /// Redacted parent events and events that fail to deserialize are
    /// skipped. The lookups are collected into a `FuturesUnordered`, so items
    /// are yielded in completion order.
    pub async fn parent_spaces(&self) -> Result<impl Stream<Item = Result<ParentSpace>> + '_> {
        Ok(self
            // All `m.space.parent` events of this room.
            .get_state_events_static::<SpaceParentEventContent>()
            .await?
            .into_iter()
            // Keep the state key (the parent's room ID) and the sender.
            .filter_map(|parent_event| match parent_event.deserialize() {
                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(e))) => {
                    Some((e.state_key.to_owned(), e.sender))
                }
                Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => None,
                Ok(SyncOrStrippedState::Stripped(e)) => Some((e.state_key.to_owned(), e.sender)),
                Err(e) => {
                    info!(room_id = ?self.room_id(), "Could not deserialize m.space.parent: {e}");
                    None
                }
            })
            // Classify the relationship with each claimed parent.
            .map(|(state_key, sender): (OwnedRoomId, OwnedUserId)| async move {
                let Some(parent_room) = self.client.get_room(&state_key) else {
                    // The parent room is unknown locally, nothing can be checked.
                    return Ok(ParentSpace::Unverifiable(state_key));
                };
                // Look for the parent's `m.space.child` event pointing at this room.
                if let Some(child_event) = parent_room
                    .get_state_event_static_for_key::<SpaceChildEventContent, _>(self.room_id())
                    .await?
                {
                    match child_event.deserialize() {
                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Original(_))) => {
                            return Ok(ParentSpace::Reciprocal(parent_room));
                        }
                        // Redacted and stripped child events do not count.
                        Ok(SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_))) => {}
                        Ok(SyncOrStrippedState::Stripped(_)) => {}
                        Err(e) => {
                            info!(
                                room_id = ?self.room_id(), parent_room_id = ?state_key,
                                "Could not deserialize m.space.child: {e}"
                            );
                        }
                    }
                }

                // No valid child event: fall back to the sender's power in
                // the parent room.
                let Some(member) = parent_room.get_member(&sender).await? else {
                    return Ok(ParentSpace::Illegitimate(parent_room));
                };

                if member.can_send_state(StateEventType::SpaceChild) {
                    Ok(ParentSpace::WithPowerlevel(parent_room))
                } else {
                    Ok(ParentSpace::Illegitimate(parent_room))
                }
            })
            .collect::<FuturesUnordered<_>>())
    }
1351
1352 pub async fn account_data(
1354 &self,
1355 data_type: RoomAccountDataEventType,
1356 ) -> Result<Option<Raw<AnyRoomAccountDataEvent>>> {
1357 self.client
1358 .state_store()
1359 .get_room_account_data_event(self.room_id(), data_type)
1360 .await
1361 .map_err(Into::into)
1362 }
1363
1364 pub async fn account_data_static<C>(&self) -> Result<Option<Raw<RoomAccountDataEvent<C>>>>
1383 where
1384 C: StaticEventContent<IsPrefix = ruma::events::False> + RoomAccountDataEventContent,
1385 {
1386 Ok(self.account_data(C::TYPE.into()).await?.map(Raw::cast_unchecked))
1387 }
1388
1389 #[cfg(feature = "e2e-encryption")]
1394 pub async fn contains_only_verified_devices(&self) -> Result<bool> {
1395 let user_ids = self
1396 .client
1397 .state_store()
1398 .get_user_ids(self.room_id(), RoomMemberships::empty())
1399 .await?;
1400
1401 for user_id in user_ids {
1402 let devices = self.client.encryption().get_user_devices(&user_id).await?;
1403 let any_unverified = devices.devices().any(|d| !d.is_verified());
1404
1405 if any_unverified {
1406 return Ok(false);
1407 }
1408 }
1409
1410 Ok(true)
1411 }
1412
1413 pub async fn set_account_data<T>(
1428 &self,
1429 content: T,
1430 ) -> Result<set_room_account_data::v3::Response>
1431 where
1432 T: RoomAccountDataEventContent,
1433 {
1434 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1435
1436 let request = set_room_account_data::v3::Request::new(
1437 own_user.to_owned(),
1438 self.room_id().to_owned(),
1439 &content,
1440 )?;
1441
1442 Ok(self.client.send(request).await?)
1443 }
1444
1445 pub async fn set_account_data_raw(
1470 &self,
1471 event_type: RoomAccountDataEventType,
1472 content: Raw<AnyRoomAccountDataEventContent>,
1473 ) -> Result<set_room_account_data::v3::Response> {
1474 let own_user = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1475
1476 let request = set_room_account_data::v3::Request::new_raw(
1477 own_user.to_owned(),
1478 self.room_id().to_owned(),
1479 event_type,
1480 content,
1481 );
1482
1483 Ok(self.client.send(request).await?)
1484 }
1485
1486 pub async fn set_tag(
1517 &self,
1518 tag: TagName,
1519 tag_info: TagInfo,
1520 ) -> Result<create_tag::v3::Response> {
1521 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1522 let request = create_tag::v3::Request::new(
1523 user_id.to_owned(),
1524 self.inner.room_id().to_owned(),
1525 tag.to_string(),
1526 tag_info,
1527 );
1528 Ok(self.client.send(request).await?)
1529 }
1530
1531 pub async fn remove_tag(&self, tag: TagName) -> Result<delete_tag::v3::Response> {
1538 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1539 let request = delete_tag::v3::Request::new(
1540 user_id.to_owned(),
1541 self.inner.room_id().to_owned(),
1542 tag.to_string(),
1543 );
1544 Ok(self.client.send(request).await?)
1545 }
1546
1547 pub async fn set_is_favourite(&self, is_favourite: bool, tag_order: Option<f64>) -> Result<()> {
1557 if is_favourite {
1558 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1559
1560 self.set_tag(TagName::Favorite, tag_info).await?;
1561
1562 if self.is_low_priority() {
1563 self.remove_tag(TagName::LowPriority).await?;
1564 }
1565 } else {
1566 self.remove_tag(TagName::Favorite).await?;
1567 }
1568 Ok(())
1569 }
1570
1571 pub async fn set_is_low_priority(
1581 &self,
1582 is_low_priority: bool,
1583 tag_order: Option<f64>,
1584 ) -> Result<()> {
1585 if is_low_priority {
1586 let tag_info = assign!(TagInfo::new(), { order: tag_order });
1587
1588 self.set_tag(TagName::LowPriority, tag_info).await?;
1589
1590 if self.is_favourite() {
1591 self.remove_tag(TagName::Favorite).await?;
1592 }
1593 } else {
1594 self.remove_tag(TagName::LowPriority).await?;
1595 }
1596 Ok(())
1597 }
1598
1599 pub async fn set_is_direct(&self, is_direct: bool) -> Result<()> {
1608 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
1609
1610 let mut content = self
1611 .client
1612 .account()
1613 .account_data::<DirectEventContent>()
1614 .await?
1615 .map(|c| c.deserialize())
1616 .transpose()?
1617 .unwrap_or_default();
1618
1619 let this_room_id = self.inner.room_id();
1620
1621 if is_direct {
1622 let mut room_members = self.members(RoomMemberships::ACTIVE).await?;
1623 room_members.retain(|member| member.user_id() != self.own_user_id());
1624
1625 for member in room_members {
1626 let entry = content.entry(member.user_id().into()).or_default();
1627 if !entry.iter().any(|room_id| room_id == this_room_id) {
1628 entry.push(this_room_id.to_owned());
1629 }
1630 }
1631 } else {
1632 for (_, list) in content.iter_mut() {
1633 list.retain(|room_id| *room_id != this_room_id);
1634 }
1635
1636 content.retain(|_, list| !list.is_empty());
1638 }
1639
1640 let request = set_global_account_data::v3::Request::new(user_id.to_owned(), &content)?;
1641
1642 self.client.send(request).await?;
1643 Ok(())
1644 }
1645
#[cfg(feature = "e2e-encryption")]
#[cfg(not(feature = "experimental-encrypted-state-events"))]
/// Tries to decrypt the given encrypted room event.
///
/// On success the decrypted event is returned, with push actions computed
/// from `push_ctx` when one is provided. When the event cannot be decrypted,
/// a room key download from the backup is requested and the event is
/// returned as an unable-to-decrypt timeline event.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, and
/// propagates errors from the decryption attempt itself.
pub async fn decrypt_event(
    &self,
    event: &Raw<OriginalSyncRoomEncryptedEvent>,
    push_ctx: Option<&PushContext>,
) -> Result<TimelineEvent> {
    let olm_guard = self.client.olm_machine().await;
    let olm = olm_guard.as_ref().ok_or(Error::NoOlmMachine)?;

    let decryption_result = olm
        .try_decrypt_room_event(
            event.cast_ref(),
            self.inner.room_id(),
            self.client.decryption_settings(),
        )
        .await?;

    match decryption_result {
        RoomEventDecryptionResult::Decrypted(decrypted) => {
            let push_actions = match push_ctx {
                Some(ctx) => Some(ctx.for_event(&decrypted.event).await),
                None => None,
            };
            Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
        }
        RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
            let backups = self.client.encryption().backups();
            backups.maybe_download_room_key(self.room_id().to_owned(), event.clone());
            Ok(TimelineEvent::from_utd(event.clone().cast(), utd_info))
        }
    }
}
1688
#[cfg(feature = "experimental-encrypted-state-events")]
/// Tries to decrypt the given raw event, which must be castable to an
/// `EncryptedEvent`.
///
/// On success the decrypted event is returned, with push actions computed
/// from `push_ctx` when one is provided. When the event cannot be decrypted,
/// a room key download from the backup is requested and the event is
/// returned as an unable-to-decrypt timeline event.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, and
/// propagates errors from the decryption attempt itself.
pub async fn decrypt_event<T: JsonCastable<EncryptedEvent>>(
    &self,
    event: &Raw<T>,
    push_ctx: Option<&PushContext>,
) -> Result<TimelineEvent> {
    let olm_guard = self.client.olm_machine().await;
    let olm = olm_guard.as_ref().ok_or(Error::NoOlmMachine)?;

    let decryption_result = olm
        .try_decrypt_room_event(
            event.cast_ref(),
            self.inner.room_id(),
            self.client.decryption_settings(),
        )
        .await?;

    match decryption_result {
        RoomEventDecryptionResult::Decrypted(decrypted) => {
            let push_actions = match push_ctx {
                Some(ctx) => Some(ctx.for_event(&decrypted.event).await),
                None => None,
            };
            Ok(TimelineEvent::from_decrypted(decrypted, push_actions))
        }
        RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
            let backups = self.client.encryption().backups();
            backups.maybe_download_room_key(self.room_id().to_owned(), event.clone());
            Ok(TimelineEvent::from_utd(event.clone().cast_unchecked(), utd_info))
        }
    }
}
1732
#[cfg(feature = "e2e-encryption")]
/// Fetches the encryption info of the given session for the given sender in
/// this room.
///
/// Returns `None` when no Olm machine is available or when the lookup
/// fails; the underlying error is discarded.
pub async fn get_encryption_info(
    &self,
    session_id: &str,
    sender: &UserId,
) -> Option<Arc<EncryptionInfo>> {
    let olm_guard = self.client.olm_machine().await;
    let olm = olm_guard.as_ref()?;

    let lookup = olm.get_session_encryption_info(self.room_id(), session_id, sender).await;
    lookup.ok()
}
1755
#[cfg(feature = "e2e-encryption")]
/// Asks the Olm machine to discard the room key of this room.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, and
/// propagates errors from the Olm machine.
pub async fn discard_room_key(&self) -> Result<()> {
    let olm_guard = self.client.olm_machine().await;
    let Some(olm) = olm_guard.as_ref() else {
        return Err(Error::NoOlmMachine);
    };

    olm.discard_room_key(self.inner.room_id()).await?;
    Ok(())
}
1778
1779 #[instrument(skip_all)]
1787 pub async fn ban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1788 let request = assign!(
1789 ban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1790 { reason: reason.map(ToOwned::to_owned) }
1791 );
1792 self.client.send(request).await?;
1793 Ok(())
1794 }
1795
1796 #[instrument(skip_all)]
1804 pub async fn unban_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1805 let request = assign!(
1806 unban_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1807 { reason: reason.map(ToOwned::to_owned) }
1808 );
1809 self.client.send(request).await?;
1810 Ok(())
1811 }
1812
1813 #[instrument(skip_all)]
1822 pub async fn kick_user(&self, user_id: &UserId, reason: Option<&str>) -> Result<()> {
1823 let request = assign!(
1824 kick_user::v3::Request::new(self.room_id().to_owned(), user_id.to_owned()),
1825 { reason: reason.map(ToOwned::to_owned) }
1826 );
1827 self.client.send(request).await?;
1828 Ok(())
1829 }
1830
1831 #[instrument(skip_all)]
1837 pub async fn invite_user_by_id(&self, user_id: &UserId) -> Result<()> {
1838 #[cfg(feature = "e2e-encryption")]
1839 if self.client.inner.enable_share_history_on_invite {
1840 shared_room_history::share_room_history(self, user_id.to_owned()).await?;
1841 }
1842
1843 let recipient = InvitationRecipient::UserId { user_id: user_id.to_owned() };
1844 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1845 self.client.send(request).await?;
1846
1847 self.mark_members_missing();
1851
1852 Ok(())
1853 }
1854
1855 #[instrument(skip_all)]
1861 pub async fn invite_user_by_3pid(&self, invite_id: Invite3pid) -> Result<()> {
1862 let recipient = InvitationRecipient::ThirdPartyId(invite_id);
1863 let request = invite_user::v3::Request::new(self.room_id().to_owned(), recipient);
1864 self.client.send(request).await?;
1865
1866 self.mark_members_missing();
1870
1871 Ok(())
1872 }
1873
1874 pub async fn typing_notice(&self, typing: bool) -> Result<()> {
1909 self.ensure_room_joined()?;
1910
1911 let send = if let Some(typing_time) =
1914 self.client.inner.typing_notice_times.read().unwrap().get(self.room_id())
1915 {
1916 if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
1917 typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
1921 } else {
1922 !typing
1924 }
1925 } else {
1926 typing
1929 };
1930
1931 if send {
1932 self.send_typing_notice(typing).await?;
1933 }
1934
1935 Ok(())
1936 }
1937
1938 #[instrument(name = "typing_notice", skip(self))]
1939 async fn send_typing_notice(&self, typing: bool) -> Result<()> {
1940 let typing = if typing {
1941 self.client
1942 .inner
1943 .typing_notice_times
1944 .write()
1945 .unwrap()
1946 .insert(self.room_id().to_owned(), Instant::now());
1947 Typing::Yes(TYPING_NOTICE_TIMEOUT)
1948 } else {
1949 self.client.inner.typing_notice_times.write().unwrap().remove(self.room_id());
1950 Typing::No
1951 };
1952
1953 let request = create_typing_event::v3::Request::new(
1954 self.own_user_id().to_owned(),
1955 self.room_id().to_owned(),
1956 typing,
1957 );
1958
1959 self.client.send(request).await?;
1960
1961 Ok(())
1962 }
1963
1964 #[instrument(skip_all)]
1981 pub async fn send_single_receipt(
1982 &self,
1983 receipt_type: create_receipt::v3::ReceiptType,
1984 thread: ReceiptThread,
1985 event_id: OwnedEventId,
1986 ) -> Result<()> {
1987 let request_key = format!("{}|{}", receipt_type, thread.as_str().unwrap_or("<unthreaded>"));
1990
1991 self.client
1992 .inner
1993 .locks
1994 .read_receipt_deduplicated_handler
1995 .run((request_key, event_id.clone()), async {
1996 let is_unthreaded = thread == ReceiptThread::Unthreaded;
1998
1999 let mut request = create_receipt::v3::Request::new(
2000 self.room_id().to_owned(),
2001 receipt_type,
2002 event_id,
2003 );
2004 request.thread = thread;
2005
2006 self.client.send(request).await?;
2007
2008 if is_unthreaded {
2009 self.set_unread_flag(false).await?;
2010 }
2011
2012 Ok(())
2013 })
2014 .await
2015 }
2016
2017 #[instrument(skip_all)]
2027 pub async fn send_multiple_receipts(&self, receipts: Receipts) -> Result<()> {
2028 if receipts.is_empty() {
2029 return Ok(());
2030 }
2031
2032 let Receipts { fully_read, public_read_receipt, private_read_receipt } = receipts;
2033 let request = assign!(set_read_marker::v3::Request::new(self.room_id().to_owned()), {
2034 fully_read,
2035 read_receipt: public_read_receipt,
2036 private_read_receipt,
2037 });
2038
2039 self.client.send(request).await?;
2040
2041 self.set_unread_flag(false).await?;
2042
2043 Ok(())
2044 }
2045
2046 #[allow(unused_variables, unused_mut)]
2050 async fn enable_encryption_inner(&self, encrypted_state_events: bool) -> Result<()> {
2051 use ruma::{
2052 EventEncryptionAlgorithm, events::room::encryption::RoomEncryptionEventContent,
2053 };
2054 const SYNC_WAIT_TIME: Duration = Duration::from_secs(3);
2055
2056 if !self.latest_encryption_state().await?.is_encrypted() {
2057 let mut content =
2058 RoomEncryptionEventContent::new(EventEncryptionAlgorithm::MegolmV1AesSha2);
2059 #[cfg(feature = "experimental-encrypted-state-events")]
2060 if encrypted_state_events {
2061 content = content.with_encrypted_state();
2062 }
2063 self.send_state_event(content).await?;
2064
2065 let res = timeout(
2072 async {
2073 loop {
2074 self.client.inner.sync_beat.listen().await;
2076 let _sync_lock = self.client.base_client().sync_lock().lock().await;
2077 if !self.inner.encryption_state().is_unknown() {
2078 break;
2079 }
2080 }
2081 },
2082 SYNC_WAIT_TIME,
2083 )
2084 .await;
2085
2086 let _sync_lock = self.client.base_client().sync_lock().lock().await;
2087
2088 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2090 if res.is_ok() && self.inner.encryption_state().is_encrypted() {
2091 debug!("room successfully marked as encrypted");
2092 return Ok(());
2093 }
2094
2095 #[cfg(feature = "experimental-encrypted-state-events")]
2097 if res.is_ok() && {
2098 if encrypted_state_events {
2099 self.inner.encryption_state().is_state_encrypted()
2100 } else {
2101 self.inner.encryption_state().is_encrypted()
2102 }
2103 } {
2104 debug!("room successfully marked as encrypted");
2105 return Ok(());
2106 }
2107
2108 debug!("still not marked as encrypted, marking encryption state as missing");
2113
2114 let mut room_info = self.clone_info();
2115 room_info.mark_encryption_state_missing();
2116 let mut changes = StateChanges::default();
2117 changes.add_room(room_info.clone());
2118
2119 self.client.state_store().save_changes(&changes).await?;
2120 self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
2121 }
2122
2123 Ok(())
2124 }
2125
2126 #[instrument(skip_all)]
2158 pub async fn enable_encryption(&self) -> Result<()> {
2159 self.enable_encryption_inner(false).await
2160 }
2161
#[instrument(skip_all)]
#[cfg(feature = "experimental-encrypted-state-events")]
/// Enables encryption in this room and requests encrypted state events as
/// well.
pub async fn enable_encryption_with_state_event_encryption(&self) -> Result<()> {
    let encrypted_state_events = true;
    self.enable_encryption_inner(encrypted_state_events).await
}
2199
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all, fields(room_id = ?self.room_id(), store_generation))]
/// Claims one-time keys for the active members of this room and shares the
/// room key with them.
///
/// The work runs through the client's group session deduplication handler,
/// keyed on the room ID. If sharing the room key fails, the room key is
/// discarded before the error is returned.
///
/// # Errors
///
/// Fails if the room is not joined, if the store lock cannot be taken, or
/// if claiming keys, sharing the key or discarding it fails.
async fn preshare_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    // The guard is consumed right away: only its generation is recorded on
    // the current span.
    let store_guard = self.client.encryption().spin_lock_store(Some(60000)).await?;
    let store_generation = store_guard.map(|guard| guard.generation());
    tracing::Span::current().record("store_generation", store_generation);

    self.client
        .locks()
        .group_session_deduplicated_handler
        .run(self.room_id().to_owned(), async move {
            {
                let active_members = self
                    .client
                    .state_store()
                    .get_user_ids(self.room_id(), RoomMemberships::ACTIVE)
                    .await?;
                self.client
                    .claim_one_time_keys(active_members.iter().map(Deref::deref))
                    .await?;
            };

            let Err(share_error) = self.share_room_key().await else {
                return Ok(());
            };

            // Sharing failed: drop the room key before reporting the error.
            let olm_guard = self.client.olm_machine().await;
            if let Some(olm) = olm_guard.as_ref() {
                olm.discard_room_key(self.room_id()).await?;
            }

            Err(share_error)
        })
        .await
}
2247
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all)]
/// Shares the room key of this room.
///
/// The base client produces the to-device requests; each one is sent in
/// turn and then marked as sent with its response. The first failure stops
/// the loop.
///
/// # Errors
///
/// Fails if the room is not joined, or if building, sending or marking a
/// request fails.
async fn share_room_key(&self) -> Result<()> {
    self.ensure_room_joined()?;

    let to_device_requests = self.client.base_client().share_room_key(self.room_id()).await?;

    for to_device_request in to_device_requests {
        let response = self.client.send_to_device(&to_device_request).await?;
        self.client.mark_request_as_sent(&to_device_request.txn_id, &response).await?;
    }

    Ok(())
}
2267
2268 #[instrument(skip_all)]
2277 pub async fn sync_up(&self) {
2278 while !self.is_synced() && self.state() == RoomState::Joined {
2279 let wait_for_beat = self.client.inner.sync_beat.listen();
2280 let _ = timeout(wait_for_beat, Duration::from_millis(1000)).await;
2282 }
2283 }
2284
2285 pub fn send(&self, content: impl MessageLikeEventContent) -> SendMessageLikeEvent<'_> {
2356 SendMessageLikeEvent::new(self, content)
2357 }
2358
#[cfg(feature = "e2e-encryption")]
/// Queries device keys for the active room members that are either not
/// tracked yet or are tracked and marked dirty.
///
/// No request is sent when the resulting key query is empty.
///
/// # Errors
///
/// Returns [`Error::NoOlmMachine`] when no Olm machine is available, and
/// propagates state store, crypto store and key query failures.
async fn query_keys_for_untracked_or_dirty_users(&self) -> Result<()> {
    let olm = self.client.olm_machine().await;
    // Report a missing Olm machine as an error instead of panicking, like
    // the other methods of this type that need the machine.
    let olm = olm.as_ref().ok_or(Error::NoOlmMachine)?;

    let members =
        self.client.state_store().get_user_ids(self.room_id(), RoomMemberships::ACTIVE).await?;

    // Maps each tracked user to its dirty flag.
    let tracked: HashMap<_, _> = olm
        .store()
        .load_tracked_users()
        .await?
        .into_iter()
        .map(|tracked| (tracked.user_id, tracked.dirty))
        .collect();

    // Keep members that are unknown to the tracker, or known but dirty.
    let members_with_unknown_devices =
        members.iter().filter(|member| tracked.get(*member).is_none_or(|dirty| *dirty));

    let (req_id, request) =
        olm.query_keys_for_users(members_with_unknown_devices.map(|owned| owned.borrow()));

    if !request.device_keys.is_empty() {
        self.client.keys_query(&req_id, request.device_keys).await?;
    }

    Ok(())
}
2391
2392 #[instrument(skip_all, fields(event_type, room_id = ?self.room_id(), transaction_id, is_room_encrypted, event_id))]
2436 pub fn send_raw<'a>(
2437 &'a self,
2438 event_type: &'a str,
2439 content: impl IntoRawMessageLikeEventContent,
2440 ) -> SendRawMessageLikeEvent<'a> {
2441 SendRawMessageLikeEvent::new(self, event_type, content)
2444 }
2445
2446 #[instrument(skip_all)]
2494 pub fn send_attachment<'a>(
2495 &'a self,
2496 filename: impl Into<String>,
2497 content_type: &'a Mime,
2498 data: Vec<u8>,
2499 config: AttachmentConfig,
2500 ) -> SendAttachment<'a> {
2501 SendAttachment::new(self, filename.into(), content_type, data, config)
2502 }
2503
2504 #[instrument(skip_all)]
2532 pub(super) async fn prepare_and_send_attachment<'a>(
2533 &'a self,
2534 filename: String,
2535 content_type: &'a Mime,
2536 data: Vec<u8>,
2537 mut config: AttachmentConfig,
2538 send_progress: SharedObservable<TransmissionProgress>,
2539 store_in_cache: bool,
2540 ) -> Result<send_message_event::v3::Response> {
2541 self.ensure_room_joined()?;
2542
2543 let txn_id = config.txn_id.take();
2544 let mentions = config.mentions.take();
2545
2546 let thumbnail = config.thumbnail.take();
2547
2548 let thumbnail_cache_info = if store_in_cache {
2550 thumbnail
2551 .as_ref()
2552 .map(|thumbnail| (thumbnail.data.clone(), thumbnail.height, thumbnail.width))
2553 } else {
2554 None
2555 };
2556
2557 #[cfg(feature = "e2e-encryption")]
2558 let (media_source, thumbnail) = if self.latest_encryption_state().await?.is_encrypted() {
2559 self.client
2560 .upload_encrypted_media_and_thumbnail(&data, thumbnail, send_progress)
2561 .await?
2562 } else {
2563 self.client
2564 .media()
2565 .upload_plain_media_and_thumbnail(
2566 content_type,
2567 data.clone(),
2570 thumbnail,
2571 send_progress,
2572 )
2573 .await?
2574 };
2575
2576 #[cfg(not(feature = "e2e-encryption"))]
2577 let (media_source, thumbnail) = self
2578 .client
2579 .media()
2580 .upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
2581 .await?;
2582
2583 if store_in_cache {
2584 let media_store_lock_guard = self.client.media_store().lock().await?;
2585
2586 debug!("caching the media");
2590 let request =
2591 MediaRequestParameters { source: media_source.clone(), format: MediaFormat::File };
2592
2593 if let Err(err) = media_store_lock_guard
2594 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2595 .await
2596 {
2597 warn!("unable to cache the media after uploading it: {err}");
2598 }
2599
2600 if let Some(((data, height, width), source)) =
2601 thumbnail_cache_info.zip(thumbnail.as_ref().map(|tuple| &tuple.0))
2602 {
2603 debug!("caching the thumbnail");
2604
2605 let request = MediaRequestParameters {
2606 source: source.clone(),
2607 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(width, height)),
2608 };
2609
2610 if let Err(err) = media_store_lock_guard
2611 .add_media_content(&request, data, IgnoreMediaRetentionPolicy::No)
2612 .await
2613 {
2614 warn!("unable to cache the media after uploading it: {err}");
2615 }
2616 }
2617 }
2618
2619 let content = self
2620 .make_media_event(
2621 Room::make_attachment_type(
2622 content_type,
2623 filename,
2624 media_source,
2625 config.caption,
2626 config.formatted_caption,
2627 config.info,
2628 thumbnail,
2629 ),
2630 mentions,
2631 config.reply,
2632 )
2633 .await?;
2634
2635 let mut fut = self.send(content);
2636 if let Some(txn_id) = txn_id {
2637 fut = fut.with_transaction_id(txn_id);
2638 }
2639 fut.await
2640 }
2641
2642 #[allow(clippy::too_many_arguments)]
2645 pub(crate) fn make_attachment_type(
2646 content_type: &Mime,
2647 filename: String,
2648 source: MediaSource,
2649 caption: Option<String>,
2650 formatted_caption: Option<FormattedBody>,
2651 info: Option<AttachmentInfo>,
2652 thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
2653 ) -> MessageType {
2654 make_media_type!(
2655 MessageType,
2656 content_type,
2657 filename,
2658 source,
2659 caption,
2660 formatted_caption,
2661 info,
2662 thumbnail
2663 )
2664 }
2665
2666 pub(crate) async fn make_media_event(
2669 &self,
2670 msg_type: MessageType,
2671 mentions: Option<Mentions>,
2672 reply: Option<Reply>,
2673 ) -> Result<RoomMessageEventContent> {
2674 let mut content = RoomMessageEventContent::new(msg_type);
2675 if let Some(mentions) = mentions {
2676 content = content.add_mentions(mentions);
2677 }
2678 if let Some(reply) = reply {
2679 content = self.make_reply_event(content.into(), reply).await?;
2682 }
2683 Ok(content)
2684 }
2685
#[cfg(feature = "unstable-msc4274")]
// All the arguments are forwarded as-is to `make_media_type!`.
#[allow(clippy::too_many_arguments)]
/// Builds the [`GalleryItemType`] for a gallery item.
///
/// This is a thin wrapper around the `make_media_type!` macro, with
/// `GalleryItemType` as the target type.
pub(crate) fn make_gallery_item_type(
    content_type: &Mime,
    filename: String,
    source: MediaSource,
    caption: Option<String>,
    formatted_caption: Option<FormattedBody>,
    info: Option<AttachmentInfo>,
    thumbnail: Option<(MediaSource, Box<ThumbnailInfo>)>,
) -> GalleryItemType {
    make_media_type!(
        GalleryItemType,
        content_type,
        filename,
        source,
        caption,
        formatted_caption,
        info,
        thumbnail
    )
}
2710
2711 pub async fn update_power_levels(
2720 &self,
2721 updates: Vec<(&UserId, Int)>,
2722 ) -> Result<send_state_event::v3::Response> {
2723 let mut power_levels = self.power_levels().await?;
2724
2725 for (user_id, new_level) in updates {
2726 if new_level == power_levels.users_default {
2727 power_levels.users.remove(user_id);
2728 } else {
2729 power_levels.users.insert(user_id.to_owned(), new_level);
2730 }
2731 }
2732
2733 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await
2734 }
2735
2736 pub async fn apply_power_level_changes(&self, changes: RoomPowerLevelChanges) -> Result<()> {
2741 let mut power_levels = self.power_levels().await?;
2742 power_levels.apply(changes)?;
2743 self.send_state_event(RoomPowerLevelsEventContent::try_from(power_levels)?).await?;
2744 Ok(())
2745 }
2746
2747 pub async fn reset_power_levels(&self) -> Result<RoomPowerLevels> {
2751 let creators = self.creators().unwrap_or_default();
2752 let rules = self.clone_info().room_version_rules_or_default();
2753
2754 let default_power_levels =
2755 RoomPowerLevels::new(RoomPowerLevelsSource::None, &rules.authorization, creators);
2756 let changes = RoomPowerLevelChanges::from(default_power_levels);
2757 self.apply_power_level_changes(changes).await?;
2758 Ok(self.power_levels().await?)
2759 }
2760
2761 pub async fn get_suggested_user_role(&self, user_id: &UserId) -> Result<RoomMemberRole> {
2766 let power_level = self.get_user_power_level(user_id).await?;
2767 Ok(RoomMemberRole::suggested_role_for_power_level(power_level))
2768 }
2769
2770 pub async fn get_user_power_level(&self, user_id: &UserId) -> Result<UserPowerLevel> {
2775 let event = self.power_levels().await?;
2776 Ok(event.for_user(user_id))
2777 }
2778
2779 pub async fn users_with_power_levels(&self) -> HashMap<OwnedUserId, i64> {
2782 let power_levels = self.power_levels().await.ok();
2783 let mut user_power_levels = HashMap::<OwnedUserId, i64>::new();
2784 if let Some(power_levels) = power_levels {
2785 for (id, level) in power_levels.users.into_iter() {
2786 user_power_levels.insert(id, level.into());
2787 }
2788 }
2789 user_power_levels
2790 }
2791
2792 pub async fn set_name(&self, name: String) -> Result<send_state_event::v3::Response> {
2794 self.send_state_event(RoomNameEventContent::new(name)).await
2795 }
2796
2797 pub async fn set_room_topic(&self, topic: &str) -> Result<send_state_event::v3::Response> {
2799 self.send_state_event(RoomTopicEventContent::new(topic.into())).await
2800 }
2801
2802 pub async fn set_avatar_url(
2808 &self,
2809 url: &MxcUri,
2810 info: Option<avatar::ImageInfo>,
2811 ) -> Result<send_state_event::v3::Response> {
2812 self.ensure_room_joined()?;
2813
2814 let mut room_avatar_event = RoomAvatarEventContent::new();
2815 room_avatar_event.url = Some(url.to_owned());
2816 room_avatar_event.info = info.map(Box::new);
2817
2818 self.send_state_event(room_avatar_event).await
2819 }
2820
2821 pub async fn remove_avatar(&self) -> Result<send_state_event::v3::Response> {
2823 self.send_state_event(RoomAvatarEventContent::new()).await
2824 }
2825
2826 pub async fn upload_avatar(
2834 &self,
2835 mime: &Mime,
2836 data: Vec<u8>,
2837 info: Option<avatar::ImageInfo>,
2838 ) -> Result<send_state_event::v3::Response> {
2839 self.ensure_room_joined()?;
2840
2841 let upload_response = self.client.media().upload(mime, data, None).await?;
2842 let mut info = info.unwrap_or_default();
2843 info.blurhash = upload_response.blurhash;
2844 info.mimetype = Some(mime.to_string());
2845
2846 self.set_avatar_url(&upload_response.content_uri, Some(info)).await
2847 }
2848
2849 #[cfg(not(feature = "experimental-encrypted-state-events"))]
2893 #[instrument(skip_all)]
2894 pub async fn send_state_event(
2895 &self,
2896 content: impl StateEventContent<StateKey = EmptyStateKey>,
2897 ) -> Result<send_state_event::v3::Response> {
2898 self.send_state_event_for_key(&EmptyStateKey, content).await
2899 }
2900
#[cfg(feature = "experimental-encrypted-state-events")]
#[instrument(skip_all)]
/// Creates a [`SendStateEvent`] for sending a state event with an empty
/// state key to this room.
///
/// This delegates to [`Self::send_state_event_for_key`] with
/// `EmptyStateKey`.
pub fn send_state_event<'a>(
    &'a self,
    content: impl StateEventContent<StateKey = EmptyStateKey>,
) -> SendStateEvent<'a> {
    let state_key = EmptyStateKey;
    self.send_state_event_for_key(&state_key, content)
}
2959
2960 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3001 pub async fn send_state_event_for_key<C, K>(
3002 &self,
3003 state_key: &K,
3004 content: C,
3005 ) -> Result<send_state_event::v3::Response>
3006 where
3007 C: StateEventContent,
3008 C::StateKey: Borrow<K>,
3009 K: AsRef<str> + ?Sized,
3010 {
3011 self.ensure_room_joined()?;
3012 let request =
3013 send_state_event::v3::Request::new(self.room_id().to_owned(), state_key, &content)?;
3014 let response = self.client.send(request).await?;
3015 Ok(response)
3016 }
3017
#[cfg(feature = "experimental-encrypted-state-events")]
/// Creates a [`SendStateEvent`] for sending a state event with the given
/// state key to this room.
///
/// This only builds the value from `self`, `state_key` and `content`.
pub fn send_state_event_for_key<'a, C, K>(
    &'a self,
    state_key: &K,
    content: C,
) -> SendStateEvent<'a>
where
    C: StateEventContent,
    C::StateKey: Borrow<K>,
    K: AsRef<str> + ?Sized,
{
    let room = self;
    SendStateEvent::new(room, state_key, content)
}
3079
3080 #[cfg(not(feature = "experimental-encrypted-state-events"))]
3115 #[instrument(skip_all)]
3116 pub async fn send_state_event_raw(
3117 &self,
3118 event_type: &str,
3119 state_key: &str,
3120 content: impl IntoRawStateEventContent,
3121 ) -> Result<send_state_event::v3::Response> {
3122 self.ensure_room_joined()?;
3123
3124 let request = send_state_event::v3::Request::new_raw(
3125 self.room_id().to_owned(),
3126 event_type.into(),
3127 state_key.to_owned(),
3128 content.into_raw_state_event_content(),
3129 );
3130
3131 Ok(self.client.send(request).await?)
3132 }
3133
#[cfg(feature = "experimental-encrypted-state-events")]
#[instrument(skip_all)]
/// Creates a [`SendRawStateEvent`] for sending a state event built from a
/// raw event type, state key and content.
///
/// This only builds the value from its arguments.
pub fn send_state_event_raw<'a>(
    &'a self,
    event_type: &'a str,
    state_key: &'a str,
    content: impl IntoRawStateEventContent,
) -> SendRawStateEvent<'a> {
    let room = self;
    SendRawStateEvent::new(room, event_type, state_key, content)
}
3185
3186 #[instrument(skip_all)]
3221 pub async fn redact(
3222 &self,
3223 event_id: &EventId,
3224 reason: Option<&str>,
3225 txn_id: Option<OwnedTransactionId>,
3226 ) -> HttpResult<redact_event::v3::Response> {
3227 let txn_id = txn_id.unwrap_or_else(TransactionId::new);
3228 let request = assign!(
3229 redact_event::v3::Request::new(self.room_id().to_owned(), event_id.to_owned(), txn_id),
3230 { reason: reason.map(ToOwned::to_owned) }
3231 );
3232
3233 self.client.send(request).await
3234 }
3235
3236 pub async fn route(&self) -> Result<Vec<OwnedServerName>> {
3245 let acl_ev = self
3246 .get_state_event_static::<RoomServerAclEventContent>()
3247 .await?
3248 .and_then(|ev| ev.deserialize().ok());
3249 let acl = acl_ev.as_ref().and_then(|ev| match ev {
3250 SyncOrStrippedState::Sync(ev) => ev.as_original().map(|ev| &ev.content),
3251 SyncOrStrippedState::Stripped(ev) => Some(&ev.content),
3252 });
3253
3254 let members: Vec<_> = self
3258 .members_no_sync(RoomMemberships::JOIN)
3259 .await?
3260 .into_iter()
3261 .filter(|member| {
3262 let server = member.user_id().server_name();
3263 acl.filter(|acl| !acl.is_allowed(server)).is_none() && !server.is_ip_literal()
3264 })
3265 .collect();
3266
3267 let max = members
3270 .iter()
3271 .max_by_key(|member| member.power_level())
3272 .filter(|max| max.power_level() >= int!(50))
3273 .map(|member| member.user_id().server_name());
3274
3275 let servers = members
3277 .iter()
3278 .map(|member| member.user_id().server_name())
3279 .filter(|server| max.filter(|max| max == server).is_none())
3280 .fold(BTreeMap::<_, u32>::new(), |mut servers, server| {
3281 *servers.entry(server).or_default() += 1;
3282 servers
3283 });
3284 let mut servers: Vec<_> = servers.into_iter().collect();
3285 servers.sort_unstable_by(|(_, count_a), (_, count_b)| count_b.cmp(count_a));
3286
3287 Ok(max
3288 .into_iter()
3289 .chain(servers.into_iter().map(|(name, _)| name))
3290 .take(3)
3291 .map(ToOwned::to_owned)
3292 .collect())
3293 }
3294
3295 pub async fn matrix_to_permalink(&self) -> Result<MatrixToUri> {
3302 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3303 return Ok(alias.matrix_to_uri());
3304 }
3305
3306 let via = self.route().await?;
3307 Ok(self.room_id().matrix_to_uri_via(via))
3308 }
3309
3310 pub async fn matrix_permalink(&self, join: bool) -> Result<MatrixUri> {
3321 if let Some(alias) = self.canonical_alias().or_else(|| self.alt_aliases().pop()) {
3322 return Ok(alias.matrix_uri(join));
3323 }
3324
3325 let via = self.route().await?;
3326 Ok(self.room_id().matrix_uri_via(via, join))
3327 }
3328
3329 pub async fn matrix_to_event_permalink(
3343 &self,
3344 event_id: impl Into<OwnedEventId>,
3345 ) -> Result<MatrixToUri> {
3346 let via = self.route().await?;
3349 Ok(self.room_id().matrix_to_event_uri_via(event_id, via))
3350 }
3351
3352 pub async fn matrix_event_permalink(
3366 &self,
3367 event_id: impl Into<OwnedEventId>,
3368 ) -> Result<MatrixUri> {
3369 let via = self.route().await?;
3372 Ok(self.room_id().matrix_event_uri_via(event_id, via))
3373 }
3374
3375 pub async fn load_user_receipt(
3388 &self,
3389 receipt_type: ReceiptType,
3390 thread: ReceiptThread,
3391 user_id: &UserId,
3392 ) -> Result<Option<(OwnedEventId, Receipt)>> {
3393 self.inner.load_user_receipt(receipt_type, thread, user_id).await.map_err(Into::into)
3394 }
3395
3396 pub async fn load_event_receipts(
3409 &self,
3410 receipt_type: ReceiptType,
3411 thread: ReceiptThread,
3412 event_id: &EventId,
3413 ) -> Result<Vec<(OwnedUserId, Receipt)>> {
3414 self.inner.load_event_receipts(receipt_type, thread, event_id).await.map_err(Into::into)
3415 }
3416
3417 pub async fn push_condition_room_ctx(&self) -> Result<Option<PushConditionRoomCtx>> {
3422 self.push_condition_room_ctx_internal(self.client.enabled_thread_subscriptions()).await
3423 }
3424
3425 pub(crate) async fn push_condition_room_ctx_internal(
3432 &self,
3433 with_threads_subscriptions: bool,
3434 ) -> Result<Option<PushConditionRoomCtx>> {
3435 let room_id = self.room_id();
3436 let user_id = self.own_user_id();
3437 let room_info = self.clone_info();
3438 let member_count = room_info.active_members_count();
3439
3440 let user_display_name = if let Some(member) = self.get_member_no_sync(user_id).await? {
3441 member.name().to_owned()
3442 } else {
3443 return Ok(None);
3444 };
3445
3446 let power_levels = match self.power_levels().await {
3447 Ok(power_levels) => Some(power_levels.into()),
3448 Err(error) => {
3449 if matches!(room_info.state(), RoomState::Joined) {
3450 error!("Could not compute power levels for push conditions: {error}");
3453 }
3454 None
3455 }
3456 };
3457
3458 let mut ctx = assign!(PushConditionRoomCtx::new(
3459 room_id.to_owned(),
3460 UInt::new(member_count).unwrap_or(UInt::MAX),
3461 user_id.to_owned(),
3462 user_display_name,
3463 ),
3464 {
3465 power_levels,
3466 });
3467
3468 if with_threads_subscriptions {
3469 let this = self.clone();
3470 ctx = ctx.with_has_thread_subscription_fn(move |event_id: &EventId| {
3471 let room = this.clone();
3472 Box::pin(async move {
3473 if let Ok(maybe_sub) = room.load_or_fetch_thread_subscription(event_id).await {
3474 maybe_sub.is_some()
3475 } else {
3476 false
3477 }
3478 })
3479 });
3480 }
3481
3482 Ok(Some(ctx))
3483 }
3484
3485 pub async fn push_context(&self) -> Result<Option<PushContext>> {
3488 self.push_context_internal(self.client.enabled_thread_subscriptions()).await
3489 }
3490
3491 #[instrument(skip(self))]
3495 pub(crate) async fn push_context_internal(
3496 &self,
3497 with_threads_subscriptions: bool,
3498 ) -> Result<Option<PushContext>> {
3499 let Some(push_condition_room_ctx) =
3500 self.push_condition_room_ctx_internal(with_threads_subscriptions).await?
3501 else {
3502 debug!("Could not aggregate push context");
3503 return Ok(None);
3504 };
3505 let push_rules = self.client().account().push_rules().await?;
3506 Ok(Some(PushContext::new(push_condition_room_ctx, push_rules)))
3507 }
3508
3509 pub async fn event_push_actions<T>(&self, event: &Raw<T>) -> Result<Option<Vec<Action>>> {
3514 if let Some(ctx) = self.push_context().await? {
3515 Ok(Some(ctx.for_event(event).await))
3516 } else {
3517 Ok(None)
3518 }
3519 }
3520
3521 pub async fn invite_details(&self) -> Result<Invite> {
3524 let state = self.state();
3525
3526 if state != RoomState::Invited {
3527 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Invited", state))));
3528 }
3529
3530 let invitee = self
3531 .get_member_no_sync(self.own_user_id())
3532 .await?
3533 .ok_or_else(|| Error::UnknownError(Box::new(InvitationError::EventMissing)))?;
3534 let event = invitee.event();
3535 let inviter_id = event.sender();
3536 let inviter = self.get_member_no_sync(inviter_id).await?;
3537 Ok(Invite { invitee, inviter })
3538 }
3539
3540 pub async fn member_with_sender_info(
3548 &self,
3549 user_id: &UserId,
3550 ) -> Result<RoomMemberWithSenderInfo> {
3551 let Some(member) = self.get_member_no_sync(user_id).await? else {
3552 return Err(Error::InsufficientData);
3553 };
3554
3555 let sender_member =
3556 if let Some(member) = self.get_member_no_sync(member.event().sender()).await? {
3557 Some(member)
3559 } else if self.are_members_synced() {
3560 None
3562 } else if self.sync_members().await.is_ok() {
3563 self.get_member_no_sync(member.event().sender()).await?
3565 } else {
3566 None
3567 };
3568
3569 Ok(RoomMemberWithSenderInfo { room_member: member, sender_info: sender_member })
3570 }
3571
3572 pub async fn forget(&self) -> Result<()> {
3578 let state = self.state();
3579 match state {
3580 RoomState::Joined | RoomState::Invited | RoomState::Knocked => {
3581 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new(
3582 "Left / Banned",
3583 state,
3584 ))));
3585 }
3586 RoomState::Left | RoomState::Banned => {}
3587 }
3588
3589 let request = forget_room::v3::Request::new(self.inner.room_id().to_owned());
3590 let _response = self.client.send(request).await?;
3591
3592 if self.inner.direct_targets_length() != 0
3594 && let Err(e) = self.set_is_direct(false).await
3595 {
3596 warn!(room_id = ?self.room_id(), "failed to remove room from m.direct account data: {e}");
3599 }
3600
3601 self.client.base_client().forget_room(self.inner.room_id()).await?;
3602
3603 Ok(())
3604 }
3605
3606 fn ensure_room_joined(&self) -> Result<()> {
3607 let state = self.state();
3608 if state == RoomState::Joined {
3609 Ok(())
3610 } else {
3611 Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))))
3612 }
3613 }
3614
3615 pub async fn notification_mode(&self) -> Option<RoomNotificationMode> {
3617 if !matches!(self.state(), RoomState::Joined) {
3618 return None;
3619 }
3620
3621 let notification_settings = self.client().notification_settings().await;
3622
3623 let notification_mode =
3625 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3626
3627 if notification_mode.is_some() {
3628 notification_mode
3629 } else if let Ok(is_encrypted) =
3630 self.latest_encryption_state().await.map(|state| state.is_encrypted())
3631 {
3632 let is_one_to_one = IsOneToOne::from(self.active_members_count() == 2);
3637 let default_mode = notification_settings
3638 .get_default_room_notification_mode(IsEncrypted::from(is_encrypted), is_one_to_one)
3639 .await;
3640 Some(default_mode)
3641 } else {
3642 None
3643 }
3644 }
3645
3646 pub async fn user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
3657 if !matches!(self.state(), RoomState::Joined) {
3658 return None;
3659 }
3660
3661 let notification_settings = self.client().notification_settings().await;
3662
3663 let mode =
3665 notification_settings.get_user_defined_room_notification_mode(self.room_id()).await;
3666
3667 if let Some(mode) = mode {
3668 self.update_cached_user_defined_notification_mode(mode);
3669 }
3670
3671 mode
3672 }
3673
3674 pub async fn report_content(
3687 &self,
3688 event_id: OwnedEventId,
3689 score: Option<ReportedContentScore>,
3690 reason: Option<String>,
3691 ) -> Result<report_content::v3::Response> {
3692 let state = self.state();
3693 if state != RoomState::Joined {
3694 return Err(Error::WrongRoomState(Box::new(WrongRoomState::new("Joined", state))));
3695 }
3696
3697 let request = report_content::v3::Request::new(
3698 self.inner.room_id().to_owned(),
3699 event_id,
3700 score.map(Into::into),
3701 reason,
3702 );
3703 Ok(self.client.send(request).await?)
3704 }
3705
3706 pub async fn report_room(&self, reason: String) -> Result<report_room::v3::Response> {
3717 let request = report_room::v3::Request::new(self.inner.room_id().to_owned(), reason);
3718
3719 Ok(self.client.send(request).await?)
3720 }
3721
3722 pub async fn set_unread_flag(&self, unread: bool) -> Result<()> {
3728 if self.is_marked_unread() == unread {
3729 return Ok(());
3731 }
3732
3733 let user_id = self.client.user_id().ok_or(Error::AuthenticationRequired)?;
3734
3735 let content = MarkedUnreadEventContent::new(unread);
3736
3737 let request = set_room_account_data::v3::Request::new(
3738 user_id.to_owned(),
3739 self.inner.room_id().to_owned(),
3740 &content,
3741 )?;
3742
3743 self.client.send(request).await?;
3744 Ok(())
3745 }
3746
3747 pub async fn event_cache(
3750 &self,
3751 ) -> event_cache::Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
3752 self.client.event_cache().for_room(self.room_id()).await
3753 }
3754
3755 pub(crate) async fn get_user_beacon_info(
3762 &self,
3763 user_id: &UserId,
3764 ) -> Result<OriginalSyncStateEvent<BeaconInfoEventContent>, BeaconError> {
3765 let raw_event = self
3766 .get_state_event_static_for_key::<BeaconInfoEventContent, _>(user_id)
3767 .await?
3768 .ok_or(BeaconError::NotFound)?;
3769
3770 match raw_event.deserialize()? {
3771 SyncOrStrippedState::Sync(SyncStateEvent::Original(beacon_info)) => Ok(beacon_info),
3772 SyncOrStrippedState::Sync(SyncStateEvent::Redacted(_)) => Err(BeaconError::Redacted),
3773 SyncOrStrippedState::Stripped(_) => Err(BeaconError::Stripped),
3774 }
3775 }
3776
3777 pub async fn start_live_location_share(
3790 &self,
3791 duration_millis: u64,
3792 description: Option<String>,
3793 ) -> Result<send_state_event::v3::Response> {
3794 self.ensure_room_joined()?;
3795
3796 self.send_state_event_for_key(
3797 self.own_user_id(),
3798 BeaconInfoEventContent::new(
3799 description,
3800 Duration::from_millis(duration_millis),
3801 true,
3802 None,
3803 ),
3804 )
3805 .await
3806 }
3807
3808 pub async fn stop_live_location_share(
3815 &self,
3816 ) -> Result<send_state_event::v3::Response, BeaconError> {
3817 self.ensure_room_joined()?;
3818
3819 let mut beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3820 beacon_info_event.content.stop();
3821 Ok(self.send_state_event_for_key(self.own_user_id(), beacon_info_event.content).await?)
3822 }
3823
3824 pub async fn send_location_beacon(
3836 &self,
3837 geo_uri: String,
3838 ) -> Result<send_message_event::v3::Response, BeaconError> {
3839 self.ensure_room_joined()?;
3840
3841 let beacon_info_event = self.get_user_beacon_info(self.own_user_id()).await?;
3842
3843 if beacon_info_event.content.is_live() {
3844 let content = BeaconEventContent::new(beacon_info_event.event_id, geo_uri, None);
3845 Ok(self.send(content).await?)
3846 } else {
3847 Err(BeaconError::NotLive)
3848 }
3849 }
3850
3851 pub async fn save_composer_draft(
3854 &self,
3855 draft: ComposerDraft,
3856 thread_root: Option<&EventId>,
3857 ) -> Result<()> {
3858 self.client
3859 .state_store()
3860 .set_kv_data(
3861 StateStoreDataKey::ComposerDraft(self.room_id(), thread_root),
3862 StateStoreDataValue::ComposerDraft(draft),
3863 )
3864 .await?;
3865 Ok(())
3866 }
3867
3868 pub async fn load_composer_draft(
3871 &self,
3872 thread_root: Option<&EventId>,
3873 ) -> Result<Option<ComposerDraft>> {
3874 let data = self
3875 .client
3876 .state_store()
3877 .get_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3878 .await?;
3879 Ok(data.and_then(|d| d.into_composer_draft()))
3880 }
3881
3882 pub async fn clear_composer_draft(&self, thread_root: Option<&EventId>) -> Result<()> {
3885 self.client
3886 .state_store()
3887 .remove_kv_data(StateStoreDataKey::ComposerDraft(self.room_id(), thread_root))
3888 .await?;
3889 Ok(())
3890 }
3891
3892 pub async fn load_pinned_events(&self) -> Result<Option<Vec<OwnedEventId>>> {
3895 let response = self
3896 .client
3897 .send(get_state_event_for_key::v3::Request::new(
3898 self.room_id().to_owned(),
3899 StateEventType::RoomPinnedEvents,
3900 "".to_owned(),
3901 ))
3902 .await;
3903
3904 match response {
3905 Ok(response) => Ok(Some(
3906 response
3907 .into_content()
3908 .deserialize_as_unchecked::<RoomPinnedEventsEventContent>()?
3909 .pinned,
3910 )),
3911 Err(http_error) => match http_error.as_client_api_error() {
3912 Some(error) if error.status_code == StatusCode::NOT_FOUND => Ok(None),
3913 _ => Err(http_error.into()),
3914 },
3915 }
3916 }
3917
3918 pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
3926 ObservableLiveLocation::new(&self.client, self.room_id())
3927 }
3928
3929 pub async fn subscribe_to_knock_requests(
3943 &self,
3944 ) -> Result<(impl Stream<Item = Vec<KnockRequest>> + use<>, JoinHandle<()>)> {
3945 let this = Arc::new(self.clone());
3946
3947 let room_member_events_observer =
3948 self.client.observe_room_events::<SyncRoomMemberEvent, (Client, Room)>(this.room_id());
3949
3950 let current_seen_ids = self.get_seen_knock_request_ids().await?;
3951 let mut seen_request_ids_stream = self
3952 .seen_knock_request_ids_map
3953 .subscribe()
3954 .await
3955 .map(|values| values.unwrap_or_default());
3956
3957 let mut room_info_stream = self.subscribe_info();
3958
3959 let clear_seen_ids_handle = spawn({
3962 let this = self.clone();
3963 async move {
3964 let mut member_updates_stream = this.room_member_updates_sender.subscribe();
3965 while member_updates_stream.recv().await.is_ok() {
3966 if let Err(err) = this.remove_outdated_seen_knock_requests_ids().await {
3968 warn!("Failed to remove seen knock requests: {err}")
3969 }
3970 }
3971 }
3972 });
3973
3974 let combined_stream = stream! {
3975 match this.get_current_join_requests(¤t_seen_ids).await {
3977 Ok(initial_requests) => yield initial_requests,
3978 Err(err) => warn!("Failed to get initial requests to join: {err}")
3979 }
3980
3981 let mut requests_stream = room_member_events_observer.subscribe();
3982 let mut seen_ids = current_seen_ids.clone();
3983
3984 loop {
3985 tokio::select! {
3988 Some((event, _)) = requests_stream.next() => {
3989 if let Some(event) = event.as_original() {
3990 let emit = if event.prev_content().is_some() {
3992 matches!(event.membership_change(),
3993 MembershipChange::Banned |
3994 MembershipChange::Knocked |
3995 MembershipChange::KnockAccepted |
3996 MembershipChange::KnockDenied |
3997 MembershipChange::KnockRetracted
3998 )
3999 } else {
4000 true
4003 };
4004
4005 if emit {
4006 match this.get_current_join_requests(&seen_ids).await {
4007 Ok(requests) => yield requests,
4008 Err(err) => {
4009 warn!("Failed to get updated knock requests on new member event: {err}")
4010 }
4011 }
4012 }
4013 }
4014 }
4015
4016 Some(new_seen_ids) = seen_request_ids_stream.next() => {
4017 seen_ids = new_seen_ids;
4019
4020 match this.get_current_join_requests(&seen_ids).await {
4023 Ok(requests) => yield requests,
4024 Err(err) => {
4025 warn!("Failed to get updated knock requests on seen ids changed: {err}")
4026 }
4027 }
4028 }
4029
4030 Some(room_info) = room_info_stream.next() => {
4031 if !room_info.are_members_synced() {
4034 match this.get_current_join_requests(&seen_ids).await {
4035 Ok(requests) => yield requests,
4036 Err(err) => {
4037 warn!("Failed to get updated knock requests on gappy (limited) sync: {err}")
4038 }
4039 }
4040 }
4041 }
4042 else => break,
4044 }
4045 }
4046 };
4047
4048 Ok((combined_stream, clear_seen_ids_handle))
4049 }
4050
4051 async fn get_current_join_requests(
4052 &self,
4053 seen_request_ids: &BTreeMap<OwnedEventId, OwnedUserId>,
4054 ) -> Result<Vec<KnockRequest>> {
4055 Ok(self
4056 .members(RoomMemberships::KNOCK)
4057 .await?
4058 .into_iter()
4059 .filter_map(|member| {
4060 let event_id = member.event().event_id()?;
4061 Some(KnockRequest::new(
4062 self,
4063 event_id,
4064 member.event().timestamp(),
4065 KnockRequestMemberInfo::from_member(&member),
4066 seen_request_ids.contains_key(event_id),
4067 ))
4068 })
4069 .collect())
4070 }
4071
4072 pub fn privacy_settings(&self) -> RoomPrivacySettings<'_> {
4074 RoomPrivacySettings::new(&self.inner, &self.client)
4075 }
4076
4077 pub async fn list_threads(&self, opts: ListThreadsOptions) -> Result<ThreadRoots> {
4085 let request = opts.into_request(self.room_id());
4086
4087 let response = self.client.send(request).await?;
4088
4089 let push_ctx = self.push_context().await?;
4090 let chunk = join_all(
4091 response.chunk.into_iter().map(|ev| self.try_decrypt_event(ev, push_ctx.as_ref())),
4092 )
4093 .await;
4094
4095 Ok(ThreadRoots { chunk, prev_batch_token: response.next_batch })
4096 }
4097
4098 pub async fn relations(
4112 &self,
4113 event_id: OwnedEventId,
4114 opts: RelationsOptions,
4115 ) -> Result<Relations> {
4116 opts.send(self, event_id).await
4117 }
4118
/// Searches the client's search index for events of this room.
///
/// The index lock is held for the duration of the query.
///
/// # Arguments
///
/// * `query` - The search query.
/// * `max_number_of_results` - Forwarded to the index.
/// * `pagination_offset` - Forwarded to the index.
///
/// # Errors
///
/// The `IndexError` reported by the index.
#[cfg(feature = "experimental-search")]
pub async fn search(
    &self,
    query: &str,
    max_number_of_results: usize,
    pagination_offset: Option<usize>,
) -> Result<Vec<OwnedEventId>, IndexError> {
    let room_id = self.room_id();
    let mut index = self.client.search_index().lock().await;
    index.search(query, max_number_of_results, pagination_offset, room_id)
}
4131
4132 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4154 pub async fn subscribe_thread(
4155 &self,
4156 thread_root: OwnedEventId,
4157 automatic: Option<OwnedEventId>,
4158 ) -> Result<()> {
4159 let is_automatic = automatic.is_some();
4160
4161 match self
4162 .client
4163 .send(subscribe_thread::unstable::Request::new(
4164 self.room_id().to_owned(),
4165 thread_root.clone(),
4166 automatic,
4167 ))
4168 .await
4169 {
4170 Ok(_response) => {
4171 trace!("Server acknowledged the thread subscription; saving in db");
4172
4173 self.client
4175 .state_store()
4176 .upsert_thread_subscription(
4177 self.room_id(),
4178 &thread_root,
4179 StoredThreadSubscription {
4180 status: ThreadSubscriptionStatus::Subscribed {
4181 automatic: is_automatic,
4182 },
4183 bump_stamp: None,
4184 },
4185 )
4186 .await?;
4187
4188 Ok(())
4189 }
4190
4191 Err(err) => {
4192 if let Some(ErrorKind::ConflictingUnsubscription) = err.client_api_error_kind() {
4193 trace!("Thread subscription skipped: {err}");
4198 Ok(())
4199 } else {
4200 Err(err.into())
4202 }
4203 }
4204 }
4205 }
4206
4207 pub async fn subscribe_thread_if_needed(
4213 &self,
4214 thread_root: &EventId,
4215 automatic: Option<OwnedEventId>,
4216 ) -> Result<()> {
4217 if let Some(prev_sub) = self.load_or_fetch_thread_subscription(thread_root).await? {
4218 if !prev_sub.automatic || automatic.is_some() {
4221 return Ok(());
4224 }
4225 }
4226 self.subscribe_thread(thread_root.to_owned(), automatic).await
4227 }
4228
4229 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4241 pub async fn unsubscribe_thread(&self, thread_root: OwnedEventId) -> Result<()> {
4242 self.client
4243 .send(unsubscribe_thread::unstable::Request::new(
4244 self.room_id().to_owned(),
4245 thread_root.clone(),
4246 ))
4247 .await?;
4248
4249 trace!("Server acknowledged the thread subscription removal; removed it from db too");
4250
4251 self.client
4253 .state_store()
4254 .upsert_thread_subscription(
4255 self.room_id(),
4256 &thread_root,
4257 StoredThreadSubscription {
4258 status: ThreadSubscriptionStatus::Unsubscribed,
4259 bump_stamp: None,
4260 },
4261 )
4262 .await?;
4263
4264 Ok(())
4265 }
4266
4267 #[instrument(skip(self), fields(room_id = %self.room_id()))]
4284 pub async fn fetch_thread_subscription(
4285 &self,
4286 thread_root: OwnedEventId,
4287 ) -> Result<Option<ThreadSubscription>> {
4288 let result = self
4289 .client
4290 .send(get_thread_subscription::unstable::Request::new(
4291 self.room_id().to_owned(),
4292 thread_root.clone(),
4293 ))
4294 .await;
4295
4296 let subscription = match result {
4297 Ok(response) => Some(ThreadSubscription { automatic: response.automatic }),
4298 Err(http_error) => match http_error.as_client_api_error() {
4299 Some(error) if error.status_code == StatusCode::NOT_FOUND => None,
4300 _ => return Err(http_error.into()),
4301 },
4302 };
4303
4304 if let Some(sub) = &subscription {
4306 self.client
4307 .state_store()
4308 .upsert_thread_subscription(
4309 self.room_id(),
4310 &thread_root,
4311 StoredThreadSubscription {
4312 status: ThreadSubscriptionStatus::Subscribed { automatic: sub.automatic },
4313 bump_stamp: None,
4314 },
4315 )
4316 .await?;
4317 } else {
4318 self.client
4320 .state_store()
4321 .remove_thread_subscription(self.room_id(), &thread_root)
4322 .await?;
4323 }
4324
4325 Ok(subscription)
4326 }
4327
4328 pub async fn load_or_fetch_thread_subscription(
4335 &self,
4336 thread_root: &EventId,
4337 ) -> Result<Option<ThreadSubscription>> {
4338 if self.client.thread_subscription_catchup().is_outdated() {
4340 return self.fetch_thread_subscription(thread_root.to_owned()).await;
4341 }
4342
4343 Ok(self
4345 .client
4346 .state_store()
4347 .load_thread_subscription(self.room_id(), thread_root)
4348 .await
4349 .map(|maybe_sub| {
4350 maybe_sub.and_then(|stored| match stored.status {
4351 ThreadSubscriptionStatus::Unsubscribed => None,
4352 ThreadSubscriptionStatus::Subscribed { automatic } => {
4353 Some(ThreadSubscription { automatic })
4354 }
4355 })
4356 })?)
4357 }
4358}
4359
#[cfg(feature = "e2e-encryption")]
impl RoomIdentityProvider for Room {
    /// Reports whether `user_id` resolves to a member of this room.
    ///
    /// A lookup error counts as "not a member".
    fn is_member<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, bool> {
        Box::pin(async { matches!(self.get_member(user_id).await, Ok(Some(_))) })
    }

    /// Collects the identities of the joined and invited members.
    ///
    /// A failure to load the members yields an empty list; members without
    /// a known identity are left out.
    fn member_identities(&self) -> BoxFuture<'_, Vec<UserIdentity>> {
        Box::pin(async {
            let members = self
                .members(RoomMemberships::JOIN | RoomMemberships::INVITE)
                .await
                .unwrap_or_default();

            let mut identities: Vec<UserIdentity> = Vec::new();
            for member in members {
                // Extending with an `Option` pushes only when it is `Some`.
                identities.extend(self.user_identity(member.user_id()).await);
            }
            identities
        })
    }

    /// Looks up the identity of `user_id` through the client's encryption
    /// API.
    ///
    /// A lookup error is reported as `None`.
    fn user_identity<'a>(&'a self, user_id: &'a UserId) -> BoxFuture<'a, Option<UserIdentity>> {
        Box::pin(async {
            let identity = self.client.encryption().get_user_identity(user_id).await;
            identity.ok().flatten().map(|found| found.underlying_identity())
        })
    }
}
4394
/// A room reference made of a `WeakClient` and a room ID.
///
/// The room itself is only resolved on demand, through `WeakRoom::get`.
#[derive(Clone, Debug)]
pub(crate) struct WeakRoom {
    /// Weak handle to the client used to look the room up.
    client: WeakClient,
    /// ID of the room this value refers to.
    room_id: OwnedRoomId,
}
4402
4403impl WeakRoom {
4404 pub fn new(client: WeakClient, room_id: OwnedRoomId) -> Self {
4406 Self { client, room_id }
4407 }
4408
4409 pub fn get(&self) -> Option<Room> {
4411 self.client.get().and_then(|client| client.get_room(&self.room_id))
4412 }
4413
4414 pub fn room_id(&self) -> &RoomId {
4416 &self.room_id
4417 }
4418}
4419
/// Details of an invite, as returned by `invite_details`.
#[derive(Debug, Clone)]
pub struct Invite {
    /// The invited member; `invite_details` fills this with the own user's
    /// member.
    pub invitee: RoomMember,
    /// The sender of the invitee's membership event, or `None` when that
    /// member is not known locally.
    pub inviter: Option<RoomMember>,
}
4428
/// Internal error raised while gathering invite details.
#[derive(Error, Debug)]
enum InvitationError {
    /// The membership event of the invited user was not found.
    #[error("No membership event found")]
    EventMissing,
}
4434
/// A set of up to three optional receipt targets, built with the
/// `Receipts` builder methods.
///
/// The default value has every field unset.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct Receipts {
    /// Event targeted by the fully-read marker, if any.
    pub fully_read: Option<OwnedEventId>,
    /// Event targeted by the public read receipt, if any.
    pub public_read_receipt: Option<OwnedEventId>,
    /// Event targeted by the private read receipt, if any.
    pub private_read_receipt: Option<OwnedEventId>,
}
4446
4447impl Receipts {
4448 pub fn new() -> Self {
4450 Self::default()
4451 }
4452
4453 pub fn fully_read_marker(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4462 self.fully_read = event_id.into();
4463 self
4464 }
4465
4466 pub fn public_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4472 self.public_read_receipt = event_id.into();
4473 self
4474 }
4475
4476 pub fn private_read_receipt(mut self, event_id: impl Into<Option<OwnedEventId>>) -> Self {
4480 self.private_read_receipt = event_id.into();
4481 self
4482 }
4483
4484 pub fn is_empty(&self) -> bool {
4486 self.fully_read.is_none()
4487 && self.public_read_receipt.is_none()
4488 && self.private_read_receipt.is_none()
4489 }
4490}
4491
/// A parent space of a room, classified by how the relationship could be
/// checked.
///
/// NOTE(review): the classification logic lives outside this part of the
/// file; the variant descriptions below are inferred from the variant
/// names and payload types only and should be confirmed against it.
#[derive(Debug)]
pub enum ParentSpace {
    /// Presumably: the parent is a known room and the relationship is
    /// declared on both sides.
    Reciprocal(Room),
    /// Presumably: the parent is a known room and the relationship is
    /// justified by a power level.
    WithPowerlevel(Room),
    /// Presumably: the parent is a known room but the relationship could
    /// not be justified.
    Illegitimate(Room),
    /// Only the room ID of the parent is carried; presumably the
    /// relationship could not be checked.
    Unverifiable(OwnedRoomId),
}
4511
/// Score attached to a content report, stored as an `i8`.
///
/// The constructors and conversions defined for this type only produce
/// values between `ReportedContentScore::MIN` and
/// `ReportedContentScore::MAX`, both included.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReportedContentScore(i8);
4517
4518impl ReportedContentScore {
4519 pub const MIN: Self = Self(-100);
4523
4524 pub const MAX: Self = Self(0);
4528
4529 pub fn new(value: i8) -> Option<Self> {
4538 value.try_into().ok()
4539 }
4540
4541 pub fn new_saturating(value: i8) -> Self {
4547 if value > Self::MAX {
4548 Self::MAX
4549 } else if value < Self::MIN {
4550 Self::MIN
4551 } else {
4552 Self(value)
4553 }
4554 }
4555
4556 pub fn value(&self) -> i8 {
4558 self.0
4559 }
4560}
4561
4562impl PartialEq<i8> for ReportedContentScore {
4563 fn eq(&self, other: &i8) -> bool {
4564 self.0.eq(other)
4565 }
4566}
4567
4568impl PartialEq<ReportedContentScore> for i8 {
4569 fn eq(&self, other: &ReportedContentScore) -> bool {
4570 self.eq(&other.0)
4571 }
4572}
4573
4574impl PartialOrd<i8> for ReportedContentScore {
4575 fn partial_cmp(&self, other: &i8) -> Option<std::cmp::Ordering> {
4576 self.0.partial_cmp(other)
4577 }
4578}
4579
4580impl PartialOrd<ReportedContentScore> for i8 {
4581 fn partial_cmp(&self, other: &ReportedContentScore) -> Option<std::cmp::Ordering> {
4582 self.partial_cmp(&other.0)
4583 }
4584}
4585
4586impl From<ReportedContentScore> for Int {
4587 fn from(value: ReportedContentScore) -> Self {
4588 value.0.into()
4589 }
4590}
4591
4592impl TryFrom<i8> for ReportedContentScore {
4593 type Error = TryFromReportedContentScoreError;
4594
4595 fn try_from(value: i8) -> std::prelude::v1::Result<Self, Self::Error> {
4596 if value > Self::MAX || value < Self::MIN {
4597 Err(TryFromReportedContentScoreError(()))
4598 } else {
4599 Ok(Self(value))
4600 }
4601 }
4602}
4603
4604impl TryFrom<i16> for ReportedContentScore {
4605 type Error = TryFromReportedContentScoreError;
4606
4607 fn try_from(value: i16) -> std::prelude::v1::Result<Self, Self::Error> {
4608 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4609 value.try_into()
4610 }
4611}
4612
4613impl TryFrom<i32> for ReportedContentScore {
4614 type Error = TryFromReportedContentScoreError;
4615
4616 fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
4617 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4618 value.try_into()
4619 }
4620}
4621
4622impl TryFrom<i64> for ReportedContentScore {
4623 type Error = TryFromReportedContentScoreError;
4624
4625 fn try_from(value: i64) -> std::prelude::v1::Result<Self, Self::Error> {
4626 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4627 value.try_into()
4628 }
4629}
4630
4631impl TryFrom<Int> for ReportedContentScore {
4632 type Error = TryFromReportedContentScoreError;
4633
4634 fn try_from(value: Int) -> std::prelude::v1::Result<Self, Self::Error> {
4635 let value = i8::try_from(value).map_err(|_| TryFromReportedContentScoreError(()))?;
4636 value.try_into()
4637 }
4638}
4639
/// Something that can provide a `TimelineEvent` for an event ID.
trait EventSource {
    /// Returns the event identified by `event_id`, or an error when it
    /// cannot be provided.
    ///
    /// The returned future must be `SendOutsideWasm`.
    fn get_event(
        &self,
        event_id: &EventId,
    ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
}
4646
4647impl EventSource for &Room {
4648 async fn get_event(&self, event_id: &EventId) -> Result<TimelineEvent, Error> {
4649 self.load_or_fetch_event(event_id, None).await
4650 }
4651}
4652
/// Error returned when a value cannot be converted into a
/// `ReportedContentScore` because it is out of range.
///
/// The private unit field keeps the type from being constructed outside
/// this module.
#[derive(Debug, Clone, Error)]
#[error("out of range conversion attempted")]
pub struct TryFromReportedContentScoreError(());
4658
/// A room member together with the member that sent its membership event,
/// as returned by `member_with_sender_info`.
#[derive(Debug)]
pub struct RoomMemberWithSenderInfo {
    /// The requested member.
    pub room_member: RoomMember,
    /// The sender of `room_member`'s membership event, or `None` when it
    /// could not be resolved.
    pub sender_info: Option<RoomMember>,
}
4669
4670#[cfg(all(test, not(target_family = "wasm")))]
4671mod tests {
4672 use std::collections::BTreeMap;
4673
4674 use matrix_sdk_base::{ComposerDraft, store::ComposerDraftType};
4675 use matrix_sdk_test::{
4676 JoinedRoomBuilder, StateTestEvent, SyncResponseBuilder, async_test,
4677 event_factory::EventFactory, test_json,
4678 };
4679 use ruma::{
4680 RoomVersionId, event_id,
4681 events::{relation::RelationType, room::member::MembershipState},
4682 int, owned_event_id, room_id, user_id,
4683 };
4684 use wiremock::{
4685 Mock, MockServer, ResponseTemplate,
4686 matchers::{header, method, path_regex},
4687 };
4688
4689 use super::ReportedContentScore;
4690 use crate::{
4691 Client,
4692 config::RequestConfig,
4693 room::messages::{IncludeRelations, ListThreadsOptions, RelationsOptions},
4694 test_utils::{
4695 client::mock_matrix_session,
4696 logged_in_client,
4697 mocks::{MatrixMockServer, RoomRelationsResponseTemplate},
4698 },
4699 };
4700
/// Exercises encrypting a room event after a second client, sharing the
/// same SQLite store, has taken and released the cross-process store lock.
///
/// The test passes when every step succeeds without panicking; the
/// encrypted content itself is not inspected.
#[cfg(all(feature = "sqlite", feature = "e2e-encryption"))]
#[async_test]
async fn test_cache_invalidation_while_encrypt() {
    use matrix_sdk_base::store::RoomLoadSettings;
    use matrix_sdk_test::{DEFAULT_TEST_ROOM_ID, message_like_event_content};

    // Both clients below share this database file.
    let sqlite_path = std::env::temp_dir().join("cache_invalidation_while_encrypt.db");
    let session = mock_matrix_session();

    // First client, holding the lock under the name "client1".
    let client = Client::builder()
        .homeserver_url("http://localhost:1234")
        .request_config(RequestConfig::new().disable_retry())
        .sqlite_store(&sqlite_path, None)
        .build()
        .await
        .unwrap();
    client
        .matrix_auth()
        .restore_session(session.clone(), RoomLoadSettings::default())
        .await
        .unwrap();

    client.encryption().enable_cross_process_store_lock("client1".to_owned()).await.unwrap();

    let server = MockServer::start().await;
    {
        // Mock the room encryption state endpoint and feed a sync response
        // that contains an encrypted joined room.
        Mock::given(method("GET"))
            .and(path_regex(r"^/_matrix/client/r0/rooms/.*/state/m.*room.*encryption.?"))
            .and(header("authorization", "Bearer 1234"))
            .respond_with(
                ResponseTemplate::new(200)
                    .set_body_json(&*test_json::sync_events::ENCRYPTION_CONTENT),
            )
            .mount(&server)
            .await;
        let response = SyncResponseBuilder::default()
            .add_joined_room(
                JoinedRoomBuilder::default()
                    .add_state_event(StateTestEvent::Member)
                    .add_state_event(StateTestEvent::PowerLevels)
                    .add_state_event(StateTestEvent::Encryption),
            )
            .build_sync_response();
        client.base_client().receive_sync_response(response).await.unwrap();
    }

    let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");

    // Share a room key before the second client touches the store.
    room.preshare_room_key().await.unwrap();

    {
        // Second client on the same store, under the name "client2". It
        // takes the store lock; the guard is dropped at the end of this
        // scope.
        let client = Client::builder()
            .homeserver_url("http://localhost:1234")
            .request_config(RequestConfig::new().disable_retry())
            .sqlite_store(&sqlite_path, None)
            .build()
            .await
            .unwrap();
        client
            .matrix_auth()
            .restore_session(session.clone(), RoomLoadSettings::default())
            .await
            .unwrap();
        client
            .encryption()
            .enable_cross_process_store_lock("client2".to_owned())
            .await
            .unwrap();

        let guard = client.encryption().spin_lock_store(None).await.unwrap();
        assert!(guard.is_some());
    }

    // The first client takes the lock again.
    let guard = client.encryption().spin_lock_store(None).await.unwrap();
    assert!(guard.is_some());

    let olm = client.olm_machine().await;
    let olm = olm.as_ref().expect("Olm machine wasn't started");

    // Encrypting must still work at this point.
    let _encrypted_content = olm
        .encrypt_room_event_raw(room.room_id(), "test-event", &message_like_event_content!({}))
        .await
        .unwrap();
}
4793
/// Checks the constructors and checked conversions of
/// `ReportedContentScore` at the bounds, inside and outside the range.
#[test]
fn reported_content_score() {
    // `new` accepts in-range values and rejects the others.
    for in_range in [0i8, -50, -100] {
        let score = ReportedContentScore::new(in_range).unwrap();
        assert_eq!(score.value(), in_range);
    }
    for out_of_range in [10i8, -110] {
        assert_eq!(ReportedContentScore::new(out_of_range), None);
    }

    // `new_saturating` keeps in-range values and clamps the others.
    for in_range in [0i8, -50, -100] {
        assert_eq!(ReportedContentScore::new_saturating(in_range).value(), in_range);
    }
    assert_eq!(ReportedContentScore::new_saturating(10), ReportedContentScore::MAX);
    assert_eq!(ReportedContentScore::new_saturating(-110), ReportedContentScore::MIN);

    // Conversion from `i16`.
    for (input, expected) in [(0i16, 0i8), (-100, -100)] {
        assert_eq!(ReportedContentScore::try_from(input).unwrap().value(), expected);
    }
    for rejected in [10i16, -110] {
        ReportedContentScore::try_from(rejected).unwrap_err();
    }

    // Conversion from `i32`.
    for (input, expected) in [(0i32, 0i8), (-100, -100)] {
        assert_eq!(ReportedContentScore::try_from(input).unwrap().value(), expected);
    }
    for rejected in [10i32, -110] {
        ReportedContentScore::try_from(rejected).unwrap_err();
    }

    // Conversion from `i64`.
    for (input, expected) in [(0i64, 0i8), (-100, -100)] {
        assert_eq!(ReportedContentScore::try_from(input).unwrap().value(), expected);
    }
    for rejected in [10i64, -110] {
        ReportedContentScore::try_from(rejected).unwrap_err();
    }

    // Conversion from `Int`.
    assert_eq!(ReportedContentScore::try_from(int!(0)).unwrap().value(), 0);
    assert_eq!(ReportedContentScore::try_from(int!(-100)).unwrap().value(), -100);
    ReportedContentScore::try_from(int!(10)).unwrap_err();
    ReportedContentScore::try_from(int!(-110)).unwrap_err();
}
4849
/// Checks that composer drafts are saved, loaded and cleared separately
/// for the room itself (`None`) and for a thread (`Some(thread_root)`).
#[async_test]
async fn test_composer_draft() {
    use matrix_sdk_test::DEFAULT_TEST_ROOM_ID;

    let client = logged_in_client(None).await;

    let response = SyncResponseBuilder::default()
        .add_joined_room(JoinedRoomBuilder::default())
        .build_sync_response();
    client.base_client().receive_sync_response(response).await.unwrap();
    let room = client.get_room(&DEFAULT_TEST_ROOM_ID).expect("Room should exist");

    // Nothing is stored at first.
    assert_eq!(room.load_composer_draft(None).await.unwrap(), None);

    let draft = ComposerDraft {
        plain_text: "Hello, world!".to_owned(),
        html_text: Some("<strong>Hello</strong>, world!".to_owned()),
        draft_type: ComposerDraftType::NewMessage,
    };

    // Save a draft for the room itself.
    room.save_composer_draft(draft.clone(), None).await.unwrap();

    let thread_root = owned_event_id!("$thread_root:b.c");
    let thread_draft = ComposerDraft {
        plain_text: "Hello, thread!".to_owned(),
        html_text: Some("<strong>Hello</strong>, thread!".to_owned()),
        draft_type: ComposerDraftType::NewMessage,
    };

    // Save a different draft for the thread.
    room.save_composer_draft(thread_draft.clone(), Some(&thread_root)).await.unwrap();

    // The room draft is unaffected by the thread draft.
    assert_eq!(room.load_composer_draft(None).await.unwrap(), Some(draft));

    // The thread draft is stored under its own key.
    assert_eq!(
        room.load_composer_draft(Some(&thread_root)).await.unwrap(),
        Some(thread_draft.clone())
    );

    // Clearing the room draft removes only the room draft.
    room.clear_composer_draft(None).await.unwrap();
    assert_eq!(room.load_composer_draft(None).await.unwrap(), None);

    assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), Some(thread_draft));

    // Clearing the thread draft removes it as well.
    room.clear_composer_draft(Some(&thread_root)).await.unwrap();
    assert_eq!(room.load_composer_draft(Some(&thread_root)).await.unwrap(), None);
}
4903
4904 #[async_test]
4905 async fn test_mark_join_requests_as_seen() {
4906 let server = MatrixMockServer::new().await;
4907 let client = server.client_builder().build().await;
4908 let event_id = event_id!("$a:b.c");
4909 let room_id = room_id!("!a:b.c");
4910 let user_id = user_id!("@alice:b.c");
4911
4912 let f = EventFactory::new().room(room_id);
4913 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
4914 f.member(user_id).membership(MembershipState::Knock).event_id(event_id).into_raw(),
4915 ]);
4916 let room = server.sync_room(&client, joined_room_builder).await;
4917
4918 let seen_ids =
4920 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4921 assert!(seen_ids.is_empty());
4922
4923 room.mark_knock_requests_as_seen(&[user_id.to_owned()])
4925 .await
4926 .expect("Couldn't mark join request as seen");
4927
4928 let seen_ids =
4930 room.get_seen_knock_request_ids().await.expect("Couldn't load seen join request ids");
4931 assert_eq!(seen_ids.len(), 1);
4932 assert_eq!(
4933 seen_ids.into_iter().next().expect("No next value"),
4934 (event_id.to_owned(), user_id.to_owned())
4935 )
4936 }
4937
4938 #[async_test]
4939 async fn test_own_room_membership_with_no_own_member_event() {
4940 let server = MatrixMockServer::new().await;
4941 let client = server.client_builder().build().await;
4942 let room_id = room_id!("!a:b.c");
4943
4944 let room = server.sync_joined_room(&client, room_id).await;
4945
4946 let error = room.member_with_sender_info(client.user_id().unwrap()).await.err();
4949 assert!(error.is_some());
4950 }
4951
4952 #[async_test]
4953 async fn test_own_room_membership_with_own_member_event_but_unknown_sender() {
4954 let server = MatrixMockServer::new().await;
4955 let client = server.client_builder().build().await;
4956 let room_id = room_id!("!a:b.c");
4957 let user_id = user_id!("@example:localhost");
4958
4959 let f = EventFactory::new().room(room_id).sender(user_id!("@alice:b.c"));
4960 let joined_room_builder =
4961 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into_raw()]);
4962 let room = server.sync_room(&client, joined_room_builder).await;
4963
4964 let ret = room
4966 .member_with_sender_info(client.user_id().unwrap())
4967 .await
4968 .expect("Room member info should be available");
4969
4970 assert_eq!(ret.room_member.event().user_id(), user_id);
4972
4973 assert!(ret.sender_info.is_none());
4975 }
4976
4977 #[async_test]
4978 async fn test_own_room_membership_with_own_member_event_and_own_sender() {
4979 let server = MatrixMockServer::new().await;
4980 let client = server.client_builder().build().await;
4981 let room_id = room_id!("!a:b.c");
4982 let user_id = user_id!("@example:localhost");
4983
4984 let f = EventFactory::new().room(room_id).sender(user_id);
4985 let joined_room_builder =
4986 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into_raw()]);
4987 let room = server.sync_room(&client, joined_room_builder).await;
4988
4989 let ret = room
4991 .member_with_sender_info(client.user_id().unwrap())
4992 .await
4993 .expect("Room member info should be available");
4994
4995 assert_eq!(ret.room_member.event().user_id(), user_id);
4997
4998 assert!(ret.sender_info.is_some());
5000 assert_eq!(ret.sender_info.unwrap().event().user_id(), user_id);
5001 }
5002
5003 #[async_test]
5004 async fn test_own_room_membership_with_own_member_event_and_known_sender() {
5005 let server = MatrixMockServer::new().await;
5006 let client = server.client_builder().build().await;
5007 let room_id = room_id!("!a:b.c");
5008 let user_id = user_id!("@example:localhost");
5009 let sender_id = user_id!("@alice:b.c");
5010
5011 let f = EventFactory::new().room(room_id).sender(sender_id);
5012 let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
5013 f.member(user_id).into_raw(),
5014 f.member(sender_id).into_raw(),
5016 ]);
5017 let room = server.sync_room(&client, joined_room_builder).await;
5018
5019 let ret = room
5021 .member_with_sender_info(client.user_id().unwrap())
5022 .await
5023 .expect("Room member info should be available");
5024
5025 assert_eq!(ret.room_member.event().user_id(), user_id);
5027
5028 assert!(ret.sender_info.is_some());
5030 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5031 }
5032
5033 #[async_test]
5034 async fn test_own_room_membership_with_own_member_event_and_unknown_but_available_sender() {
5035 let server = MatrixMockServer::new().await;
5036 let client = server.client_builder().build().await;
5037 let room_id = room_id!("!a:b.c");
5038 let user_id = user_id!("@example:localhost");
5039 let sender_id = user_id!("@alice:b.c");
5040
5041 let f = EventFactory::new().room(room_id).sender(sender_id);
5042 let joined_room_builder =
5043 JoinedRoomBuilder::new(room_id).add_state_bulk(vec![f.member(user_id).into_raw()]);
5044 let room = server.sync_room(&client, joined_room_builder).await;
5045
5046 server
5048 .mock_get_members()
5049 .ok(vec![f.member(sender_id).into_raw()])
5050 .mock_once()
5051 .mount()
5052 .await;
5053
5054 let ret = room
5056 .member_with_sender_info(client.user_id().unwrap())
5057 .await
5058 .expect("Room member info should be available");
5059
5060 assert_eq!(ret.room_member.event().user_id(), user_id);
5062
5063 assert!(ret.sender_info.is_some());
5065 assert_eq!(ret.sender_info.unwrap().event().user_id(), sender_id);
5066 }
5067
5068 #[async_test]
5069 async fn test_list_threads() {
5070 let server = MatrixMockServer::new().await;
5071 let client = server.client_builder().build().await;
5072
5073 let room_id = room_id!("!a:b.c");
5074 let sender_id = user_id!("@alice:b.c");
5075 let f = EventFactory::new().room(room_id).sender(sender_id);
5076
5077 let eid1 = event_id!("$1");
5078 let eid2 = event_id!("$2");
5079 let batch1 = vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()];
5080 let batch2 = vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()];
5081
5082 server
5083 .mock_room_threads()
5084 .ok(batch1.clone(), Some("prev_batch".to_owned()))
5085 .mock_once()
5086 .mount()
5087 .await;
5088 server
5089 .mock_room_threads()
5090 .match_from("prev_batch")
5091 .ok(batch2, None)
5092 .mock_once()
5093 .mount()
5094 .await;
5095
5096 let room = server.sync_joined_room(&client, room_id).await;
5097 let result =
5098 room.list_threads(ListThreadsOptions::default()).await.expect("Failed to list threads");
5099 assert_eq!(result.chunk.len(), 1);
5100 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5101 assert!(result.prev_batch_token.is_some());
5102
5103 let opts = ListThreadsOptions { from: result.prev_batch_token, ..Default::default() };
5104 let result = room.list_threads(opts).await.expect("Failed to list threads");
5105 assert_eq!(result.chunk.len(), 1);
5106 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5107 assert!(result.prev_batch_token.is_none());
5108 }
5109
5110 #[async_test]
5111 async fn test_relations() {
5112 let server = MatrixMockServer::new().await;
5113 let client = server.client_builder().build().await;
5114
5115 let room_id = room_id!("!a:b.c");
5116 let sender_id = user_id!("@alice:b.c");
5117 let f = EventFactory::new().room(room_id).sender(sender_id);
5118
5119 let target_event_id = owned_event_id!("$target");
5120 let eid1 = event_id!("$1");
5121 let eid2 = event_id!("$2");
5122 let batch1 = vec![f.text_msg("Related event 1").event_id(eid1).into_raw()];
5123 let batch2 = vec![f.text_msg("Related event 2").event_id(eid2).into_raw()];
5124
5125 server
5126 .mock_room_relations()
5127 .match_target_event(target_event_id.clone())
5128 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5129 .mock_once()
5130 .mount()
5131 .await;
5132
5133 server
5134 .mock_room_relations()
5135 .match_target_event(target_event_id.clone())
5136 .match_from("next_batch")
5137 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5138 .mock_once()
5139 .mount()
5140 .await;
5141
5142 let room = server.sync_joined_room(&client, room_id).await;
5143
5144 let mut opts = RelationsOptions {
5146 include_relations: IncludeRelations::AllRelations,
5147 ..Default::default()
5148 };
5149 let result = room
5150 .relations(target_event_id.clone(), opts.clone())
5151 .await
5152 .expect("Failed to list relations the first time");
5153 assert_eq!(result.chunk.len(), 1);
5154 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5155 assert!(result.prev_batch_token.is_none());
5156 assert!(result.next_batch_token.is_some());
5157 assert!(result.recursion_depth.is_none());
5158
5159 opts.from = result.next_batch_token;
5160 let result = room
5161 .relations(target_event_id, opts)
5162 .await
5163 .expect("Failed to list relations the second time");
5164 assert_eq!(result.chunk.len(), 1);
5165 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5166 assert!(result.prev_batch_token.is_none());
5167 assert!(result.next_batch_token.is_none());
5168 assert!(result.recursion_depth.is_none());
5169 }
5170
5171 #[async_test]
5172 async fn test_relations_with_reltype() {
5173 let server = MatrixMockServer::new().await;
5174 let client = server.client_builder().build().await;
5175
5176 let room_id = room_id!("!a:b.c");
5177 let sender_id = user_id!("@alice:b.c");
5178 let f = EventFactory::new().room(room_id).sender(sender_id);
5179
5180 let target_event_id = owned_event_id!("$target");
5181 let eid1 = event_id!("$1");
5182 let eid2 = event_id!("$2");
5183 let batch1 = vec![f.text_msg("In-thread event 1").event_id(eid1).into_raw()];
5184 let batch2 = vec![f.text_msg("In-thread event 2").event_id(eid2).into_raw()];
5185
5186 server
5187 .mock_room_relations()
5188 .match_target_event(target_event_id.clone())
5189 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5190 .ok(RoomRelationsResponseTemplate::default().events(batch1).next_batch("next_batch"))
5191 .mock_once()
5192 .mount()
5193 .await;
5194
5195 server
5196 .mock_room_relations()
5197 .match_target_event(target_event_id.clone())
5198 .match_from("next_batch")
5199 .match_subrequest(IncludeRelations::RelationsOfType(RelationType::Thread))
5200 .ok(RoomRelationsResponseTemplate::default().events(batch2))
5201 .mock_once()
5202 .mount()
5203 .await;
5204
5205 let room = server.sync_joined_room(&client, room_id).await;
5206
5207 let mut opts = RelationsOptions {
5209 include_relations: IncludeRelations::RelationsOfType(RelationType::Thread),
5210 ..Default::default()
5211 };
5212 let result = room
5213 .relations(target_event_id.clone(), opts.clone())
5214 .await
5215 .expect("Failed to list relations the first time");
5216 assert_eq!(result.chunk.len(), 1);
5217 assert_eq!(result.chunk[0].event_id().unwrap(), eid1);
5218 assert!(result.prev_batch_token.is_none());
5219 assert!(result.next_batch_token.is_some());
5220 assert!(result.recursion_depth.is_none());
5221
5222 opts.from = result.next_batch_token;
5223 let result = room
5224 .relations(target_event_id, opts)
5225 .await
5226 .expect("Failed to list relations the second time");
5227 assert_eq!(result.chunk.len(), 1);
5228 assert_eq!(result.chunk[0].event_id().unwrap(), eid2);
5229 assert!(result.prev_batch_token.is_none());
5230 assert!(result.next_batch_token.is_none());
5231 assert!(result.recursion_depth.is_none());
5232 }
5233
5234 #[async_test]
5235 async fn test_power_levels_computation() {
5236 let server = MatrixMockServer::new().await;
5237 let client = server.client_builder().build().await;
5238
5239 let room_id = room_id!("!a:b.c");
5240 let sender_id = client.user_id().expect("No session id");
5241 let f = EventFactory::new().room(room_id).sender(sender_id);
5242 let mut user_map = BTreeMap::from([(sender_id.into(), 50.into())]);
5243
5244 let room_create_event = f.create(sender_id, RoomVersionId::V1).state_key("").into_raw();
5246 let power_levels_event = f.power_levels(&mut user_map).state_key("").into_raw();
5247 let room_member_event = f.member(sender_id).into_raw();
5248
5249 let room = server
5251 .sync_room(
5252 &client,
5253 JoinedRoomBuilder::new(room_id).add_state_bulk([room_member_event.clone()]),
5254 )
5255 .await;
5256 let ctx = room
5257 .push_condition_room_ctx()
5258 .await
5259 .expect("Failed to get push condition context")
5260 .expect("Could not get push condition context");
5261
5262 assert!(ctx.power_levels.is_none());
5264
5265 let room = server
5267 .sync_room(
5268 &client,
5269 JoinedRoomBuilder::new(room_id).add_state_bulk([room_create_event.clone()]),
5270 )
5271 .await;
5272 let ctx = room
5273 .push_condition_room_ctx()
5274 .await
5275 .expect("Failed to get push condition context")
5276 .expect("Could not get push condition context");
5277
5278 assert!(ctx.power_levels.is_none());
5280
5281 let room = server
5283 .sync_room(
5284 &client,
5285 JoinedRoomBuilder::new(room_id).add_state_bulk([power_levels_event]),
5286 )
5287 .await;
5288 let ctx = room
5289 .push_condition_room_ctx()
5290 .await
5291 .expect("Failed to get push condition context")
5292 .expect("Could not get push condition context");
5293
5294 assert!(ctx.power_levels.is_some());
5296 }
5297}