1#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19 collections::{BTreeMap, BTreeSet, HashMap},
20 fmt, iter,
21 ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27#[cfg(feature = "e2e-encryption")]
28use matrix_sdk_crypto::{
29 store::DynCryptoStore, types::requests::ToDeviceRequest, CollectStrategy, DecryptionSettings,
30 EncryptionSettings, EncryptionSyncChanges, OlmError, OlmMachine, RoomEventDecryptionResult,
31 TrustRequirement,
32};
33#[cfg(feature = "e2e-encryption")]
34use ruma::events::{
35 room::{history_visibility::HistoryVisibility, message::MessageType},
36 SyncMessageLikeEvent,
37};
38#[cfg(doc)]
39use ruma::DeviceId;
40use ruma::{
41 api::client as api,
42 events::{
43 ignored_user_list::IgnoredUserListEvent,
44 marked_unread::MarkedUnreadEventContent,
45 push_rules::{PushRulesEvent, PushRulesEventContent},
46 room::{
47 member::{MembershipState, RoomMemberEventContent, SyncRoomMemberEvent},
48 power_levels::{
49 RoomPowerLevelsEvent, RoomPowerLevelsEventContent, StrippedRoomPowerLevelsEvent,
50 },
51 },
52 AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncEphemeralRoomEvent,
53 AnySyncMessageLikeEvent, AnySyncStateEvent, AnySyncTimelineEvent,
54 GlobalAccountDataEventType, StateEvent, StateEventType, SyncStateEvent,
55 },
56 push::{Action, PushConditionRoomCtx, Ruleset},
57 serde::Raw,
58 time::Instant,
59 OwnedRoomId, OwnedUserId, RoomId, RoomVersionId, UInt, UserId,
60};
61use tokio::sync::{broadcast, Mutex};
62#[cfg(feature = "e2e-encryption")]
63use tokio::sync::{RwLock, RwLockReadGuard};
64use tracing::{debug, error, info, instrument, trace, warn};
65
66#[cfg(feature = "e2e-encryption")]
67use crate::latest_event::{is_suitable_for_latest_event, LatestEvent, PossibleLatestEvent};
68#[cfg(feature = "e2e-encryption")]
69use crate::RoomMemberships;
70use crate::{
71 deserialized_responses::{DisplayName, RawAnySyncOrStrippedTimelineEvent, TimelineEvent},
72 error::{Error, Result},
73 event_cache::store::EventCacheStoreLock,
74 response_processors::AccountDataProcessor,
75 rooms::{
76 normal::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate},
77 Room, RoomInfo, RoomState,
78 },
79 store::{
80 ambiguity_map::AmbiguityCache, DynStateStore, MemoryStore, Result as StoreResult,
81 StateChanges, StateStoreDataKey, StateStoreDataValue, StateStoreExt, Store, StoreConfig,
82 },
83 sync::{JoinedRoomUpdate, LeftRoomUpdate, Notification, RoomUpdates, SyncResponse, Timeline},
84 RoomStateFilter, SessionMeta,
85};
86
87#[derive(Clone)]
92pub struct BaseClient {
93 pub(crate) store: Store,
95
96 event_cache_store: EventCacheStoreLock,
98
99 #[cfg(feature = "e2e-encryption")]
104 crypto_store: Arc<DynCryptoStore>,
105
106 #[cfg(feature = "e2e-encryption")]
110 olm_machine: Arc<RwLock<Option<OlmMachine>>>,
111
112 pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,
114
115 pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
118
119 #[cfg(feature = "e2e-encryption")]
122 pub room_key_recipient_strategy: CollectStrategy,
123
124 #[cfg(feature = "e2e-encryption")]
126 pub decryption_trust_requirement: TrustRequirement,
127
128 #[cfg(feature = "e2e-encryption")]
130 pub handle_verification_events: bool,
131}
132
133#[cfg(not(tarpaulin_include))]
134impl fmt::Debug for BaseClient {
135 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
136 f.debug_struct("BaseClient")
137 .field("session_meta", &self.store.session_meta())
138 .field("sync_token", &self.store.sync_token)
139 .finish_non_exhaustive()
140 }
141}
142
143impl BaseClient {
144 pub fn with_store_config(config: StoreConfig) -> Self {
151 let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
152 broadcast::channel(u16::MAX as usize);
153
154 BaseClient {
155 store: Store::new(config.state_store),
156 event_cache_store: config.event_cache_store,
157 #[cfg(feature = "e2e-encryption")]
158 crypto_store: config.crypto_store,
159 #[cfg(feature = "e2e-encryption")]
160 olm_machine: Default::default(),
161 ignore_user_list_changes: Default::default(),
162 room_info_notable_update_sender,
163 #[cfg(feature = "e2e-encryption")]
164 room_key_recipient_strategy: Default::default(),
165 #[cfg(feature = "e2e-encryption")]
166 decryption_trust_requirement: TrustRequirement::Untrusted,
167 #[cfg(feature = "e2e-encryption")]
168 handle_verification_events: true,
169 }
170 }
171
172 #[cfg(feature = "e2e-encryption")]
175 pub async fn clone_with_in_memory_state_store(
176 &self,
177 cross_process_store_locks_holder_name: &str,
178 handle_verification_events: bool,
179 ) -> Result<Self> {
180 let config = StoreConfig::new(cross_process_store_locks_holder_name.to_owned())
181 .state_store(MemoryStore::new());
182 let config = config.crypto_store(self.crypto_store.clone());
183
184 let copy = Self {
185 store: Store::new(config.state_store),
186 event_cache_store: config.event_cache_store,
187 crypto_store: self.crypto_store.clone(),
194 olm_machine: self.olm_machine.clone(),
195 ignore_user_list_changes: Default::default(),
196 room_info_notable_update_sender: self.room_info_notable_update_sender.clone(),
197 room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
198 decryption_trust_requirement: self.decryption_trust_requirement,
199 handle_verification_events,
200 };
201
202 if let Some(session_meta) = self.session_meta().cloned() {
203 copy.store
204 .set_session_meta(session_meta, ©.room_info_notable_update_sender)
205 .await?;
206 }
207
208 Ok(copy)
209 }
210
211 #[cfg(not(feature = "e2e-encryption"))]
214 #[allow(clippy::unused_async)]
215 pub async fn clone_with_in_memory_state_store(
216 &self,
217 cross_process_store_locks_holder: &str,
218 _handle_verification_events: bool,
219 ) -> Result<Self> {
220 let config = StoreConfig::new(cross_process_store_locks_holder.to_owned())
221 .state_store(MemoryStore::new());
222 Ok(Self::with_store_config(config))
223 }
224
225 pub fn session_meta(&self) -> Option<&SessionMeta> {
231 self.store.session_meta()
232 }
233
234 pub fn rooms(&self) -> Vec<Room> {
236 self.store.rooms()
237 }
238
239 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
241 self.store.rooms_filtered(filter)
242 }
243
244 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
247 self.store.rooms_stream()
248 }
249
250 pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
253 self.store.get_or_create_room(
254 room_id,
255 room_state,
256 self.room_info_notable_update_sender.clone(),
257 )
258 }
259
260 pub fn store(&self) -> &DynStateStore {
262 self.store.deref()
263 }
264
265 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
267 &self.event_cache_store
268 }
269
270 pub fn logged_in(&self) -> bool {
272 self.store.session_meta().is_some()
273 }
274
275 pub async fn set_session_meta(
296 &self,
297 session_meta: SessionMeta,
298 #[cfg(feature = "e2e-encryption")] custom_account: Option<
299 crate::crypto::vodozemac::olm::Account,
300 >,
301 ) -> Result<()> {
302 debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Restoring login");
303 self.store
304 .set_session_meta(session_meta.clone(), &self.room_info_notable_update_sender)
305 .await?;
306
307 #[cfg(feature = "e2e-encryption")]
308 self.regenerate_olm(custom_account).await?;
309
310 Ok(())
311 }
312
313 #[cfg(feature = "e2e-encryption")]
317 pub async fn regenerate_olm(
318 &self,
319 custom_account: Option<crate::crypto::vodozemac::olm::Account>,
320 ) -> Result<()> {
321 tracing::debug!("regenerating OlmMachine");
322 let session_meta = self.session_meta().ok_or(Error::OlmError(OlmError::MissingSession))?;
323
324 let olm_machine = OlmMachine::with_store(
327 &session_meta.user_id,
328 &session_meta.device_id,
329 self.crypto_store.clone(),
330 custom_account,
331 )
332 .await
333 .map_err(OlmError::from)?;
334
335 *self.olm_machine.write().await = Some(olm_machine);
336 Ok(())
337 }
338
339 pub async fn sync_token(&self) -> Option<String> {
342 self.store.sync_token.read().await.clone()
343 }
344
345 #[cfg(feature = "e2e-encryption")]
346 async fn handle_verification_event(
347 &self,
348 event: &AnySyncMessageLikeEvent,
349 room_id: &RoomId,
350 ) -> Result<()> {
351 if !self.handle_verification_events {
352 return Ok(());
353 }
354
355 if let Some(olm) = self.olm_machine().await.as_ref() {
356 olm.receive_verification_event(&event.clone().into_full_event(room_id.to_owned()))
357 .await?;
358 }
359
360 Ok(())
361 }
362
363 #[cfg(feature = "e2e-encryption")]
371 async fn decrypt_sync_room_event(
372 &self,
373 event: &Raw<AnySyncTimelineEvent>,
374 room_id: &RoomId,
375 ) -> Result<Option<TimelineEvent>> {
376 let olm = self.olm_machine().await;
377 let Some(olm) = olm.as_ref() else { return Ok(None) };
378
379 let decryption_settings = DecryptionSettings {
380 sender_device_trust_requirement: self.decryption_trust_requirement,
381 };
382
383 let event = match olm
384 .try_decrypt_room_event(event.cast_ref(), room_id, &decryption_settings)
385 .await?
386 {
387 RoomEventDecryptionResult::Decrypted(decrypted) => {
388 let event: TimelineEvent = decrypted.into();
389
390 if let Ok(AnySyncTimelineEvent::MessageLike(e)) = event.raw().deserialize() {
391 match &e {
392 AnySyncMessageLikeEvent::RoomMessage(SyncMessageLikeEvent::Original(
393 original_event,
394 )) => {
395 if let MessageType::VerificationRequest(_) =
396 &original_event.content.msgtype
397 {
398 self.handle_verification_event(&e, room_id).await?;
399 }
400 }
401 _ if e.event_type().to_string().starts_with("m.key.verification") => {
402 self.handle_verification_event(&e, room_id).await?;
403 }
404 _ => (),
405 }
406 }
407 event
408 }
409 RoomEventDecryptionResult::UnableToDecrypt(utd_info) => {
410 TimelineEvent::new_utd_event(event.clone(), utd_info)
411 }
412 };
413
414 Ok(Some(event))
415 }
416
417 #[allow(clippy::too_many_arguments)]
418 #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
419 pub(crate) async fn handle_timeline(
420 &self,
421 room: &Room,
422 limited: bool,
423 events: Vec<Raw<AnySyncTimelineEvent>>,
424 ignore_state_events: bool,
425 prev_batch: Option<String>,
426 push_rules: &Ruleset,
427 user_ids: &mut BTreeSet<OwnedUserId>,
428 room_info: &mut RoomInfo,
429 changes: &mut StateChanges,
430 notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
431 ambiguity_cache: &mut AmbiguityCache,
432 ) -> Result<Timeline> {
433 let mut timeline = Timeline::new(limited, prev_batch);
434 let mut push_context = self.get_push_room_context(room, room_info, changes).await?;
435
436 for raw_event in events {
437 let mut event = TimelineEvent::new(raw_event);
440
441 match event.raw().deserialize() {
442 Ok(e) => {
443 #[allow(clippy::single_match)]
444 match &e {
445 AnySyncTimelineEvent::State(s) if !ignore_state_events => {
446 match s {
447 AnySyncStateEvent::RoomMember(member) => {
448 Box::pin(ambiguity_cache.handle_event(
449 changes,
450 room.room_id(),
451 member,
452 ))
453 .await?;
454
455 match member.membership() {
456 MembershipState::Join | MembershipState::Invite => {
457 user_ids.insert(member.state_key().to_owned());
458 }
459 _ => {
460 user_ids.remove(member.state_key());
461 }
462 }
463
464 handle_room_member_event_for_profiles(
465 room.room_id(),
466 member,
467 changes,
468 );
469 }
470 _ => {
471 room_info.handle_state_event(s);
472 }
473 }
474
475 let raw_event: Raw<AnySyncStateEvent> = event.raw().clone().cast();
476 changes.add_state_event(room.room_id(), s.clone(), raw_event);
477 }
478
479 AnySyncTimelineEvent::State(_) => { }
480
481 AnySyncTimelineEvent::MessageLike(
482 AnySyncMessageLikeEvent::RoomRedaction(r),
483 ) => {
484 let room_version =
485 room_info.room_version().unwrap_or(&RoomVersionId::V1);
486
487 if let Some(redacts) = r.redacts(room_version) {
488 room_info.handle_redaction(r, event.raw().cast_ref());
489 let raw_event = event.raw().clone().cast();
490
491 changes.add_redaction(room.room_id(), redacts, raw_event);
492 }
493 }
494
495 #[cfg(feature = "e2e-encryption")]
496 AnySyncTimelineEvent::MessageLike(e) => match e {
497 AnySyncMessageLikeEvent::RoomEncrypted(
498 SyncMessageLikeEvent::Original(_),
499 ) => {
500 if let Some(e) = Box::pin(
501 self.decrypt_sync_room_event(event.raw(), room.room_id()),
502 )
503 .await?
504 {
505 event = e;
506 }
507 }
508 AnySyncMessageLikeEvent::RoomMessage(
509 SyncMessageLikeEvent::Original(original_event),
510 ) => match &original_event.content.msgtype {
511 MessageType::VerificationRequest(_) => {
512 Box::pin(self.handle_verification_event(e, room.room_id()))
513 .await?;
514 }
515 _ => (),
516 },
517 _ if e.event_type().to_string().starts_with("m.key.verification") => {
518 Box::pin(self.handle_verification_event(e, room.room_id())).await?;
519 }
520 _ => (),
521 },
522
523 #[cfg(not(feature = "e2e-encryption"))]
524 AnySyncTimelineEvent::MessageLike(_) => (),
525 }
526
527 if let Some(context) = &mut push_context {
528 self.update_push_room_context(
529 context,
530 room.own_user_id(),
531 room_info,
532 changes,
533 )
534 } else {
535 push_context = self.get_push_room_context(room, room_info, changes).await?;
536 }
537
538 if let Some(context) = &push_context {
539 let actions = push_rules.get_actions(event.raw(), context);
540
541 if actions.iter().any(Action::should_notify) {
542 notifications.entry(room.room_id().to_owned()).or_default().push(
543 Notification {
544 actions: actions.to_owned(),
545 event: RawAnySyncOrStrippedTimelineEvent::Sync(
546 event.raw().clone(),
547 ),
548 },
549 );
550 }
551 event.push_actions = Some(actions.to_owned());
552 }
553 }
554 Err(e) => {
555 warn!("Error deserializing event: {e}");
556 }
557 }
558
559 timeline.events.push(event);
560 }
561
562 Ok(timeline)
563 }
564
565 #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
577 pub(crate) async fn handle_invited_state(
578 &self,
579 room: &Room,
580 events: &[(Raw<AnyStrippedStateEvent>, AnyStrippedStateEvent)],
581 push_rules: &Ruleset,
582 room_info: &mut RoomInfo,
583 changes: &mut StateChanges,
584 notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
585 ) -> Result<()> {
586 let mut state_events = BTreeMap::new();
587
588 for (raw_event, event) in events {
589 room_info.handle_stripped_state_event(event);
590 state_events
591 .entry(event.event_type())
592 .or_insert_with(BTreeMap::new)
593 .insert(event.state_key().to_owned(), raw_event.clone());
594 }
595
596 changes.stripped_state.insert(room_info.room_id().to_owned(), state_events.clone());
597
598 if let Some(push_context) = self.get_push_room_context(room, room_info, changes).await? {
601 for event in state_events.values().flat_map(|map| map.values()) {
603 let actions = push_rules.get_actions(event, &push_context);
604 if actions.iter().any(Action::should_notify) {
605 notifications.entry(room.room_id().to_owned()).or_default().push(
606 Notification {
607 actions: actions.to_owned(),
608 event: RawAnySyncOrStrippedTimelineEvent::Stripped(event.clone()),
609 },
610 );
611 }
612 }
613 }
614
615 Ok(())
616 }
617
618 #[instrument(skip_all, fields(room_id = ?room_info.room_id))]
624 pub(crate) async fn handle_state(
625 &self,
626 raw_events: &[Raw<AnySyncStateEvent>],
627 events: &[AnySyncStateEvent],
628 room_info: &mut RoomInfo,
629 changes: &mut StateChanges,
630 ambiguity_cache: &mut AmbiguityCache,
631 ) -> StoreResult<BTreeSet<OwnedUserId>> {
632 let mut state_events = BTreeMap::new();
633 let mut user_ids = BTreeSet::new();
634
635 assert_eq!(raw_events.len(), events.len());
636
637 for (raw_event, event) in iter::zip(raw_events, events) {
638 room_info.handle_state_event(event);
639
640 if let AnySyncStateEvent::RoomMember(member) = &event {
641 ambiguity_cache.handle_event(changes, &room_info.room_id, member).await?;
642
643 match member.membership() {
644 MembershipState::Join | MembershipState::Invite => {
645 user_ids.insert(member.state_key().to_owned());
646 }
647 _ => (),
648 }
649
650 handle_room_member_event_for_profiles(&room_info.room_id, member, changes);
651 }
652
653 state_events
654 .entry(event.event_type())
655 .or_insert_with(BTreeMap::new)
656 .insert(event.state_key().to_owned(), raw_event.clone());
657 }
658
659 changes.state.insert((*room_info.room_id).to_owned(), state_events);
660
661 Ok(user_ids)
662 }
663
664 #[instrument(skip_all, fields(?room_id))]
665 pub(crate) async fn handle_room_account_data(
666 &self,
667 room_id: &RoomId,
668 events: &[Raw<AnyRoomAccountDataEvent>],
669 changes: &mut StateChanges,
670 room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
671 ) {
672 fn on_room_info<F>(
677 room_id: &RoomId,
678 changes: &mut StateChanges,
679 client: &BaseClient,
680 mut on_room_info: F,
681 ) where
682 F: FnMut(&mut RoomInfo),
683 {
684 if let Some(room_info) = changes.room_infos.get_mut(room_id) {
686 on_room_info(room_info);
688 }
689 else if let Some(room) = client.store.room(room_id) {
691 let mut room_info = room.clone_info();
693
694 on_room_info(&mut room_info);
696
697 changes.add_room(room_info);
699 }
700 }
701
702 fn on_unread_marker(
704 room_id: &RoomId,
705 content: &MarkedUnreadEventContent,
706 room_info: &mut RoomInfo,
707 room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
708 ) {
709 if room_info.base_info.is_marked_unread != content.unread {
710 room_info_notable_updates
713 .entry(room_id.to_owned())
714 .or_default()
715 .insert(RoomInfoNotableUpdateReasons::UNREAD_MARKER);
716 }
717
718 room_info.base_info.is_marked_unread = content.unread;
719 }
720
721 for raw_event in events {
723 match raw_event.deserialize() {
724 Ok(event) => {
725 changes.add_room_account_data(room_id, event.clone(), raw_event.clone());
726
727 match event {
728 AnyRoomAccountDataEvent::MarkedUnread(event) => {
729 on_room_info(room_id, changes, self, |room_info| {
730 on_unread_marker(
731 room_id,
732 &event.content,
733 room_info,
734 room_info_notable_updates,
735 );
736 });
737 }
738 AnyRoomAccountDataEvent::UnstableMarkedUnread(event) => {
739 on_room_info(room_id, changes, self, |room_info| {
740 on_unread_marker(
741 room_id,
742 &event.content.0,
743 room_info,
744 room_info_notable_updates,
745 );
746 });
747 }
748 AnyRoomAccountDataEvent::Tag(event) => {
749 on_room_info(room_id, changes, self, |room_info| {
750 room_info.base_info.handle_notable_tags(&event.content.tags);
751 });
752 }
753
754 _ => {}
756 }
757 }
758
759 Err(err) => {
760 warn!("unable to deserialize account data event: {err}");
761 }
762 }
763 }
764 }
765
766 #[cfg(feature = "e2e-encryption")]
767 #[instrument(skip_all)]
768 pub(crate) async fn preprocess_to_device_events(
769 &self,
770 encryption_sync_changes: EncryptionSyncChanges<'_>,
771 changes: &mut StateChanges,
772 room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
773 ) -> Result<Vec<Raw<ruma::events::AnyToDeviceEvent>>> {
774 if let Some(o) = self.olm_machine().await.as_ref() {
775 let (events, room_key_updates) =
780 o.receive_sync_changes(encryption_sync_changes).await?;
781
782 for room_key_update in room_key_updates {
783 if let Some(room) = self.get_room(&room_key_update.room_id) {
784 self.decrypt_latest_events(&room, changes, room_info_notable_updates).await;
785 }
786 }
787
788 Ok(events)
789 } else {
790 Ok(encryption_sync_changes.to_device_events)
794 }
795 }
796
797 #[cfg(feature = "e2e-encryption")]
802 async fn decrypt_latest_events(
803 &self,
804 room: &Room,
805 changes: &mut StateChanges,
806 room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
807 ) {
808 if let Some((found, found_index)) = self.decrypt_latest_suitable_event(room).await {
812 room.on_latest_event_decrypted(found, found_index, changes, room_info_notable_updates);
813 }
814 }
815
816 #[cfg(feature = "e2e-encryption")]
823 async fn decrypt_latest_suitable_event(
824 &self,
825 room: &Room,
826 ) -> Option<(Box<LatestEvent>, usize)> {
827 let enc_events = room.latest_encrypted_events();
828 let power_levels = room.power_levels().await.ok();
829 let power_levels_info = Some(room.own_user_id()).zip(power_levels.as_ref());
830
831 for (i, event) in enc_events.iter().enumerate().rev() {
833 let decrypt_sync_room_event =
837 Box::pin(self.decrypt_sync_room_event(event, room.room_id()));
838
839 if let Ok(Some(decrypted)) = decrypt_sync_room_event.await {
840 if let Ok(any_sync_event) = decrypted.raw().deserialize() {
842 match is_suitable_for_latest_event(&any_sync_event, power_levels_info) {
844 PossibleLatestEvent::YesRoomMessage(_)
845 | PossibleLatestEvent::YesPoll(_)
846 | PossibleLatestEvent::YesCallInvite(_)
847 | PossibleLatestEvent::YesCallNotify(_)
848 | PossibleLatestEvent::YesSticker(_)
849 | PossibleLatestEvent::YesKnockedStateEvent(_) => {
850 return Some((Box::new(LatestEvent::new(decrypted)), i));
851 }
852 _ => (),
853 }
854 }
855 }
856 }
857 None
858 }
859
860 pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
864 let room = self.store.get_or_create_room(
865 room_id,
866 RoomState::Knocked,
867 self.room_info_notable_update_sender.clone(),
868 );
869
870 if room.state() != RoomState::Knocked {
871 let _sync_lock = self.sync_lock().lock().await;
872
873 let mut room_info = room.clone_info();
874 room_info.mark_as_knocked();
875 room_info.mark_state_partially_synced();
876 room_info.mark_members_missing(); let mut changes = StateChanges::default();
878 changes.add_room(room_info.clone());
879 self.store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
881 }
882
883 Ok(room)
884 }
885
886 pub async fn room_joined(&self, room_id: &RoomId) -> Result<Room> {
890 let room = self.store.get_or_create_room(
891 room_id,
892 RoomState::Joined,
893 self.room_info_notable_update_sender.clone(),
894 );
895
896 if room.state() != RoomState::Joined {
897 let _sync_lock = self.sync_lock().lock().await;
898
899 let mut room_info = room.clone_info();
900 room_info.mark_as_joined();
901 room_info.mark_state_partially_synced();
902 room_info.mark_members_missing(); let mut changes = StateChanges::default();
904 changes.add_room(room_info.clone());
905 self.store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
907 }
908
909 Ok(room)
910 }
911
912 pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
916 let room = self.store.get_or_create_room(
917 room_id,
918 RoomState::Left,
919 self.room_info_notable_update_sender.clone(),
920 );
921
922 if room.state() != RoomState::Left {
923 let _sync_lock = self.sync_lock().lock().await;
924
925 let mut room_info = room.clone_info();
926 room_info.mark_as_left();
927 room_info.mark_state_partially_synced();
928 room_info.mark_members_missing(); let mut changes = StateChanges::default();
930 changes.add_room(room_info.clone());
931 self.store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
933 }
934
935 Ok(())
936 }
937
938 pub fn sync_lock(&self) -> &Mutex<()> {
940 self.store.sync_lock()
941 }
942
943 #[instrument(skip_all)]
949 pub async fn receive_sync_response(
950 &self,
951 response: api::sync::sync_events::v3::Response,
952 ) -> Result<SyncResponse> {
953 if self.store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
957 info!("Got the same sync response twice");
958 return Ok(SyncResponse::default());
959 }
960
961 let now = Instant::now();
962 let mut changes = Box::new(StateChanges::new(response.next_batch.clone()));
963
964 #[cfg_attr(not(feature = "e2e-encryption"), allow(unused_mut))]
965 let mut room_info_notable_updates =
966 BTreeMap::<OwnedRoomId, RoomInfoNotableUpdateReasons>::new();
967
968 #[cfg(feature = "e2e-encryption")]
969 let to_device = self
970 .preprocess_to_device_events(
971 EncryptionSyncChanges {
972 to_device_events: response.to_device.events,
973 changed_devices: &response.device_lists,
974 one_time_keys_counts: &response.device_one_time_keys_count,
975 unused_fallback_keys: response.device_unused_fallback_key_types.as_deref(),
976 next_batch_token: Some(response.next_batch.clone()),
977 },
978 &mut changes,
979 &mut room_info_notable_updates,
980 )
981 .await?;
982
983 #[cfg(not(feature = "e2e-encryption"))]
984 let to_device = response.to_device.events;
985
986 let mut ambiguity_cache = AmbiguityCache::new(self.store.inner.clone());
987
988 let account_data_processor = AccountDataProcessor::process(&response.account_data.events);
989
990 let push_rules = self.get_push_rules(&account_data_processor).await?;
991
992 let mut new_rooms = RoomUpdates::default();
993 let mut notifications = Default::default();
994
995 let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
996 BTreeMap::new();
997
998 for (room_id, new_info) in response.rooms.join {
999 let room = self.store.get_or_create_room(
1000 &room_id,
1001 RoomState::Joined,
1002 self.room_info_notable_update_sender.clone(),
1003 );
1004
1005 let mut room_info = room.clone_info();
1006
1007 room_info.mark_as_joined();
1008 room_info.update_from_ruma_summary(&new_info.summary);
1009 room_info.set_prev_batch(new_info.timeline.prev_batch.as_deref());
1010 room_info.mark_state_fully_synced();
1011
1012 let state_events = Self::deserialize_state_events(&new_info.state.events);
1013 let (raw_state_events, state_events): (Vec<_>, Vec<_>) =
1014 state_events.into_iter().unzip();
1015
1016 let mut user_ids = self
1017 .handle_state(
1018 &raw_state_events,
1019 &state_events,
1020 &mut room_info,
1021 &mut changes,
1022 &mut ambiguity_cache,
1023 )
1024 .await?;
1025
1026 updated_members_in_room.insert(room_id.to_owned(), user_ids.clone());
1027
1028 for raw in &new_info.ephemeral.events {
1029 match raw.deserialize() {
1030 Ok(AnySyncEphemeralRoomEvent::Receipt(event)) => {
1031 changes.add_receipts(&room_id, event.content);
1032 }
1033 Ok(_) => {}
1034 Err(e) => {
1035 let event_id: Option<String> = raw.get_field("event_id").ok().flatten();
1036 #[rustfmt::skip]
1037 info!(
1038 ?room_id, event_id,
1039 "Failed to deserialize ephemeral room event: {e}"
1040 );
1041 }
1042 }
1043 }
1044
1045 if new_info.timeline.limited {
1046 room_info.mark_members_missing();
1047 }
1048
1049 let timeline = self
1050 .handle_timeline(
1051 &room,
1052 new_info.timeline.limited,
1053 new_info.timeline.events,
1054 false,
1055 new_info.timeline.prev_batch,
1056 &push_rules,
1057 &mut user_ids,
1058 &mut room_info,
1059 &mut changes,
1060 &mut notifications,
1061 &mut ambiguity_cache,
1062 )
1063 .await?;
1064
1065 changes.add_room(room_info);
1067
1068 self.handle_room_account_data(
1069 &room_id,
1070 &new_info.account_data.events,
1071 &mut changes,
1072 &mut Default::default(),
1073 )
1074 .await;
1075
1076 let mut room_info = changes.room_infos.get(&room_id).unwrap().clone();
1082
1083 #[cfg(feature = "e2e-encryption")]
1084 if room_info.is_encrypted() {
1085 if let Some(o) = self.olm_machine().await.as_ref() {
1086 if !room.is_encrypted() {
1087 let user_ids =
1091 self.store.get_user_ids(&room_id, RoomMemberships::ACTIVE).await?;
1092 o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?
1093 }
1094
1095 o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?;
1096 }
1097 }
1098
1099 let notification_count = new_info.unread_notifications.into();
1100 room_info.update_notification_count(notification_count);
1101
1102 let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();
1103
1104 new_rooms.join.insert(
1105 room_id,
1106 JoinedRoomUpdate::new(
1107 timeline,
1108 new_info.state.events,
1109 new_info.account_data.events,
1110 new_info.ephemeral.events,
1111 notification_count,
1112 ambiguity_changes,
1113 ),
1114 );
1115
1116 changes.add_room(room_info);
1117 }
1118
1119 for (room_id, new_info) in response.rooms.leave {
1120 let room = self.store.get_or_create_room(
1121 &room_id,
1122 RoomState::Left,
1123 self.room_info_notable_update_sender.clone(),
1124 );
1125
1126 let mut room_info = room.clone_info();
1127 room_info.mark_as_left();
1128 room_info.mark_state_partially_synced();
1129
1130 let state_events = Self::deserialize_state_events(&new_info.state.events);
1131 let (raw_state_events, state_events): (Vec<_>, Vec<_>) =
1132 state_events.into_iter().unzip();
1133
1134 let mut user_ids = self
1135 .handle_state(
1136 &raw_state_events,
1137 &state_events,
1138 &mut room_info,
1139 &mut changes,
1140 &mut ambiguity_cache,
1141 )
1142 .await?;
1143
1144 let timeline = self
1145 .handle_timeline(
1146 &room,
1147 new_info.timeline.limited,
1148 new_info.timeline.events,
1149 false,
1150 new_info.timeline.prev_batch,
1151 &push_rules,
1152 &mut user_ids,
1153 &mut room_info,
1154 &mut changes,
1155 &mut notifications,
1156 &mut ambiguity_cache,
1157 )
1158 .await?;
1159
1160 changes.add_room(room_info);
1162
1163 self.handle_room_account_data(
1164 &room_id,
1165 &new_info.account_data.events,
1166 &mut changes,
1167 &mut Default::default(),
1168 )
1169 .await;
1170
1171 let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();
1172
1173 new_rooms.leave.insert(
1174 room_id,
1175 LeftRoomUpdate::new(
1176 timeline,
1177 new_info.state.events,
1178 new_info.account_data.events,
1179 ambiguity_changes,
1180 ),
1181 );
1182 }
1183
1184 for (room_id, new_info) in response.rooms.invite {
1185 let room = self.store.get_or_create_room(
1186 &room_id,
1187 RoomState::Invited,
1188 self.room_info_notable_update_sender.clone(),
1189 );
1190
1191 let invite_state =
1192 Self::deserialize_stripped_state_events(&new_info.invite_state.events);
1193
1194 let mut room_info = room.clone_info();
1195 room_info.mark_as_invited();
1196 room_info.mark_state_fully_synced();
1197
1198 self.handle_invited_state(
1199 &room,
1200 &invite_state,
1201 &push_rules,
1202 &mut room_info,
1203 &mut changes,
1204 &mut notifications,
1205 )
1206 .await?;
1207
1208 changes.add_room(room_info);
1209
1210 new_rooms.invite.insert(room_id, new_info);
1211 }
1212
1213 for (room_id, new_info) in response.rooms.knock {
1214 let room = self.store.get_or_create_room(
1215 &room_id,
1216 RoomState::Knocked,
1217 self.room_info_notable_update_sender.clone(),
1218 );
1219
1220 let knock_state = Self::deserialize_stripped_state_events(&new_info.knock_state.events);
1221
1222 let mut room_info = room.clone_info();
1223 room_info.mark_as_knocked();
1224 room_info.mark_state_fully_synced();
1225
1226 self.handle_invited_state(
1227 &room,
1228 &knock_state,
1229 &push_rules,
1230 &mut room_info,
1231 &mut changes,
1232 &mut notifications,
1233 )
1234 .await?;
1235
1236 changes.add_room(room_info);
1237
1238 new_rooms.knocked.insert(room_id, new_info);
1239 }
1240
1241 account_data_processor.apply(&mut changes, &self.store).await;
1242
1243 changes.presence = response
1244 .presence
1245 .events
1246 .iter()
1247 .filter_map(|e| {
1248 let event = e.deserialize().ok()?;
1249 Some((event.sender, e.clone()))
1250 })
1251 .collect();
1252
1253 changes.ambiguity_maps = ambiguity_cache.cache;
1254
1255 {
1256 let _sync_lock = self.sync_lock().lock().await;
1257 self.store.save_changes(&changes).await?;
1258 *self.store.sync_token.write().await = Some(response.next_batch.clone());
1259 self.apply_changes(&changes, room_info_notable_updates);
1260 }
1261
1262 new_rooms.update_in_memory_caches(&self.store).await;
1268
1269 for (room_id, member_ids) in updated_members_in_room {
1270 if let Some(room) = self.get_room(&room_id) {
1271 let _ =
1272 room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
1273 }
1274 }
1275
1276 info!("Processed a sync response in {:?}", now.elapsed());
1277
1278 let response = SyncResponse {
1279 rooms: new_rooms,
1280 presence: response.presence.events,
1281 account_data: response.account_data.events,
1282 to_device,
1283 notifications,
1284 };
1285
1286 Ok(response)
1287 }
1288
1289 pub(crate) fn apply_changes(
1290 &self,
1291 changes: &StateChanges,
1292 room_info_notable_updates: BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
1293 ) {
1294 if let Some(event) = changes.account_data.get(&GlobalAccountDataEventType::IgnoredUserList)
1295 {
1296 match event.deserialize_as::<IgnoredUserListEvent>() {
1297 Ok(event) => {
1298 let user_ids: Vec<String> =
1299 event.content.ignored_users.keys().map(|id| id.to_string()).collect();
1300
1301 self.ignore_user_list_changes.set(user_ids);
1302 }
1303 Err(error) => {
1304 error!("Failed to deserialize ignored user list event: {error}")
1305 }
1306 }
1307 }
1308
1309 for (room_id, room_info) in &changes.room_infos {
1310 if let Some(room) = self.store.room(room_id) {
1311 let room_info_notable_update_reasons =
1312 room_info_notable_updates.get(room_id).copied().unwrap_or_default();
1313
1314 room.set_room_info(room_info.clone(), room_info_notable_update_reasons)
1315 }
1316 }
1317 }
1318
1319 #[instrument(skip_all, fields(?room_id))]
1331 pub async fn receive_all_members(
1332 &self,
1333 room_id: &RoomId,
1334 request: &api::membership::get_member_events::v3::Request,
1335 response: &api::membership::get_member_events::v3::Response,
1336 ) -> Result<()> {
1337 if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
1338 {
1339 return Err(Error::InvalidReceiveMembersParameters);
1343 }
1344
1345 let Some(room) = self.store.room(room_id) else {
1346 return Ok(());
1348 };
1349
1350 let mut chunk = Vec::with_capacity(response.chunk.len());
1351 let mut changes = StateChanges::default();
1352
1353 #[cfg(feature = "e2e-encryption")]
1354 let mut user_ids = BTreeSet::new();
1355
1356 let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();
1357
1358 for raw_event in &response.chunk {
1359 let member = match raw_event.deserialize() {
1360 Ok(ev) => ev,
1361 Err(e) => {
1362 let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
1363 debug!(event_id, "Failed to deserialize member event: {e}");
1364 continue;
1365 }
1366 };
1367
1368 #[cfg(feature = "e2e-encryption")]
1378 match member.membership() {
1379 MembershipState::Join | MembershipState::Invite => {
1380 user_ids.insert(member.state_key().to_owned());
1381 }
1382 _ => (),
1383 }
1384
1385 if let StateEvent::Original(e) = &member {
1386 if let Some(d) = &e.content.displayname {
1387 let display_name = DisplayName::new(d);
1388 ambiguity_map
1389 .entry(display_name)
1390 .or_default()
1391 .insert(member.state_key().clone());
1392 }
1393 }
1394
1395 let sync_member: SyncRoomMemberEvent = member.clone().into();
1396 handle_room_member_event_for_profiles(room_id, &sync_member, &mut changes);
1397
1398 changes
1399 .state
1400 .entry(room_id.to_owned())
1401 .or_default()
1402 .entry(member.event_type())
1403 .or_default()
1404 .insert(member.state_key().to_string(), raw_event.clone().cast());
1405 chunk.push(member);
1406 }
1407
1408 #[cfg(feature = "e2e-encryption")]
1409 if room.is_encrypted() {
1410 if let Some(o) = self.olm_machine().await.as_ref() {
1411 o.update_tracked_users(user_ids.iter().map(Deref::deref)).await?
1412 }
1413 }
1414
1415 changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);
1416
1417 let _sync_lock = self.sync_lock().lock().await;
1418 let mut room_info = room.clone_info();
1419 room_info.mark_members_synced();
1420 changes.add_room(room_info);
1421
1422 self.store.save_changes(&changes).await?;
1423 self.apply_changes(&changes, Default::default());
1424
1425 let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);
1426
1427 Ok(())
1428 }
1429
1430 pub async fn receive_filter_upload(
1446 &self,
1447 filter_name: &str,
1448 response: &api::filter::create_filter::v3::Response,
1449 ) -> Result<()> {
1450 Ok(self
1451 .store
1452 .set_kv_data(
1453 StateStoreDataKey::Filter(filter_name),
1454 StateStoreDataValue::Filter(response.filter_id.clone()),
1455 )
1456 .await?)
1457 }
1458
1459 pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
1471 let filter = self
1472 .store
1473 .get_kv_data(StateStoreDataKey::Filter(filter_name))
1474 .await?
1475 .map(|d| d.into_filter().expect("State store data not a filter"));
1476
1477 Ok(filter)
1478 }
1479
1480 #[cfg(feature = "e2e-encryption")]
1482 pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
1483 match self.olm_machine().await.as_ref() {
1484 Some(o) => {
1485 let Some(room) = self.get_room(room_id) else {
1486 return Err(Error::InsufficientData);
1487 };
1488
1489 let history_visibility = room.history_visibility_or_default();
1490 let Some(room_encryption_event) = room.encryption_settings() else {
1491 return Err(Error::EncryptionNotEnabled);
1492 };
1493
1494 let filter = if history_visibility == HistoryVisibility::Joined {
1497 RoomMemberships::JOIN
1498 } else {
1499 RoomMemberships::ACTIVE
1500 };
1501
1502 let members = self.store.get_user_ids(room_id, filter).await?;
1503
1504 let settings = EncryptionSettings::new(
1505 room_encryption_event,
1506 history_visibility,
1507 self.room_key_recipient_strategy.clone(),
1508 );
1509
1510 Ok(o.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?)
1511 }
1512 None => panic!("Olm machine wasn't started"),
1513 }
1514 }
1515
1516 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1522 self.store.room(room_id)
1523 }
1524
1525 pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
1533 self.store.forget_room(room_id).await?;
1535
1536 self.event_cache_store().lock().await?.remove_room(room_id).await?;
1538
1539 Ok(())
1540 }
1541
1542 #[cfg(feature = "e2e-encryption")]
1544 pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
1545 self.olm_machine.read().await
1546 }
1547
1548 pub(crate) async fn get_push_rules(
1554 &self,
1555 account_data_processor: &AccountDataProcessor,
1556 ) -> Result<Ruleset> {
1557 if let Some(event) = account_data_processor
1558 .push_rules()
1559 .and_then(|ev| ev.deserialize_as::<PushRulesEvent>().ok())
1560 {
1561 Ok(event.content.global)
1562 } else if let Some(event) = self
1563 .store
1564 .get_account_data_event_static::<PushRulesEventContent>()
1565 .await?
1566 .and_then(|ev| ev.deserialize().ok())
1567 {
1568 Ok(event.content.global)
1569 } else if let Some(session_meta) = self.store.session_meta() {
1570 Ok(Ruleset::server_default(&session_meta.user_id))
1571 } else {
1572 Ok(Ruleset::new())
1573 }
1574 }
1575
1576 pub async fn get_push_room_context(
1584 &self,
1585 room: &Room,
1586 room_info: &RoomInfo,
1587 changes: &StateChanges,
1588 ) -> Result<Option<PushConditionRoomCtx>> {
1589 let room_id = room.room_id();
1590 let user_id = room.own_user_id();
1591
1592 let member_count = room_info.active_members_count();
1593
1594 let user_display_name = if let Some(AnySyncStateEvent::RoomMember(member)) =
1596 changes.state.get(room_id).and_then(|events| {
1597 events.get(&StateEventType::RoomMember)?.get(user_id.as_str())?.deserialize().ok()
1598 }) {
1599 member
1600 .as_original()
1601 .and_then(|ev| ev.content.displayname.clone())
1602 .unwrap_or_else(|| user_id.localpart().to_owned())
1603 } else if let Some(AnyStrippedStateEvent::RoomMember(member)) =
1604 changes.stripped_state.get(room_id).and_then(|events| {
1605 events.get(&StateEventType::RoomMember)?.get(user_id.as_str())?.deserialize().ok()
1606 })
1607 {
1608 member.content.displayname.unwrap_or_else(|| user_id.localpart().to_owned())
1609 } else if let Some(member) = Box::pin(room.get_member(user_id)).await? {
1610 member.name().to_owned()
1611 } else {
1612 trace!("Couldn't get push context because of missing own member information");
1613 return Ok(None);
1614 };
1615
1616 let power_levels = if let Some(event) = changes.state.get(room_id).and_then(|types| {
1617 types
1618 .get(&StateEventType::RoomPowerLevels)?
1619 .get("")?
1620 .deserialize_as::<RoomPowerLevelsEvent>()
1621 .ok()
1622 }) {
1623 Some(event.power_levels().into())
1624 } else if let Some(event) = changes.stripped_state.get(room_id).and_then(|types| {
1625 types
1626 .get(&StateEventType::RoomPowerLevels)?
1627 .get("")?
1628 .deserialize_as::<StrippedRoomPowerLevelsEvent>()
1629 .ok()
1630 }) {
1631 Some(event.power_levels().into())
1632 } else {
1633 self.store
1634 .get_state_event_static::<RoomPowerLevelsEventContent>(room_id)
1635 .await?
1636 .and_then(|e| e.deserialize().ok())
1637 .map(|event| event.power_levels().into())
1638 };
1639
1640 Ok(Some(PushConditionRoomCtx {
1641 user_id: user_id.to_owned(),
1642 room_id: room_id.to_owned(),
1643 member_count: UInt::new(member_count).unwrap_or(UInt::MAX),
1644 user_display_name,
1645 power_levels,
1646 }))
1647 }
1648
1649 pub fn update_push_room_context(
1653 &self,
1654 push_rules: &mut PushConditionRoomCtx,
1655 user_id: &UserId,
1656 room_info: &RoomInfo,
1657 changes: &StateChanges,
1658 ) {
1659 let room_id = &*room_info.room_id;
1660
1661 push_rules.member_count = UInt::new(room_info.active_members_count()).unwrap_or(UInt::MAX);
1662
1663 if let Some(AnySyncStateEvent::RoomMember(member)) =
1665 changes.state.get(room_id).and_then(|events| {
1666 events.get(&StateEventType::RoomMember)?.get(user_id.as_str())?.deserialize().ok()
1667 })
1668 {
1669 push_rules.user_display_name = member
1670 .as_original()
1671 .and_then(|ev| ev.content.displayname.clone())
1672 .unwrap_or_else(|| user_id.localpart().to_owned())
1673 }
1674
1675 if let Some(AnySyncStateEvent::RoomPowerLevels(event)) =
1676 changes.state.get(room_id).and_then(|types| {
1677 types.get(&StateEventType::RoomPowerLevels)?.get("")?.deserialize().ok()
1678 })
1679 {
1680 push_rules.power_levels = Some(event.power_levels().into());
1681 }
1682 }
1683
1684 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
1687 self.ignore_user_list_changes.subscribe()
1688 }
1689
1690 pub(crate) fn deserialize_state_events(
1691 raw_events: &[Raw<AnySyncStateEvent>],
1692 ) -> Vec<(Raw<AnySyncStateEvent>, AnySyncStateEvent)> {
1693 raw_events
1694 .iter()
1695 .filter_map(|raw_event| match raw_event.deserialize() {
1696 Ok(event) => Some((raw_event.clone(), event)),
1697 Err(e) => {
1698 warn!("Couldn't deserialize state event: {e}");
1699 None
1700 }
1701 })
1702 .collect()
1703 }
1704
1705 pub(crate) fn deserialize_stripped_state_events(
1706 raw_events: &[Raw<AnyStrippedStateEvent>],
1707 ) -> Vec<(Raw<AnyStrippedStateEvent>, AnyStrippedStateEvent)> {
1708 raw_events
1709 .iter()
1710 .filter_map(|raw_event| match raw_event.deserialize() {
1711 Ok(event) => Some((raw_event.clone(), event)),
1712 Err(e) => {
1713 warn!("Couldn't deserialize stripped state event: {e}");
1714 None
1715 }
1716 })
1717 .collect()
1718 }
1719
1720 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
1724 self.room_info_notable_update_sender.subscribe()
1725 }
1726}
1727
1728fn handle_room_member_event_for_profiles(
1729 room_id: &RoomId,
1730 event: &SyncStateEvent<RoomMemberEventContent>,
1731 changes: &mut StateChanges,
1732) {
1733 if event.state_key() == event.sender() {
1737 changes
1738 .profiles
1739 .entry(room_id.to_owned())
1740 .or_default()
1741 .insert(event.sender().to_owned(), event.into());
1742 }
1743
1744 if *event.membership() == MembershipState::Invite {
1745 changes
1752 .profiles_to_delete
1753 .entry(room_id.to_owned())
1754 .or_default()
1755 .push(event.state_key().clone());
1756 }
1757}
1758
1759#[cfg(test)]
1760mod tests {
1761 use matrix_sdk_test::{
1762 async_test, event_factory::EventFactory, ruma_response_from_json, InvitedRoomBuilder,
1763 LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder,
1764 };
1765 use ruma::{
1766 api::client as api, event_id, events::room::member::MembershipState, room_id, serde::Raw,
1767 user_id,
1768 };
1769 use serde_json::{json, value::to_raw_value};
1770
1771 use super::BaseClient;
1772 use crate::{
1773 store::{StateStoreExt, StoreConfig},
1774 test_utils::logged_in_base_client,
1775 RoomDisplayName, RoomState, SessionMeta,
1776 };
1777
1778 #[async_test]
1779 async fn test_invite_after_leaving() {
1780 let user_id = user_id!("@alice:example.org");
1781 let room_id = room_id!("!test:example.org");
1782
1783 let client = logged_in_base_client(Some(user_id)).await;
1784
1785 let mut sync_builder = SyncResponseBuilder::new();
1786
1787 let response = sync_builder
1788 .add_left_room(
1789 LeftRoomBuilder::new(room_id).add_timeline_event(
1790 EventFactory::new()
1791 .member(user_id)
1792 .membership(MembershipState::Leave)
1793 .display_name("Alice")
1794 .event_id(event_id!("$994173582443PhrSn:example.org")),
1795 ),
1796 )
1797 .build_sync_response();
1798 client.receive_sync_response(response).await.unwrap();
1799 assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
1800
1801 let response = sync_builder
1802 .add_invited_room(InvitedRoomBuilder::new(room_id).add_state_event(
1803 StrippedStateTestEvent::Custom(json!({
1804 "content": {
1805 "displayname": "Alice",
1806 "membership": "invite",
1807 },
1808 "event_id": "$143273582443PhrSn:example.org",
1809 "origin_server_ts": 1432735824653u64,
1810 "sender": "@example:example.org",
1811 "state_key": user_id,
1812 "type": "m.room.member",
1813 })),
1814 ))
1815 .build_sync_response();
1816 client.receive_sync_response(response).await.unwrap();
1817 assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
1818 }
1819
1820 #[async_test]
1821 async fn test_invite_displayname() {
1822 let user_id = user_id!("@alice:example.org");
1823 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1824
1825 let client = logged_in_base_client(Some(user_id)).await;
1826
1827 let response = ruma_response_from_json(&json!({
1828 "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1829 "device_one_time_keys_count": {
1830 "signed_curve25519": 50u64
1831 },
1832 "device_unused_fallback_key_types": [
1833 "signed_curve25519"
1834 ],
1835 "rooms": {
1836 "invite": {
1837 "!ithpyNKDtmhneaTQja:example.org": {
1838 "invite_state": {
1839 "events": [
1840 {
1841 "content": {
1842 "creator": "@test:example.org",
1843 "room_version": "9"
1844 },
1845 "sender": "@test:example.org",
1846 "state_key": "",
1847 "type": "m.room.create"
1848 },
1849 {
1850 "content": {
1851 "join_rule": "invite"
1852 },
1853 "sender": "@test:example.org",
1854 "state_key": "",
1855 "type": "m.room.join_rules"
1856 },
1857 {
1858 "content": {
1859 "algorithm": "m.megolm.v1.aes-sha2"
1860 },
1861 "sender": "@test:example.org",
1862 "state_key": "",
1863 "type": "m.room.encryption"
1864 },
1865 {
1866 "content": {
1867 "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
1868 "displayname": "Kyra",
1869 "membership": "join"
1870 },
1871 "sender": "@test:example.org",
1872 "state_key": "@test:example.org",
1873 "type": "m.room.member"
1874 },
1875 {
1876 "content": {
1877 "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
1878 "displayname": "alice",
1879 "is_direct": true,
1880 "membership": "invite"
1881 },
1882 "origin_server_ts": 1650878657984u64,
1883 "sender": "@test:example.org",
1884 "state_key": "@alice:example.org",
1885 "type": "m.room.member",
1886 "unsigned": {
1887 "age": 14u64
1888 },
1889 "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
1890 }
1891 ]
1892 }
1893 }
1894 }
1895 }
1896 }));
1897
1898 client.receive_sync_response(response).await.unwrap();
1899
1900 let room = client.get_room(room_id).expect("Room not found");
1901 assert_eq!(room.state(), RoomState::Invited);
1902 assert_eq!(
1903 room.compute_display_name().await.expect("fetching display name failed"),
1904 RoomDisplayName::Calculated("Kyra".to_owned())
1905 );
1906 }
1907
1908 #[cfg(feature = "e2e-encryption")]
1909 #[async_test]
1910 async fn test_when_there_are_no_latest_encrypted_events_decrypting_them_does_nothing() {
1911 use std::collections::BTreeMap;
1912
1913 use matrix_sdk_test::event_factory::EventFactory;
1914 use ruma::{event_id, events::room::member::MembershipState};
1915
1916 use crate::{rooms::normal::RoomInfoNotableUpdateReasons, StateChanges};
1917
1918 let user_id = user_id!("@u:u.to");
1920 let room_id = room_id!("!r:u.to");
1921 let client = logged_in_base_client(Some(user_id)).await;
1922
1923 let mut sync_builder = SyncResponseBuilder::new();
1924
1925 let response = sync_builder
1926 .add_joined_room(
1927 matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_timeline_event(
1928 EventFactory::new()
1929 .member(user_id)
1930 .display_name("Alice")
1931 .membership(MembershipState::Join)
1932 .event_id(event_id!("$1")),
1933 ),
1934 )
1935 .build_sync_response();
1936 client.receive_sync_response(response).await.unwrap();
1937
1938 let room = client.get_room(room_id).expect("Just-created room not found!");
1939
1940 assert!(room.latest_encrypted_events().is_empty());
1942 assert!(room.latest_event().is_none());
1943
1944 let mut changes = StateChanges::default();
1946 let mut room_info_notable_updates = BTreeMap::new();
1947 client.decrypt_latest_events(&room, &mut changes, &mut room_info_notable_updates).await;
1948
1949 assert!(room.latest_encrypted_events().is_empty());
1951 assert!(room.latest_event().is_none());
1952 assert!(changes.room_infos.is_empty());
1953 assert!(!room_info_notable_updates
1954 .get(room_id)
1955 .copied()
1956 .unwrap_or_default()
1957 .contains(RoomInfoNotableUpdateReasons::LATEST_EVENT));
1958 }
1959
1960 #[async_test]
1961 async fn test_deserialization_failure() {
1962 let user_id = user_id!("@alice:example.org");
1963 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1964
1965 let client = BaseClient::with_store_config(StoreConfig::new(
1966 "cross-process-store-locks-holder-name".to_owned(),
1967 ));
1968 client
1969 .set_session_meta(
1970 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1971 #[cfg(feature = "e2e-encryption")]
1972 None,
1973 )
1974 .await
1975 .unwrap();
1976
1977 let response = ruma_response_from_json(&json!({
1978 "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1979 "rooms": {
1980 "join": {
1981 "!ithpyNKDtmhneaTQja:example.org": {
1982 "state": {
1983 "events": [
1984 {
1985 "invalid": "invalid",
1986 },
1987 {
1988 "content": {
1989 "name": "The room name"
1990 },
1991 "event_id": "$143273582443PhrSn:example.org",
1992 "origin_server_ts": 1432735824653u64,
1993 "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
1994 "sender": "@example:example.org",
1995 "state_key": "",
1996 "type": "m.room.name",
1997 "unsigned": {
1998 "age": 1234
1999 }
2000 },
2001 ]
2002 }
2003 }
2004 }
2005 }
2006 }));
2007
2008 client.receive_sync_response(response).await.unwrap();
2009 client
2010 .store()
2011 .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
2012 .await
2013 .expect("Failed to fetch state event")
2014 .expect("State event not found")
2015 .deserialize()
2016 .expect("Failed to deserialize state event");
2017 }
2018
2019 #[async_test]
2020 async fn test_invited_members_arent_ignored() {
2021 let user_id = user_id!("@alice:example.org");
2022 let inviter_user_id = user_id!("@bob:example.org");
2023 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
2024
2025 let client = BaseClient::with_store_config(StoreConfig::new(
2026 "cross-process-store-locks-holder-name".to_owned(),
2027 ));
2028 client
2029 .set_session_meta(
2030 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
2031 #[cfg(feature = "e2e-encryption")]
2032 None,
2033 )
2034 .await
2035 .unwrap();
2036
2037 let mut sync_builder = SyncResponseBuilder::new();
2039 let response = sync_builder
2040 .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
2041 .build_sync_response();
2042 client.receive_sync_response(response).await.unwrap();
2043
2044 let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
2047
2048 let raw_member_event = json!({
2049 "content": {
2050 "avatar_url": "mxc://localhost/fewjilfewjil42",
2051 "displayname": "Invited Alice",
2052 "membership": "invite"
2053 },
2054 "event_id": "$151800140517rfvjc:localhost",
2055 "origin_server_ts": 151800140,
2056 "room_id": room_id,
2057 "sender": inviter_user_id,
2058 "state_key": user_id,
2059 "type": "m.room.member",
2060 "unsigned": {
2061 "age": 13374242,
2062 }
2063 });
2064 let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
2065 to_raw_value(&raw_member_event).unwrap(),
2066 )]);
2067
2068 client.receive_all_members(room_id, &request, &response).await.unwrap();
2070
2071 let room = client.get_room(room_id).unwrap();
2072
2073 let member = room.get_member(user_id).await.expect("ok").expect("exists");
2075
2076 assert_eq!(member.user_id(), user_id);
2077 assert_eq!(member.display_name().unwrap(), "Invited Alice");
2078 assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
2079 }
2080
2081 #[async_test]
2082 async fn test_reinvited_members_get_a_display_name() {
2083 let user_id = user_id!("@alice:example.org");
2084 let inviter_user_id = user_id!("@bob:example.org");
2085 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
2086
2087 let client = BaseClient::with_store_config(StoreConfig::new(
2088 "cross-process-store-locks-holder-name".to_owned(),
2089 ));
2090 client
2091 .set_session_meta(
2092 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
2093 #[cfg(feature = "e2e-encryption")]
2094 None,
2095 )
2096 .await
2097 .unwrap();
2098
2099 let mut sync_builder = SyncResponseBuilder::new();
2101 let response = sync_builder
2102 .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_state_event(
2103 StateTestEvent::Custom(json!({
2104 "content": {
2105 "avatar_url": null,
2106 "displayname": null,
2107 "membership": "leave"
2108 },
2109 "event_id": "$151803140217rkvjc:localhost",
2110 "origin_server_ts": 151800139,
2111 "room_id": room_id,
2112 "sender": user_id,
2113 "state_key": user_id,
2114 "type": "m.room.member",
2115 })),
2116 ))
2117 .build_sync_response();
2118 client.receive_sync_response(response).await.unwrap();
2119
2120 let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
2122
2123 let raw_member_event = json!({
2124 "content": {
2125 "avatar_url": "mxc://localhost/fewjilfewjil42",
2126 "displayname": "Invited Alice",
2127 "membership": "invite"
2128 },
2129 "event_id": "$151800140517rfvjc:localhost",
2130 "origin_server_ts": 151800140,
2131 "room_id": room_id,
2132 "sender": inviter_user_id,
2133 "state_key": user_id,
2134 "type": "m.room.member",
2135 "unsigned": {
2136 "age": 13374242,
2137 }
2138 });
2139 let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
2140 to_raw_value(&raw_member_event).unwrap(),
2141 )]);
2142
2143 client.receive_all_members(room_id, &request, &response).await.unwrap();
2145
2146 let room = client.get_room(room_id).unwrap();
2147
2148 let member = room.get_member(user_id).await.expect("ok").expect("exists");
2150
2151 assert_eq!(member.user_id(), user_id);
2152 assert_eq!(member.display_name().unwrap(), "Invited Alice");
2153 assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
2154 }
2155}