1#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19 collections::{BTreeMap, BTreeSet, HashMap},
20 fmt,
21 ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27#[cfg(feature = "e2e-encryption")]
28use matrix_sdk_crypto::{
29 store::DynCryptoStore, types::requests::ToDeviceRequest, CollectStrategy, EncryptionSettings,
30 OlmError, OlmMachine, TrustRequirement,
31};
32#[cfg(feature = "e2e-encryption")]
33use ruma::events::room::{history_visibility::HistoryVisibility, member::MembershipState};
34#[cfg(doc)]
35use ruma::DeviceId;
36use ruma::{
37 api::client::{self as api, sync::sync_events::v5},
38 events::{
39 push_rules::{PushRulesEvent, PushRulesEventContent},
40 room::member::SyncRoomMemberEvent,
41 StateEvent, StateEventType,
42 },
43 push::Ruleset,
44 time::Instant,
45 OwnedRoomId, OwnedUserId, RoomId,
46};
47use tokio::sync::{broadcast, Mutex};
48#[cfg(feature = "e2e-encryption")]
49use tokio::sync::{RwLock, RwLockReadGuard};
50use tracing::{debug, info, instrument};
51
52#[cfg(feature = "e2e-encryption")]
53use crate::RoomMemberships;
54use crate::{
55 deserialized_responses::DisplayName,
56 error::{Error, Result},
57 event_cache::store::EventCacheStoreLock,
58 response_processors::{self as processors, Context},
59 rooms::{
60 normal::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate},
61 Room, RoomState,
62 },
63 store::{
64 ambiguity_map::AmbiguityCache, BaseStateStore, DynStateStore, MemoryStore,
65 Result as StoreResult, RoomLoadSettings, StateChanges, StateStoreDataKey,
66 StateStoreDataValue, StateStoreExt, StoreConfig,
67 },
68 sync::{JoinedRoomUpdate, LeftRoomUpdate, RoomUpdates, SyncResponse},
69 RoomStateFilter, SessionMeta,
70};
71
72#[derive(Clone)]
86pub struct BaseClient {
87 pub(crate) state_store: BaseStateStore,
89
90 event_cache_store: EventCacheStoreLock,
92
93 #[cfg(feature = "e2e-encryption")]
98 crypto_store: Arc<DynCryptoStore>,
99
100 #[cfg(feature = "e2e-encryption")]
104 olm_machine: Arc<RwLock<Option<OlmMachine>>>,
105
106 pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,
108
109 pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
112
113 #[cfg(feature = "e2e-encryption")]
116 pub room_key_recipient_strategy: CollectStrategy,
117
118 #[cfg(feature = "e2e-encryption")]
120 pub decryption_trust_requirement: TrustRequirement,
121
122 #[cfg(feature = "e2e-encryption")]
124 pub handle_verification_events: bool,
125}
126
127#[cfg(not(tarpaulin_include))]
128impl fmt::Debug for BaseClient {
129 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
130 f.debug_struct("BaseClient")
131 .field("session_meta", &self.state_store.session_meta())
132 .field("sync_token", &self.state_store.sync_token)
133 .finish_non_exhaustive()
134 }
135}
136
137impl BaseClient {
138 pub fn new(config: StoreConfig) -> Self {
145 let store = BaseStateStore::new(config.state_store);
146
147 let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
157 broadcast::channel(500);
158
159 BaseClient {
160 state_store: store,
161 event_cache_store: config.event_cache_store,
162 #[cfg(feature = "e2e-encryption")]
163 crypto_store: config.crypto_store,
164 #[cfg(feature = "e2e-encryption")]
165 olm_machine: Default::default(),
166 ignore_user_list_changes: Default::default(),
167 room_info_notable_update_sender,
168 #[cfg(feature = "e2e-encryption")]
169 room_key_recipient_strategy: Default::default(),
170 #[cfg(feature = "e2e-encryption")]
171 decryption_trust_requirement: TrustRequirement::Untrusted,
172 #[cfg(feature = "e2e-encryption")]
173 handle_verification_events: true,
174 }
175 }
176
177 #[cfg(feature = "e2e-encryption")]
180 pub async fn clone_with_in_memory_state_store(
181 &self,
182 cross_process_store_locks_holder_name: &str,
183 handle_verification_events: bool,
184 ) -> Result<Self> {
185 let config = StoreConfig::new(cross_process_store_locks_holder_name.to_owned())
186 .state_store(MemoryStore::new());
187 let config = config.crypto_store(self.crypto_store.clone());
188
189 let copy = Self {
190 state_store: BaseStateStore::new(config.state_store),
191 event_cache_store: config.event_cache_store,
192 crypto_store: self.crypto_store.clone(),
199 olm_machine: self.olm_machine.clone(),
200 ignore_user_list_changes: Default::default(),
201 room_info_notable_update_sender: self.room_info_notable_update_sender.clone(),
202 room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
203 decryption_trust_requirement: self.decryption_trust_requirement,
204 handle_verification_events,
205 };
206
207 copy.state_store
208 .derive_from_other(&self.state_store, ©.room_info_notable_update_sender)
209 .await?;
210
211 Ok(copy)
212 }
213
214 #[cfg(not(feature = "e2e-encryption"))]
217 #[allow(clippy::unused_async)]
218 pub async fn clone_with_in_memory_state_store(
219 &self,
220 cross_process_store_locks_holder: &str,
221 _handle_verification_events: bool,
222 ) -> Result<Self> {
223 let config = StoreConfig::new(cross_process_store_locks_holder.to_owned())
224 .state_store(MemoryStore::new());
225 Ok(Self::new(config))
226 }
227
228 pub fn session_meta(&self) -> Option<&SessionMeta> {
234 self.state_store.session_meta()
235 }
236
237 pub fn rooms(&self) -> Vec<Room> {
239 self.state_store.rooms()
240 }
241
242 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
244 self.state_store.rooms_filtered(filter)
245 }
246
247 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
250 self.state_store.rooms_stream()
251 }
252
253 pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
256 self.state_store.get_or_create_room(
257 room_id,
258 room_state,
259 self.room_info_notable_update_sender.clone(),
260 )
261 }
262
263 pub fn state_store(&self) -> &DynStateStore {
265 self.state_store.deref()
266 }
267
268 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
270 &self.event_cache_store
271 }
272
273 pub fn is_active(&self) -> bool {
277 self.state_store.session_meta().is_some()
278 }
279
280 pub async fn activate(
312 &self,
313 session_meta: SessionMeta,
314 room_load_settings: RoomLoadSettings,
315 #[cfg(feature = "e2e-encryption")] custom_account: Option<
316 crate::crypto::vodozemac::olm::Account,
317 >,
318 ) -> Result<()> {
319 debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Activating the client");
320
321 self.state_store
322 .load_rooms(
323 &session_meta.user_id,
324 room_load_settings,
325 &self.room_info_notable_update_sender,
326 )
327 .await?;
328 self.state_store.load_sync_token().await?;
329 self.state_store.set_session_meta(session_meta);
330
331 #[cfg(feature = "e2e-encryption")]
332 self.regenerate_olm(custom_account).await?;
333
334 Ok(())
335 }
336
337 #[cfg(feature = "e2e-encryption")]
341 pub async fn regenerate_olm(
342 &self,
343 custom_account: Option<crate::crypto::vodozemac::olm::Account>,
344 ) -> Result<()> {
345 tracing::debug!("regenerating OlmMachine");
346 let session_meta = self.session_meta().ok_or(Error::OlmError(OlmError::MissingSession))?;
347
348 let olm_machine = OlmMachine::with_store(
351 &session_meta.user_id,
352 &session_meta.device_id,
353 self.crypto_store.clone(),
354 custom_account,
355 )
356 .await
357 .map_err(OlmError::from)?;
358
359 *self.olm_machine.write().await = Some(olm_machine);
360 Ok(())
361 }
362
363 pub async fn sync_token(&self) -> Option<String> {
366 self.state_store.sync_token.read().await.clone()
367 }
368
369 pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
373 let room = self.state_store.get_or_create_room(
374 room_id,
375 RoomState::Knocked,
376 self.room_info_notable_update_sender.clone(),
377 );
378
379 if room.state() != RoomState::Knocked {
380 let _sync_lock = self.sync_lock().lock().await;
381
382 let mut room_info = room.clone_info();
383 room_info.mark_as_knocked();
384 room_info.mark_state_partially_synced();
385 room_info.mark_members_missing(); let mut changes = StateChanges::default();
387 changes.add_room(room_info.clone());
388 self.state_store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
390 }
391
392 Ok(room)
393 }
394
395 pub async fn room_joined(&self, room_id: &RoomId) -> Result<Room> {
399 let room = self.state_store.get_or_create_room(
400 room_id,
401 RoomState::Joined,
402 self.room_info_notable_update_sender.clone(),
403 );
404
405 if room.state() != RoomState::Joined {
406 let _sync_lock = self.sync_lock().lock().await;
407
408 let mut room_info = room.clone_info();
409 room_info.mark_as_joined();
410 room_info.mark_state_partially_synced();
411 room_info.mark_members_missing(); let mut changes = StateChanges::default();
413 changes.add_room(room_info.clone());
414 self.state_store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
416 }
417
418 Ok(room)
419 }
420
421 pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
425 let room = self.state_store.get_or_create_room(
426 room_id,
427 RoomState::Left,
428 self.room_info_notable_update_sender.clone(),
429 );
430
431 if room.state() != RoomState::Left {
432 let _sync_lock = self.sync_lock().lock().await;
433
434 let mut room_info = room.clone_info();
435 room_info.mark_as_left();
436 room_info.mark_state_partially_synced();
437 room_info.mark_members_missing(); let mut changes = StateChanges::default();
439 changes.add_room(room_info.clone());
440 self.state_store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
442 }
443
444 Ok(())
445 }
446
447 pub fn sync_lock(&self) -> &Mutex<()> {
449 self.state_store.sync_lock()
450 }
451
452 #[instrument(skip_all)]
458 pub async fn receive_sync_response(
459 &self,
460 response: api::sync::sync_events::v3::Response,
461 ) -> Result<SyncResponse> {
462 self.receive_sync_response_with_requested_required_states(
463 response,
464 &RequestedRequiredStates::default(),
465 )
466 .await
467 }
468
469 pub async fn receive_sync_response_with_requested_required_states(
477 &self,
478 response: api::sync::sync_events::v3::Response,
479 requested_required_states: &RequestedRequiredStates,
480 ) -> Result<SyncResponse> {
481 if self.state_store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
485 info!("Got the same sync response twice");
486 return Ok(SyncResponse::default());
487 }
488
489 let now = Instant::now();
490
491 #[cfg(feature = "e2e-encryption")]
492 let olm_machine = self.olm_machine().await;
493
494 let mut context =
495 Context::new(StateChanges::new(response.next_batch.clone()), Default::default());
496
497 #[cfg(feature = "e2e-encryption")]
498 let to_device = {
499 let processors::e2ee::to_device::Output {
500 decrypted_to_device_events: to_device,
501 room_key_updates,
502 } = processors::e2ee::to_device::from_sync_v2(
503 &mut context,
504 &response,
505 olm_machine.as_ref(),
506 )
507 .await?;
508
509 processors::latest_event::decrypt_from_rooms(
510 &mut context,
511 room_key_updates
512 .into_iter()
513 .flatten()
514 .filter_map(|room_key_info| self.get_room(&room_key_info.room_id))
515 .collect(),
516 olm_machine.as_ref(),
517 self.decryption_trust_requirement,
518 self.handle_verification_events,
519 )
520 .await?;
521
522 to_device
523 };
524
525 #[cfg(not(feature = "e2e-encryption"))]
526 let to_device = response.to_device.events;
527
528 let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone());
529
530 let global_account_data_processor =
531 processors::account_data::global(&response.account_data.events);
532
533 let push_rules = self.get_push_rules(&global_account_data_processor).await?;
534
535 let mut new_rooms = RoomUpdates::default();
536 let mut notifications = Default::default();
537
538 let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
539 BTreeMap::new();
540
541 for (room_id, new_info) in response.rooms.join {
542 let room = self.state_store.get_or_create_room(
543 &room_id,
544 RoomState::Joined,
545 self.room_info_notable_update_sender.clone(),
546 );
547
548 let mut room_info = room.clone_info();
549
550 room_info.mark_as_joined();
551 room_info.update_from_ruma_summary(&new_info.summary);
552 room_info.set_prev_batch(new_info.timeline.prev_batch.as_deref());
553 room_info.mark_state_fully_synced();
554 room_info.handle_encryption_state(requested_required_states.for_room(&room_id));
555
556 let (raw_state_events, state_events) =
557 processors::state_events::sync::collect(&mut context, &new_info.state.events);
558
559 let mut new_user_ids = processors::state_events::sync::dispatch_and_get_new_users(
560 &mut context,
561 (&raw_state_events, &state_events),
562 &mut room_info,
563 &mut ambiguity_cache,
564 )
565 .await?;
566
567 processors::ephemeral_events::dispatch(
568 &mut context,
569 &new_info.ephemeral.events,
570 &room_id,
571 );
572
573 if new_info.timeline.limited {
574 room_info.mark_members_missing();
575 }
576
577 let (raw_state_events_from_timeline, state_events_from_timeline) =
578 processors::state_events::sync::collect_from_timeline(
579 &mut context,
580 &new_info.timeline.events,
581 );
582
583 let mut other_new_user_ids =
584 processors::state_events::sync::dispatch_and_get_new_users(
585 &mut context,
586 (&raw_state_events_from_timeline, &state_events_from_timeline),
587 &mut room_info,
588 &mut ambiguity_cache,
589 )
590 .await?;
591 new_user_ids.append(&mut other_new_user_ids);
592 updated_members_in_room.insert(room_id.to_owned(), new_user_ids.clone());
593
594 let timeline = processors::timeline::build(
595 &mut context,
596 &room,
597 &mut room_info,
598 processors::timeline::builder::Timeline::from(new_info.timeline),
599 processors::timeline::builder::Notification::new(
600 &push_rules,
601 &mut notifications,
602 &self.state_store,
603 ),
604 #[cfg(feature = "e2e-encryption")]
605 processors::timeline::builder::E2EE::new(
606 olm_machine.as_ref(),
607 self.decryption_trust_requirement,
608 self.handle_verification_events,
609 ),
610 )
611 .await?;
612
613 context.state_changes.add_room(room_info);
615
616 processors::account_data::for_room(
617 &mut context,
618 &room_id,
619 &new_info.account_data.events,
620 &self.state_store,
621 )
622 .await;
623
624 let mut room_info = context.state_changes.room_infos.get(&room_id).unwrap().clone();
630
631 #[cfg(feature = "e2e-encryption")]
632 processors::e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted(
633 &mut context,
634 olm_machine.as_ref(),
635 &new_user_ids,
636 room_info.encryption_state(),
637 room.encryption_state(),
638 &room_id,
639 &self.state_store,
640 )
641 .await?;
642
643 let notification_count = new_info.unread_notifications.into();
644 room_info.update_notification_count(notification_count);
645
646 let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();
647
648 new_rooms.join.insert(
649 room_id,
650 JoinedRoomUpdate::new(
651 timeline,
652 new_info.state.events,
653 new_info.account_data.events,
654 new_info.ephemeral.events,
655 notification_count,
656 ambiguity_changes,
657 ),
658 );
659
660 context.state_changes.add_room(room_info);
661 }
662
663 for (room_id, new_info) in response.rooms.leave {
664 let room = self.state_store.get_or_create_room(
665 &room_id,
666 RoomState::Left,
667 self.room_info_notable_update_sender.clone(),
668 );
669
670 let mut room_info = room.clone_info();
671 room_info.mark_as_left();
672 room_info.mark_state_partially_synced();
673 room_info.handle_encryption_state(requested_required_states.for_room(&room_id));
674
675 let (raw_state_events, state_events) =
676 processors::state_events::sync::collect(&mut context, &new_info.state.events);
677
678 let mut new_user_ids = processors::state_events::sync::dispatch_and_get_new_users(
679 &mut context,
680 (&raw_state_events, &state_events),
681 &mut room_info,
682 &mut ambiguity_cache,
683 )
684 .await?;
685
686 let (raw_state_events_from_timeline, state_events_from_timeline) =
687 processors::state_events::sync::collect_from_timeline(
688 &mut context,
689 &new_info.timeline.events,
690 );
691
692 let mut other_new_user_ids =
693 processors::state_events::sync::dispatch_and_get_new_users(
694 &mut context,
695 (&raw_state_events_from_timeline, &state_events_from_timeline),
696 &mut room_info,
697 &mut ambiguity_cache,
698 )
699 .await?;
700 new_user_ids.append(&mut other_new_user_ids);
701
702 let timeline = processors::timeline::build(
703 &mut context,
704 &room,
705 &mut room_info,
706 processors::timeline::builder::Timeline::from(new_info.timeline),
707 processors::timeline::builder::Notification::new(
708 &push_rules,
709 &mut notifications,
710 &self.state_store,
711 ),
712 #[cfg(feature = "e2e-encryption")]
713 processors::timeline::builder::E2EE::new(
714 olm_machine.as_ref(),
715 self.decryption_trust_requirement,
716 self.handle_verification_events,
717 ),
718 )
719 .await?;
720
721 context.state_changes.add_room(room_info);
723
724 processors::account_data::for_room(
725 &mut context,
726 &room_id,
727 &new_info.account_data.events,
728 &self.state_store,
729 )
730 .await;
731
732 let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();
733
734 new_rooms.leave.insert(
735 room_id,
736 LeftRoomUpdate::new(
737 timeline,
738 new_info.state.events,
739 new_info.account_data.events,
740 ambiguity_changes,
741 ),
742 );
743 }
744
745 for (room_id, new_info) in response.rooms.invite {
746 let room = self.state_store.get_or_create_room(
747 &room_id,
748 RoomState::Invited,
749 self.room_info_notable_update_sender.clone(),
750 );
751
752 let (raw_events, events) = processors::state_events::stripped::collect(
753 &mut context,
754 &new_info.invite_state.events,
755 );
756
757 let mut room_info = room.clone_info();
758 room_info.mark_as_invited();
759 room_info.mark_state_fully_synced();
760
761 processors::state_events::stripped::dispatch_invite_or_knock(
762 &mut context,
763 (&raw_events, &events),
764 &room,
765 &mut room_info,
766 &push_rules,
767 &mut notifications,
768 &self.state_store,
769 )
770 .await?;
771
772 context.state_changes.add_room(room_info);
773
774 new_rooms.invite.insert(room_id, new_info);
775 }
776
777 for (room_id, new_info) in response.rooms.knock {
778 let room = self.state_store.get_or_create_room(
779 &room_id,
780 RoomState::Knocked,
781 self.room_info_notable_update_sender.clone(),
782 );
783
784 let (raw_events, events) = processors::state_events::stripped::collect(
785 &mut context,
786 &new_info.knock_state.events,
787 );
788
789 let mut room_info = room.clone_info();
790 room_info.mark_as_knocked();
791 room_info.mark_state_fully_synced();
792
793 processors::state_events::stripped::dispatch_invite_or_knock(
794 &mut context,
795 (&raw_events, &events),
796 &room,
797 &mut room_info,
798 &push_rules,
799 &mut notifications,
800 &self.state_store,
801 )
802 .await?;
803
804 context.state_changes.add_room(room_info);
805 new_rooms.knocked.insert(room_id, new_info);
806 }
807
808 global_account_data_processor.apply(&mut context, &self.state_store).await;
809
810 context.state_changes.presence = response
811 .presence
812 .events
813 .iter()
814 .filter_map(|e| {
815 let event = e.deserialize().ok()?;
816 Some((event.sender, e.clone()))
817 })
818 .collect();
819
820 context.state_changes.ambiguity_maps = ambiguity_cache.cache;
821
822 {
823 let _sync_lock = self.sync_lock().lock().await;
824
825 processors::changes::save_and_apply(
826 context,
827 &self.state_store,
828 &self.ignore_user_list_changes,
829 Some(response.next_batch.clone()),
830 )
831 .await?;
832 }
833
834 new_rooms.update_in_memory_caches(&self.state_store).await;
840
841 for (room_id, member_ids) in updated_members_in_room {
842 if let Some(room) = self.get_room(&room_id) {
843 let _ =
844 room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
845 }
846 }
847
848 info!("Processed a sync response in {:?}", now.elapsed());
849
850 let response = SyncResponse {
851 rooms: new_rooms,
852 presence: response.presence.events,
853 account_data: response.account_data.events,
854 to_device,
855 notifications,
856 };
857
858 Ok(response)
859 }
860
861 #[instrument(skip_all, fields(?room_id))]
873 pub async fn receive_all_members(
874 &self,
875 room_id: &RoomId,
876 request: &api::membership::get_member_events::v3::Request,
877 response: &api::membership::get_member_events::v3::Response,
878 ) -> Result<()> {
879 if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
880 {
881 return Err(Error::InvalidReceiveMembersParameters);
885 }
886
887 let Some(room) = self.state_store.room(room_id) else {
888 return Ok(());
890 };
891
892 let mut chunk = Vec::with_capacity(response.chunk.len());
893 let mut context = Context::new(StateChanges::default(), Default::default());
894
895 #[cfg(feature = "e2e-encryption")]
896 let mut user_ids = BTreeSet::new();
897
898 let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();
899
900 for raw_event in &response.chunk {
901 let member = match raw_event.deserialize() {
902 Ok(ev) => ev,
903 Err(e) => {
904 let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
905 debug!(event_id, "Failed to deserialize member event: {e}");
906 continue;
907 }
908 };
909
910 #[cfg(feature = "e2e-encryption")]
920 match member.membership() {
921 MembershipState::Join | MembershipState::Invite => {
922 user_ids.insert(member.state_key().to_owned());
923 }
924 _ => (),
925 }
926
927 if let StateEvent::Original(e) = &member {
928 if let Some(d) = &e.content.displayname {
929 let display_name = DisplayName::new(d);
930 ambiguity_map
931 .entry(display_name)
932 .or_default()
933 .insert(member.state_key().clone());
934 }
935 }
936
937 let sync_member: SyncRoomMemberEvent = member.clone().into();
938 processors::profiles::upsert_or_delete(&mut context, room_id, &sync_member);
939
940 context
941 .state_changes
942 .state
943 .entry(room_id.to_owned())
944 .or_default()
945 .entry(member.event_type())
946 .or_default()
947 .insert(member.state_key().to_string(), raw_event.clone().cast());
948 chunk.push(member);
949 }
950
951 #[cfg(feature = "e2e-encryption")]
952 processors::e2ee::tracked_users::update(
953 &mut context,
954 self.olm_machine().await.as_ref(),
955 room.encryption_state(),
956 &user_ids,
957 )
958 .await?;
959
960 context.state_changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);
961
962 let _sync_lock = self.sync_lock().lock().await;
963 let mut room_info = room.clone_info();
964 room_info.mark_members_synced();
965 context.state_changes.add_room(room_info);
966
967 processors::changes::save_and_apply(
968 context,
969 &self.state_store,
970 &self.ignore_user_list_changes,
971 None,
972 )
973 .await?;
974
975 let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);
976
977 Ok(())
978 }
979
980 pub async fn receive_filter_upload(
996 &self,
997 filter_name: &str,
998 response: &api::filter::create_filter::v3::Response,
999 ) -> Result<()> {
1000 Ok(self
1001 .state_store
1002 .set_kv_data(
1003 StateStoreDataKey::Filter(filter_name),
1004 StateStoreDataValue::Filter(response.filter_id.clone()),
1005 )
1006 .await?)
1007 }
1008
1009 pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
1021 let filter = self
1022 .state_store
1023 .get_kv_data(StateStoreDataKey::Filter(filter_name))
1024 .await?
1025 .map(|d| d.into_filter().expect("State store data not a filter"));
1026
1027 Ok(filter)
1028 }
1029
1030 #[cfg(feature = "e2e-encryption")]
1032 pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
1033 match self.olm_machine().await.as_ref() {
1034 Some(o) => {
1035 let Some(room) = self.get_room(room_id) else {
1036 return Err(Error::InsufficientData);
1037 };
1038
1039 let history_visibility = room.history_visibility_or_default();
1040 let Some(room_encryption_event) = room.encryption_settings() else {
1041 return Err(Error::EncryptionNotEnabled);
1042 };
1043
1044 let filter = if history_visibility == HistoryVisibility::Joined {
1047 RoomMemberships::JOIN
1048 } else {
1049 RoomMemberships::ACTIVE
1050 };
1051
1052 let members = self.state_store.get_user_ids(room_id, filter).await?;
1053
1054 let settings = EncryptionSettings::new(
1055 room_encryption_event,
1056 history_visibility,
1057 self.room_key_recipient_strategy.clone(),
1058 );
1059
1060 Ok(o.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?)
1061 }
1062 None => panic!("Olm machine wasn't started"),
1063 }
1064 }
1065
1066 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
1072 self.state_store.room(room_id)
1073 }
1074
1075 pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
1083 self.state_store.forget_room(room_id).await?;
1085
1086 self.event_cache_store().lock().await?.remove_room(room_id).await?;
1088
1089 Ok(())
1090 }
1091
1092 #[cfg(feature = "e2e-encryption")]
1094 pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
1095 self.olm_machine.read().await
1096 }
1097
1098 pub(crate) async fn get_push_rules(
1104 &self,
1105 global_account_data_processor: &processors::account_data::Global,
1106 ) -> Result<Ruleset> {
1107 if let Some(event) = global_account_data_processor
1108 .push_rules()
1109 .and_then(|ev| ev.deserialize_as::<PushRulesEvent>().ok())
1110 {
1111 Ok(event.content.global)
1112 } else if let Some(event) = self
1113 .state_store
1114 .get_account_data_event_static::<PushRulesEventContent>()
1115 .await?
1116 .and_then(|ev| ev.deserialize().ok())
1117 {
1118 Ok(event.content.global)
1119 } else if let Some(session_meta) = self.state_store.session_meta() {
1120 Ok(Ruleset::server_default(&session_meta.user_id))
1121 } else {
1122 Ok(Ruleset::new())
1123 }
1124 }
1125
1126 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
1129 self.ignore_user_list_changes.subscribe()
1130 }
1131
1132 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
1136 self.room_info_notable_update_sender.subscribe()
1137 }
1138}
1139
1140#[derive(Debug, Default)]
1152pub struct RequestedRequiredStates {
1153 default: Vec<(StateEventType, String)>,
1154 for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1155}
1156
1157impl RequestedRequiredStates {
1158 pub fn new(
1163 default: Vec<(StateEventType, String)>,
1164 for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
1165 ) -> Self {
1166 Self { default, for_rooms }
1167 }
1168
1169 pub fn for_room(&self, room_id: &RoomId) -> &[(StateEventType, String)] {
1171 self.for_rooms.get(room_id).unwrap_or(&self.default)
1172 }
1173}
1174
1175impl From<&v5::Request> for RequestedRequiredStates {
1176 fn from(request: &v5::Request) -> Self {
1177 let mut default = BTreeSet::new();
1184
1185 for list in request.lists.values() {
1186 default.extend(BTreeSet::from_iter(list.room_details.required_state.iter().cloned()));
1187 }
1188
1189 for room_subscription in request.room_subscriptions.values() {
1190 default.extend(BTreeSet::from_iter(room_subscription.required_state.iter().cloned()));
1191 }
1192
1193 Self { default: default.into_iter().collect(), for_rooms: HashMap::new() }
1194 }
1195}
1196
1197#[cfg(test)]
1198mod tests {
1199 use std::collections::HashMap;
1200
1201 use assert_matches2::assert_let;
1202 use futures_util::FutureExt as _;
1203 use matrix_sdk_test::{
1204 async_test, event_factory::EventFactory, ruma_response_from_json, InvitedRoomBuilder,
1205 LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder, BOB,
1206 };
1207 use ruma::{
1208 api::client::{self as api, sync::sync_events::v5},
1209 event_id,
1210 events::{room::member::MembershipState, StateEventType},
1211 room_id,
1212 serde::Raw,
1213 user_id,
1214 };
1215 use serde_json::{json, value::to_raw_value};
1216
1217 use super::{BaseClient, RequestedRequiredStates};
1218 use crate::{
1219 store::{RoomLoadSettings, StateStoreExt, StoreConfig},
1220 test_utils::logged_in_base_client,
1221 RoomDisplayName, RoomState, SessionMeta,
1222 };
1223
1224 #[test]
1225 fn test_requested_required_states() {
1226 let room_id_0 = room_id!("!r0");
1227 let room_id_1 = room_id!("!r1");
1228
1229 let requested_required_states = RequestedRequiredStates::new(
1230 vec![(StateEventType::RoomAvatar, "".to_owned())],
1231 HashMap::from([(
1232 room_id_0.to_owned(),
1233 vec![
1234 (StateEventType::RoomMember, "foo".to_owned()),
1235 (StateEventType::RoomEncryption, "".to_owned()),
1236 ],
1237 )]),
1238 );
1239
1240 assert_eq!(
1242 requested_required_states.for_room(room_id_0),
1243 &[
1244 (StateEventType::RoomMember, "foo".to_owned()),
1245 (StateEventType::RoomEncryption, "".to_owned()),
1246 ]
1247 );
1248
1249 assert_eq!(
1251 requested_required_states.for_room(room_id_1),
1252 &[(StateEventType::RoomAvatar, "".to_owned()),]
1253 );
1254 }
1255
1256 #[test]
1257 fn test_requested_required_states_from_sync_v5_request() {
1258 let room_id_0 = room_id!("!r0");
1259 let room_id_1 = room_id!("!r1");
1260
1261 let mut request = v5::Request::new();
1263
1264 {
1265 let requested_required_states = RequestedRequiredStates::from(&request);
1266
1267 assert!(requested_required_states.default.is_empty());
1268 assert!(requested_required_states.for_rooms.is_empty());
1269 }
1270
1271 request.lists.insert("foo".to_owned(), {
1273 let mut list = v5::request::List::default();
1274 list.room_details.required_state = vec![
1275 (StateEventType::RoomAvatar, "".to_owned()),
1276 (StateEventType::RoomEncryption, "".to_owned()),
1277 ];
1278
1279 list
1280 });
1281
1282 {
1283 let requested_required_states = RequestedRequiredStates::from(&request);
1284
1285 assert_eq!(
1286 requested_required_states.default,
1287 &[
1288 (StateEventType::RoomAvatar, "".to_owned()),
1289 (StateEventType::RoomEncryption, "".to_owned())
1290 ]
1291 );
1292 assert!(requested_required_states.for_rooms.is_empty());
1293 }
1294
1295 request.lists.insert("bar".to_owned(), {
1297 let mut list = v5::request::List::default();
1298 list.room_details.required_state = vec![
1299 (StateEventType::RoomEncryption, "".to_owned()),
1300 (StateEventType::RoomName, "".to_owned()),
1301 ];
1302
1303 list
1304 });
1305
1306 {
1307 let requested_required_states = RequestedRequiredStates::from(&request);
1308
1309 assert_eq!(
1311 requested_required_states.default,
1312 &[
1313 (StateEventType::RoomAvatar, "".to_owned()),
1314 (StateEventType::RoomEncryption, "".to_owned()),
1315 (StateEventType::RoomName, "".to_owned()),
1316 ]
1317 );
1318 assert!(requested_required_states.for_rooms.is_empty());
1319 }
1320
1321 request.room_subscriptions.insert(room_id_0.to_owned(), {
1323 let mut room_subscription = v5::request::RoomSubscription::default();
1324
1325 room_subscription.required_state = vec![
1326 (StateEventType::RoomJoinRules, "".to_owned()),
1327 (StateEventType::RoomEncryption, "".to_owned()),
1328 ];
1329
1330 room_subscription
1331 });
1332
1333 {
1334 let requested_required_states = RequestedRequiredStates::from(&request);
1335
1336 assert_eq!(
1338 requested_required_states.default,
1339 &[
1340 (StateEventType::RoomAvatar, "".to_owned()),
1341 (StateEventType::RoomEncryption, "".to_owned()),
1342 (StateEventType::RoomJoinRules, "".to_owned()),
1343 (StateEventType::RoomName, "".to_owned()),
1344 ]
1345 );
1346 assert!(requested_required_states.for_rooms.is_empty());
1347 }
1348
1349 request.room_subscriptions.insert(room_id_1.to_owned(), {
1351 let mut room_subscription = v5::request::RoomSubscription::default();
1352
1353 room_subscription.required_state = vec![
1354 (StateEventType::RoomName, "".to_owned()),
1355 (StateEventType::RoomTopic, "".to_owned()),
1356 ];
1357
1358 room_subscription
1359 });
1360
1361 {
1362 let requested_required_states = RequestedRequiredStates::from(&request);
1363
1364 assert_eq!(
1366 requested_required_states.default,
1367 &[
1368 (StateEventType::RoomAvatar, "".to_owned()),
1369 (StateEventType::RoomEncryption, "".to_owned()),
1370 (StateEventType::RoomJoinRules, "".to_owned()),
1371 (StateEventType::RoomName, "".to_owned()),
1372 (StateEventType::RoomTopic, "".to_owned()),
1373 ]
1374 );
1375 }
1376 }
1377
1378 #[async_test]
1379 async fn test_invite_after_leaving() {
1380 let user_id = user_id!("@alice:example.org");
1381 let room_id = room_id!("!test:example.org");
1382
1383 let client = logged_in_base_client(Some(user_id)).await;
1384
1385 let mut sync_builder = SyncResponseBuilder::new();
1386
1387 let response = sync_builder
1388 .add_left_room(
1389 LeftRoomBuilder::new(room_id).add_timeline_event(
1390 EventFactory::new()
1391 .member(user_id)
1392 .membership(MembershipState::Leave)
1393 .display_name("Alice")
1394 .event_id(event_id!("$994173582443PhrSn:example.org")),
1395 ),
1396 )
1397 .build_sync_response();
1398 client.receive_sync_response(response).await.unwrap();
1399 assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
1400
1401 let response = sync_builder
1402 .add_invited_room(InvitedRoomBuilder::new(room_id).add_state_event(
1403 StrippedStateTestEvent::Custom(json!({
1404 "content": {
1405 "displayname": "Alice",
1406 "membership": "invite",
1407 },
1408 "event_id": "$143273582443PhrSn:example.org",
1409 "origin_server_ts": 1432735824653u64,
1410 "sender": "@example:example.org",
1411 "state_key": user_id,
1412 "type": "m.room.member",
1413 })),
1414 ))
1415 .build_sync_response();
1416 client.receive_sync_response(response).await.unwrap();
1417 assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
1418 }
1419
1420 #[async_test]
1421 async fn test_invite_displayname() {
1422 let user_id = user_id!("@alice:example.org");
1423 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1424
1425 let client = logged_in_base_client(Some(user_id)).await;
1426
1427 let response = ruma_response_from_json(&json!({
1428 "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1429 "device_one_time_keys_count": {
1430 "signed_curve25519": 50u64
1431 },
1432 "device_unused_fallback_key_types": [
1433 "signed_curve25519"
1434 ],
1435 "rooms": {
1436 "invite": {
1437 "!ithpyNKDtmhneaTQja:example.org": {
1438 "invite_state": {
1439 "events": [
1440 {
1441 "content": {
1442 "creator": "@test:example.org",
1443 "room_version": "9"
1444 },
1445 "sender": "@test:example.org",
1446 "state_key": "",
1447 "type": "m.room.create"
1448 },
1449 {
1450 "content": {
1451 "join_rule": "invite"
1452 },
1453 "sender": "@test:example.org",
1454 "state_key": "",
1455 "type": "m.room.join_rules"
1456 },
1457 {
1458 "content": {
1459 "algorithm": "m.megolm.v1.aes-sha2"
1460 },
1461 "sender": "@test:example.org",
1462 "state_key": "",
1463 "type": "m.room.encryption"
1464 },
1465 {
1466 "content": {
1467 "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
1468 "displayname": "Kyra",
1469 "membership": "join"
1470 },
1471 "sender": "@test:example.org",
1472 "state_key": "@test:example.org",
1473 "type": "m.room.member"
1474 },
1475 {
1476 "content": {
1477 "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
1478 "displayname": "alice",
1479 "is_direct": true,
1480 "membership": "invite"
1481 },
1482 "origin_server_ts": 1650878657984u64,
1483 "sender": "@test:example.org",
1484 "state_key": "@alice:example.org",
1485 "type": "m.room.member",
1486 "unsigned": {
1487 "age": 14u64
1488 },
1489 "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
1490 }
1491 ]
1492 }
1493 }
1494 }
1495 }
1496 }));
1497
1498 client.receive_sync_response(response).await.unwrap();
1499
1500 let room = client.get_room(room_id).expect("Room not found");
1501 assert_eq!(room.state(), RoomState::Invited);
1502 assert_eq!(
1503 room.compute_display_name().await.expect("fetching display name failed"),
1504 RoomDisplayName::Calculated("Kyra".to_owned())
1505 );
1506 }
1507
1508 #[async_test]
1509 async fn test_deserialization_failure() {
1510 let user_id = user_id!("@alice:example.org");
1511 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1512
1513 let client =
1514 BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1515 client
1516 .activate(
1517 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1518 RoomLoadSettings::default(),
1519 #[cfg(feature = "e2e-encryption")]
1520 None,
1521 )
1522 .await
1523 .unwrap();
1524
1525 let response = ruma_response_from_json(&json!({
1526 "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1527 "rooms": {
1528 "join": {
1529 "!ithpyNKDtmhneaTQja:example.org": {
1530 "state": {
1531 "events": [
1532 {
1533 "invalid": "invalid",
1534 },
1535 {
1536 "content": {
1537 "name": "The room name"
1538 },
1539 "event_id": "$143273582443PhrSn:example.org",
1540 "origin_server_ts": 1432735824653u64,
1541 "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
1542 "sender": "@example:example.org",
1543 "state_key": "",
1544 "type": "m.room.name",
1545 "unsigned": {
1546 "age": 1234
1547 }
1548 },
1549 ]
1550 }
1551 }
1552 }
1553 }
1554 }));
1555
1556 client.receive_sync_response(response).await.unwrap();
1557 client
1558 .state_store()
1559 .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
1560 .await
1561 .expect("Failed to fetch state event")
1562 .expect("State event not found")
1563 .deserialize()
1564 .expect("Failed to deserialize state event");
1565 }
1566
1567 #[async_test]
1568 async fn test_invited_members_arent_ignored() {
1569 let user_id = user_id!("@alice:example.org");
1570 let inviter_user_id = user_id!("@bob:example.org");
1571 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1572
1573 let client =
1574 BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1575 client
1576 .activate(
1577 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1578 RoomLoadSettings::default(),
1579 #[cfg(feature = "e2e-encryption")]
1580 None,
1581 )
1582 .await
1583 .unwrap();
1584
1585 let mut sync_builder = SyncResponseBuilder::new();
1587 let response = sync_builder
1588 .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
1589 .build_sync_response();
1590 client.receive_sync_response(response).await.unwrap();
1591
1592 let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1595
1596 let raw_member_event = json!({
1597 "content": {
1598 "avatar_url": "mxc://localhost/fewjilfewjil42",
1599 "displayname": "Invited Alice",
1600 "membership": "invite"
1601 },
1602 "event_id": "$151800140517rfvjc:localhost",
1603 "origin_server_ts": 151800140,
1604 "room_id": room_id,
1605 "sender": inviter_user_id,
1606 "state_key": user_id,
1607 "type": "m.room.member",
1608 "unsigned": {
1609 "age": 13374242,
1610 }
1611 });
1612 let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1613 to_raw_value(&raw_member_event).unwrap(),
1614 )]);
1615
1616 client.receive_all_members(room_id, &request, &response).await.unwrap();
1618
1619 let room = client.get_room(room_id).unwrap();
1620
1621 let member = room.get_member(user_id).await.expect("ok").expect("exists");
1623
1624 assert_eq!(member.user_id(), user_id);
1625 assert_eq!(member.display_name().unwrap(), "Invited Alice");
1626 assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1627 }
1628
1629 #[async_test]
1630 async fn test_reinvited_members_get_a_display_name() {
1631 let user_id = user_id!("@alice:example.org");
1632 let inviter_user_id = user_id!("@bob:example.org");
1633 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1634
1635 let client =
1636 BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1637 client
1638 .activate(
1639 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1640 RoomLoadSettings::default(),
1641 #[cfg(feature = "e2e-encryption")]
1642 None,
1643 )
1644 .await
1645 .unwrap();
1646
1647 let mut sync_builder = SyncResponseBuilder::new();
1649 let response = sync_builder
1650 .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_state_event(
1651 StateTestEvent::Custom(json!({
1652 "content": {
1653 "avatar_url": null,
1654 "displayname": null,
1655 "membership": "leave"
1656 },
1657 "event_id": "$151803140217rkvjc:localhost",
1658 "origin_server_ts": 151800139,
1659 "room_id": room_id,
1660 "sender": user_id,
1661 "state_key": user_id,
1662 "type": "m.room.member",
1663 })),
1664 ))
1665 .build_sync_response();
1666 client.receive_sync_response(response).await.unwrap();
1667
1668 let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1670
1671 let raw_member_event = json!({
1672 "content": {
1673 "avatar_url": "mxc://localhost/fewjilfewjil42",
1674 "displayname": "Invited Alice",
1675 "membership": "invite"
1676 },
1677 "event_id": "$151800140517rfvjc:localhost",
1678 "origin_server_ts": 151800140,
1679 "room_id": room_id,
1680 "sender": inviter_user_id,
1681 "state_key": user_id,
1682 "type": "m.room.member",
1683 "unsigned": {
1684 "age": 13374242,
1685 }
1686 });
1687 let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1688 to_raw_value(&raw_member_event).unwrap(),
1689 )]);
1690
1691 client.receive_all_members(room_id, &request, &response).await.unwrap();
1693
1694 let room = client.get_room(room_id).unwrap();
1695
1696 let member = room.get_member(user_id).await.expect("ok").expect("exists");
1698
1699 assert_eq!(member.user_id(), user_id);
1700 assert_eq!(member.display_name().unwrap(), "Invited Alice");
1701 assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1702 }
1703
1704 #[async_test]
1705 async fn test_ignored_user_list_changes() {
1706 let user_id = user_id!("@alice:example.org");
1707 let client =
1708 BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1709
1710 client
1711 .activate(
1712 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1713 RoomLoadSettings::default(),
1714 #[cfg(feature = "e2e-encryption")]
1715 None,
1716 )
1717 .await
1718 .unwrap();
1719
1720 let mut subscriber = client.subscribe_to_ignore_user_list_changes();
1721 assert!(subscriber.next().now_or_never().is_none());
1722
1723 let mut sync_builder = SyncResponseBuilder::new();
1724 let response = sync_builder
1725 .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1726 json!({
1727 "content": {
1728 "ignored_users": {
1729 *BOB: {}
1730 }
1731 },
1732 "type": "m.ignored_user_list",
1733 }),
1734 ))
1735 .build_sync_response();
1736 client.receive_sync_response(response).await.unwrap();
1737
1738 assert_let!(Some(ignored) = subscriber.next().await);
1739 assert_eq!(ignored, [BOB.to_string()]);
1740
1741 let response = sync_builder
1743 .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1744 json!({
1745 "content": {
1746 "ignored_users": {
1747 *BOB: {}
1748 }
1749 },
1750 "type": "m.ignored_user_list",
1751 }),
1752 ))
1753 .build_sync_response();
1754 client.receive_sync_response(response).await.unwrap();
1755
1756 assert!(subscriber.next().now_or_never().is_none());
1758
1759 let response = sync_builder
1761 .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1762 json!({
1763 "content": {
1764 "ignored_users": {}
1765 },
1766 "type": "m.ignored_user_list",
1767 }),
1768 ))
1769 .build_sync_response();
1770 client.receive_sync_response(response).await.unwrap();
1771
1772 assert_let!(Some(ignored) = subscriber.next().await);
1773 assert!(ignored.is_empty());
1774 }
1775}