1#![allow(clippy::assign_op_pattern)] mod call;
18mod create;
19mod display_name;
20mod encryption;
21mod knock;
22mod latest_event;
23mod members;
24mod room_info;
25mod state;
26mod tags;
27mod tombstone;
28
29use std::collections::{BTreeMap, BTreeSet, HashSet};
30
31pub use call::CallIntentConsensus;
32pub use create::*;
33pub use display_name::{RoomDisplayName, RoomHero};
34pub(crate) use display_name::{RoomSummary, UpdatedRoomDisplayName};
35pub use encryption::EncryptionState;
36use eyeball::{AsyncLock, SharedObservable};
37use futures_util::{Stream, StreamExt};
38pub use members::{RoomMember, RoomMembersUpdate, RoomMemberships};
39pub(crate) use room_info::SyncInfo;
40pub use room_info::{
41 BaseRoomInfo, RoomInfo, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomRecencyStamp,
42 apply_redaction,
43};
44use ruma::{
45 EventId, OwnedEventId, OwnedMxcUri, OwnedRoomAliasId, OwnedRoomId, OwnedUserId, RoomId,
46 RoomVersionId, UserId,
47 events::{
48 direct::OwnedDirectUserIdentifier,
49 receipt::{Receipt, ReceiptThread, ReceiptType},
50 room::{
51 avatar,
52 guest_access::GuestAccess,
53 history_visibility::HistoryVisibility,
54 join_rules::JoinRule,
55 member::MembershipState,
56 power_levels::{RoomPowerLevels, RoomPowerLevelsEventContent, RoomPowerLevelsSource},
57 },
58 },
59 room::RoomType,
60};
61use serde::{Deserialize, Serialize};
62pub use state::{RoomState, RoomStateFilter};
63pub(crate) use tags::RoomNotableTags;
64use tokio::sync::broadcast;
65pub use tombstone::{PredecessorRoom, SuccessorRoom};
66use tracing::{info, instrument, trace, warn};
67
68use crate::{
69 DmRoomDefinition, Error, StateStore,
70 deserialized_responses::MemberEvent,
71 notification_settings::RoomNotificationMode,
72 read_receipts::RoomReadReceipts,
73 store::{Result as StoreResult, SaveLockedStateStore, StateStoreExt},
74 sync::UnreadNotificationsCount,
75};
76
/// Handle to a single room: its identifiers, its observable [`RoomInfo`], the
/// backing state store and the broadcast channels used to announce updates.
///
/// The type derives `Clone`, so a handle can be duplicated field by field.
#[derive(Debug, Clone)]
pub struct Room {
    /// Identifier of the room; copied from the `RoomInfo` when the handle is
    /// built (see `Room::restore`).
    pub(super) room_id: OwnedRoomId,

    /// User id of the "own" user this handle was created for.
    // NOTE(review): presumably the logged-in account — confirm against callers.
    pub(super) own_user_id: OwnedUserId,

    /// Observable room info; most getters on `Room` read from it.
    pub(super) info: SharedObservable<RoomInfo>,

    /// Broadcast sender for [`RoomInfoNotableUpdate`]s, handed in by whoever
    /// constructs the room.
    pub(super) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,

    /// State store used to load state events, members and receipts.
    pub(super) store: SaveLockedStateStore,

    /// Observable map from event id to user id; starts out as `None` when the
    /// handle is built.
    // NOTE(review): presumably knock-request event id -> knocking user for
    // requests already marked as seen — confirm in the `knock` module.
    pub seen_knock_request_ids_map:
        SharedObservable<Option<BTreeMap<OwnedEventId, OwnedUserId>>, AsyncLock>,

    /// Broadcast sender for [`RoomMembersUpdate`]s; the channel is created in
    /// `Room::restore` with a capacity of 10.
    pub room_member_updates_sender: broadcast::Sender<RoomMembersUpdate>,
}
106
107impl Room {
108 pub(crate) fn new(
109 own_user_id: &UserId,
110 store: SaveLockedStateStore,
111 room_id: &RoomId,
112 room_state: RoomState,
113 room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
114 ) -> Self {
115 let room_info = RoomInfo::new(room_id, room_state);
116 Self::restore(own_user_id, store, room_info, room_info_notable_update_sender)
117 }
118
119 pub(crate) fn restore(
120 own_user_id: &UserId,
121 store: SaveLockedStateStore,
122 room_info: RoomInfo,
123 room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
124 ) -> Self {
125 let (room_member_updates_sender, _) = broadcast::channel(10);
126 Self {
127 own_user_id: own_user_id.into(),
128 room_id: room_info.room_id.clone(),
129 store,
130 info: SharedObservable::new(room_info),
131 room_info_notable_update_sender,
132 seen_knock_request_ids_map: SharedObservable::new_async(None),
133 room_member_updates_sender,
134 }
135 }
136
137 pub fn room_id(&self) -> &RoomId {
139 &self.room_id
140 }
141
142 pub fn creators(&self) -> Option<Vec<OwnedUserId>> {
144 self.info.read().creators()
145 }
146
147 pub fn own_user_id(&self) -> &UserId {
149 &self.own_user_id
150 }
151
152 pub fn is_space(&self) -> bool {
154 self.info.read().room_type().is_some_and(|t| *t == RoomType::Space)
155 }
156
157 pub fn is_call(&self) -> bool {
161 self.info.read().room_type().is_some_and(|t| *t == RoomType::Call)
162 }
163
164 pub fn room_type(&self) -> Option<RoomType> {
167 self.info.read().room_type().map(ToOwned::to_owned)
168 }
169
170 pub fn unread_notification_counts(&self) -> UnreadNotificationsCount {
179 self.info.read().notification_counts
180 }
181
182 pub fn num_unread_messages(&self) -> u64 {
187 self.info.read().read_receipts.num_unread
188 }
189
190 pub fn num_unread_notifications(&self) -> u64 {
195 self.info.read().read_receipts.num_notifications
196 }
197
198 pub fn num_unread_mentions(&self) -> u64 {
204 self.info.read().read_receipts.num_mentions
205 }
206
207 pub fn read_receipts(&self) -> RoomReadReceipts {
209 self.info.read().read_receipts.clone()
210 }
211
212 pub fn is_state_fully_synced(&self) -> bool {
220 self.info.read().sync_info == SyncInfo::FullySynced
221 }
222
223 pub fn is_state_partially_or_fully_synced(&self) -> bool {
227 self.info.read().sync_info != SyncInfo::NoState
228 }
229
230 pub fn last_prev_batch(&self) -> Option<String> {
233 self.info.read().last_prev_batch.clone()
234 }
235
236 pub fn avatar_url(&self) -> Option<OwnedMxcUri> {
238 self.info.read().avatar_url().map(ToOwned::to_owned)
239 }
240
241 pub fn avatar_info(&self) -> Option<avatar::ImageInfo> {
243 self.info.read().avatar_info().map(ToOwned::to_owned)
244 }
245
246 pub fn canonical_alias(&self) -> Option<OwnedRoomAliasId> {
248 self.info.read().canonical_alias().map(ToOwned::to_owned)
249 }
250
251 pub fn alt_aliases(&self) -> Vec<OwnedRoomAliasId> {
253 self.info.read().alt_aliases().to_owned()
254 }
255
256 pub fn create_content(&self) -> Option<RoomCreateWithCreatorEventContent> {
266 Some(self.info.read().base_info.create.as_ref()?.content.clone())
267 }
268
269 #[instrument(skip_all, fields(room_id = ?self.room_id))]
273 pub async fn is_direct(&self) -> StoreResult<bool> {
274 match self.state() {
275 RoomState::Joined | RoomState::Left | RoomState::Banned => {
276 Ok(!self.info.read().base_info.dm_targets.is_empty())
277 }
278
279 RoomState::Invited => {
280 let member = self.get_member(self.own_user_id()).await?;
281
282 match member {
283 None => {
284 info!("RoomMember not found for the user's own id");
285 Ok(false)
286 }
287 Some(member) => match member.event.as_ref() {
288 MemberEvent::Sync(_) => {
289 warn!("Got MemberEvent::Sync in an invited room");
290 Ok(false)
291 }
292 MemberEvent::Stripped(event) => {
293 Ok(event.content.is_direct.unwrap_or(false))
294 }
295 },
296 }
297 }
298
299 RoomState::Knocked => Ok(false),
301 }
302 }
303
304 pub async fn compute_is_dm(&self, dm_room_definition: &DmRoomDefinition) -> StoreResult<bool> {
307 let is_direct = self.is_direct().await?;
308
309 match *dm_room_definition {
310 DmRoomDefinition::MatrixSpec => Ok(is_direct),
311 DmRoomDefinition::TwoMembers => {
312 if !is_direct {
313 return Ok(false);
314 }
315 let active_service_member_count =
316 self.update_active_service_members().await?.unwrap_or_default().len() as u64;
317 let has_at_most_two_members =
318 self.active_members_count().saturating_sub(active_service_member_count) <= 2;
319 Ok(has_at_most_two_members)
320 }
321 }
322 }
323
324 pub fn direct_targets(&self) -> HashSet<OwnedDirectUserIdentifier> {
333 self.info.read().base_info.dm_targets.clone()
334 }
335
336 pub fn direct_targets_length(&self) -> usize {
339 self.info.read().base_info.dm_targets.len()
340 }
341
342 pub fn guest_access(&self) -> GuestAccess {
344 self.info.read().guest_access().clone()
345 }
346
347 pub fn history_visibility(&self) -> Option<HistoryVisibility> {
349 self.info.read().history_visibility().cloned()
350 }
351
352 pub fn history_visibility_or_default(&self) -> HistoryVisibility {
355 self.info.read().history_visibility_or_default().clone()
356 }
357
358 pub fn is_public(&self) -> Option<bool> {
362 self.info.read().join_rule().map(|join_rule| matches!(join_rule, JoinRule::Public))
363 }
364
365 pub fn join_rule(&self) -> Option<JoinRule> {
367 self.info.read().join_rule().cloned()
368 }
369
370 pub fn max_power_level(&self) -> i64 {
375 self.info.read().base_info.max_power_level
376 }
377
378 pub fn service_members(&self) -> Option<BTreeSet<OwnedUserId>> {
380 self.info.read().service_members().cloned()
381 }
382
383 pub async fn power_levels(&self) -> Result<RoomPowerLevels, Error> {
385 let power_levels_content = self
386 .store
387 .get_state_event_static::<RoomPowerLevelsEventContent>(self.room_id())
388 .await?
389 .ok_or(Error::InsufficientData)?
390 .deserialize()?;
391 let creators = self.creators().ok_or(Error::InsufficientData)?;
392 let rules = self.info.read().room_version_rules_or_default();
393
394 Ok(power_levels_content.power_levels(&rules.authorization, creators))
395 }
396
397 pub async fn power_levels_or_default(&self) -> RoomPowerLevels {
400 if let Ok(power_levels) = self.power_levels().await {
401 return power_levels;
402 }
403
404 let rules = self.info.read().room_version_rules_or_default();
406 RoomPowerLevels::new(
407 RoomPowerLevelsSource::None,
408 &rules.authorization,
409 self.creators().into_iter().flatten(),
410 )
411 }
412
413 pub fn name(&self) -> Option<String> {
418 self.info.read().name().map(ToOwned::to_owned)
419 }
420
421 pub fn topic(&self) -> Option<String> {
423 self.info.read().topic().map(ToOwned::to_owned)
424 }
425
426 pub fn update_cached_user_defined_notification_mode(&self, mode: RoomNotificationMode) {
432 self.info.update_if(|info| {
433 if info.cached_user_defined_notification_mode.as_ref() != Some(&mode) {
434 info.cached_user_defined_notification_mode = Some(mode);
435
436 true
437 } else {
438 false
439 }
440 });
441 }
442
443 pub fn cached_user_defined_notification_mode(&self) -> Option<RoomNotificationMode> {
448 self.info.read().cached_user_defined_notification_mode
449 }
450
451 pub fn clear_user_defined_notification_mode(&self) {
454 self.info.update_if(|info| {
455 if info.cached_user_defined_notification_mode.is_some() {
456 info.cached_user_defined_notification_mode = None;
457 true
458 } else {
459 false
460 }
461 })
462 }
463
464 pub async fn joined_user_ids(&self) -> StoreResult<Vec<OwnedUserId>> {
467 self.store.get_user_ids(self.room_id(), RoomMemberships::JOIN).await
468 }
469
470 pub fn heroes(&self) -> Vec<RoomHero> {
475 let guard = self.info.read();
476 let heroes = guard.heroes();
477
478 if let Some(service_members) = guard.service_members() {
479 heroes.iter().filter(|hero| !service_members.contains(&hero.user_id)).cloned().collect()
480 } else {
481 heroes.to_vec()
482 }
483 }
484
485 pub async fn load_user_receipt(
488 &self,
489 receipt_type: ReceiptType,
490 thread: ReceiptThread,
491 user_id: &UserId,
492 ) -> StoreResult<Option<(OwnedEventId, Receipt)>> {
493 self.store.get_user_room_receipt_event(self.room_id(), receipt_type, thread, user_id).await
494 }
495
496 pub async fn load_event_receipts(
500 &self,
501 receipt_type: ReceiptType,
502 thread: ReceiptThread,
503 event_id: &EventId,
504 ) -> StoreResult<Vec<(OwnedUserId, Receipt)>> {
505 self.store
506 .get_event_room_receipt_events(self.room_id(), receipt_type, thread, event_id)
507 .await
508 }
509
510 pub fn is_marked_unread(&self) -> bool {
513 self.info.read().base_info.is_marked_unread
514 }
515
516 pub fn fully_read_event_id(&self) -> Option<OwnedEventId> {
519 self.info.read().fully_read_event_id().map(ToOwned::to_owned)
520 }
521
522 pub fn version(&self) -> Option<RoomVersionId> {
524 self.info.read().room_version().cloned()
525 }
526
527 pub fn recency_stamp(&self) -> Option<RoomRecencyStamp> {
531 self.info.read().recency_stamp
532 }
533
534 pub fn pinned_event_ids_stream(&self) -> impl Stream<Item = Vec<OwnedEventId>> + use<> {
537 self.info
538 .subscribe()
539 .map(|i| i.base_info.pinned_events.and_then(|c| c.pinned).unwrap_or_default())
540 }
541
542 pub fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>> {
544 self.info.read().pinned_event_ids()
545 }
546
547 pub async fn update_active_service_members(&self) -> StoreResult<Option<Vec<RoomMember>>> {
551 if let Some(service_members) = self.service_members() {
552 let mut found = Vec::new();
553 for user_id in service_members {
554 match self.get_member(&user_id).await {
555 Ok(Some(member)) => {
556 if matches!(
558 member.membership(),
559 MembershipState::Join | MembershipState::Invite
560 ) {
561 found.push(member);
562 }
563 }
564 Ok(None) => (),
565 Err(error) => return Err(error),
566 }
567 }
568
569 trace!("Updating active service members ({}) in room {}", found.len(), self.room_id());
570
571 let new_active_service_member_count = found.len() as u64;
572 let current_active_service_member_count =
573 self.info.read().summary.active_service_members.unwrap_or_default();
574 if new_active_service_member_count != current_active_service_member_count {
575 self.update_and_save_room_info(|mut info| {
576 info.update_active_service_member_count(Some(new_active_service_member_count));
577 (info, RoomInfoNotableUpdateReasons::ACTIVE_SERVICE_MEMBERS)
578 })
579 .await?;
580 }
581
582 Ok(Some(found))
583 } else {
584 if self.info.read().summary.active_service_members.is_some() {
585 self.update_and_save_room_info(|mut info| {
586 info.update_active_service_member_count(None);
587 (info, RoomInfoNotableUpdateReasons::ACTIVE_SERVICE_MEMBERS)
588 })
589 .await?;
590 }
591 Ok(None)
592 }
593 }
594
595 #[instrument(skip_all, fields(room_id = ?self.room_id))]
599 pub async fn compute_joined_service_members(&self) -> StoreResult<Option<Vec<RoomMember>>> {
600 if !self.are_members_synced() {
601 trace!("Tried to compute joined service members in a room that is not synced");
602 return Ok(None);
603 }
604 if let Some(service_member_ids) = self.service_members() {
605 let mut ret = vec![];
606 for user_id in service_member_ids.iter() {
607 if let Some(member) = self.get_member(user_id).await.unwrap()
608 && matches!(member.membership(), MembershipState::Join)
609 {
610 trace!("Found a joined service member ({})", user_id);
611 ret.push(member);
612 } else {
613 trace!("Did not find a joined service member ({})", user_id);
614 }
615 }
616 trace!(
617 "Computed joined service members ({}) for service member count {}",
618 ret.len(),
619 service_member_ids.len()
620 );
621 Ok(Some(ret))
622 } else {
623 trace!("Tried to compute joined service members in a room that has no service members",);
624 Ok(None)
625 }
626 }
627
628 pub fn active_service_members_count(&self) -> Option<u64> {
631 self.info.read().summary.active_service_members
632 }
633}
634
// These manual impls are compiled only when the `test-send-sync` feature is
// disabled. With the feature enabled they are left out, so that
// `test_send_sync_for_room` checks the bounds `Room` gets on its own.
//
// SAFETY: NOTE(review): no justification is visible in this file. These impls
// assert, without compiler verification, that every field of `Room` may be
// sent to and shared between threads — confirm against the field types.
#[cfg(not(feature = "test-send-sync"))]
unsafe impl Send for Room {}

// SAFETY: NOTE(review): same unverified assertion as for `Send` above, here
// for sharing `&Room` between threads — confirm.
#[cfg(not(feature = "test-send-sync"))]
unsafe impl Sync for Room {}
642
/// Compile-time check, enabled by the `test-send-sync` feature, that `Room`
/// satisfies `SendOutsideWasm` and `SyncOutsideWasm` without the manual
/// `unsafe impl`s (which are disabled under that feature).
#[cfg(feature = "test-send-sync")]
#[test]
fn test_send_sync_for_room() {
    // Only type-checks; the body does nothing at runtime.
    fn assert_send_sync<T>()
    where
        T: matrix_sdk_common::SendOutsideWasm + matrix_sdk_common::SyncOutsideWasm,
    {
    }

    assert_send_sync::<Room>();
}
654
/// Serializable marker with two variants, `Stable` and `Unstable`; the
/// default is `Unstable`.
// NOTE(review): presumably records whether a piece of account data came from
// the stable or the unstable-prefixed event type — confirm against its users.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) enum AccountDataSource {
    /// The stable source.
    Stable,

    /// The unstable source; this is the `Default` value.
    #[default]
    Unstable,
}
665
#[cfg(test)]
mod tests {
    use matrix_sdk_test::{
        JoinedRoomBuilder, SyncResponseBuilder, async_test, event_factory::EventFactory,
    };
    use ruma::{room_id, user_id};
    use serde_json::json;

    use super::*;
    use crate::test_utils::logged_in_base_client;

    /// `Room::heroes` must drop heroes that the member-hints state event
    /// lists as service members.
    #[async_test]
    async fn test_room_heroes_filters_out_service_members() {
        let room_id = room_id!("!room:example.org");
        let alice_id = user_id!("@alice:example.org");
        let service_member_id = user_id!("@service:example.org");

        let client = logged_in_base_client(None).await;
        let own_user_id = &client.session_meta().unwrap().user_id;
        let room = client.get_or_create_room(room_id, RoomState::Joined);

        // Summary naming both Alice and the service member as heroes.
        let summary = json!({
            "m.joined_member_count": 3,
            "m.invited_member_count": 0,
            "m.heroes": [alice_id.to_owned(), service_member_id.to_owned()],
        });

        // State event marking the service member as such.
        let member_hints = EventFactory::new()
            .sender(own_user_id)
            .member_hints(BTreeSet::from([service_member_id.to_owned()]));

        let joined_room =
            JoinedRoomBuilder::new(room_id).set_room_summary(summary).add_state_event(member_hints);

        let mut sync_builder = SyncResponseBuilder::new();
        let response = sync_builder.add_joined_room(joined_room).build_sync_response();

        client.receive_sync_response(response).await.unwrap();

        // Only Alice is left once the service member is filtered out.
        let visible_heroes = room.heroes();
        assert_eq!(visible_heroes.len(), 1);
        assert_eq!(visible_heroes[0].user_id, alice_id);
    }
}