1use std::{
16 collections::BTreeMap,
17 ops::Deref,
18 sync::{Arc, Mutex},
19 time::Duration,
20};
21
22use futures_util::{StreamExt as _, pin_mut};
23use itertools::Itertools;
24use matrix_sdk::{
25 Client, ClientBuildError, SlidingSyncList, SlidingSyncMode, room::Room, sleep::sleep,
26};
27use matrix_sdk_base::{RoomState, StoreError, deserialized_responses::TimelineEvent};
28use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
29use ruma::{
30 EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
31 api::client::sync::sync_events::v5 as http,
32 assign,
33 events::{
34 AnyMessageLikeEventContent, AnyStateEvent, AnyStateEventContentChange,
35 AnySyncMessageLikeEvent, AnySyncTimelineEvent, StateEventContentChange, StateEventType,
36 TimelineEventType,
37 room::{
38 encrypted::OriginalSyncRoomEncryptedEvent,
39 join_rules::JoinRule,
40 member::{MembershipState, StrippedRoomMemberEvent},
41 message::{Relation, SyncRoomMessageEvent},
42 },
43 },
44 html::RemoveReplyFallback,
45 push::Action,
46 serde::Raw,
47 uint,
48};
49use thiserror::Error;
50use tokio::sync::Mutex as AsyncMutex;
51use tracing::{debug, info, instrument, trace, warn};
52
53use crate::{
54 DEFAULT_SANITIZER_MODE,
55 encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService},
56 sync_service::SyncService,
57};
58
59#[derive(Clone)]
61pub enum NotificationProcessSetup {
62 MultipleProcesses,
71
72 SingleProcess { sync_service: Arc<SyncService> },
80}
81
82pub struct NotificationClient {
88 client: Client,
90
91 parent_client: Client,
93
94 process_setup: NotificationProcessSetup,
96
97 notification_sync_mutex: AsyncMutex<()>,
105
106 encryption_sync_mutex: AsyncMutex<()>,
111}
112
113impl NotificationClient {
114 const CONNECTION_ID: &'static str = "notifications";
115 const LOCK_ID: &'static str = "notifications";
116
117 pub async fn new(
119 parent_client: Client,
120 process_setup: NotificationProcessSetup,
121 ) -> Result<Self, Error> {
122 let cross_process_store_config = match process_setup {
124 NotificationProcessSetup::MultipleProcesses => {
125 CrossProcessLockConfig::multi_process(Self::LOCK_ID)
126 }
127 NotificationProcessSetup::SingleProcess { .. } => CrossProcessLockConfig::SingleProcess,
128 };
129 let client = parent_client.notification_client(cross_process_store_config).await?;
130
131 Ok(NotificationClient {
132 client,
133 parent_client,
134 notification_sync_mutex: AsyncMutex::new(()),
135 encryption_sync_mutex: AsyncMutex::new(()),
136 process_setup,
137 })
138 }
139
140 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
144 self.client.get_room(room_id)
145 }
146
147 #[instrument(skip(self))]
156 pub async fn get_notification(
157 &self,
158 room_id: &RoomId,
159 event_id: &EventId,
160 ) -> Result<NotificationStatus, Error> {
161 let status = self.get_notification_with_sliding_sync(room_id, event_id).await?;
162 match status {
163 NotificationStatus::Event(..)
164 | NotificationStatus::EventFilteredOut
165 | NotificationStatus::EventRedacted => Ok(status),
166 NotificationStatus::EventNotFound => {
167 self.get_notification_with_context(room_id, event_id).await
168 }
169 }
170 }
171
172 pub async fn get_notifications(
186 &self,
187 requests: &[NotificationItemsRequest],
188 ) -> Result<BatchNotificationFetchingResult, Error> {
189 let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
190
191 for request in requests {
192 for event_id in &request.event_ids {
193 match notifications.get_mut(event_id) {
194 Some(Ok(NotificationStatus::EventNotFound)) | None => {
197 notifications.insert(
198 event_id.to_owned(),
199 self.get_notification_with_context(&request.room_id, event_id).await,
200 );
201 }
202
203 _ => {}
204 }
205 }
206 }
207
208 Ok(notifications)
209 }
210
211 #[instrument(skip_all)]
221 async fn retry_decryption(
222 &self,
223 room: &Room,
224 raw_event: &Raw<AnySyncTimelineEvent>,
225 ) -> Result<Option<TimelineEvent>, Error> {
226 let event: AnySyncTimelineEvent =
227 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
228
229 if !is_event_encrypted(event.event_type()) {
230 return Ok(None);
231 }
232
233 let _guard = self.encryption_sync_mutex.lock().await;
235
236 let push_ctx = room.push_context().await?;
247 let sync_permit_guard = match &self.process_setup {
248 NotificationProcessSetup::MultipleProcesses => {
249 let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
253 sync_permit.lock_owned().await
254 }
255
256 NotificationProcessSetup::SingleProcess { sync_service } => {
257 if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
258 permit_guard
259 } else {
260 let mut wait = 200;
269
270 debug!("Encryption sync running in background");
271 for _ in 0..3 {
272 trace!("waiting for decryption…");
273
274 sleep(Duration::from_millis(wait)).await;
275
276 let new_event = room
280 .decrypt_event(
281 raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
282 push_ctx.as_ref(),
283 )
284 .await?;
285
286 match new_event.kind {
287 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
288 utd_info, ..} => {
289 if utd_info.reason.is_missing_room_key() {
290 wait *= 2;
293 } else {
294 debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
295 return Ok(None);
296 }
297 }
298 _ => {
299 trace!("Waiting succeeded and event could be decrypted!");
300 return Ok(Some(new_event));
301 }
302 }
303 }
304
305 debug!("Timeout waiting for the encryption sync to decrypt notification.");
307 return Ok(None);
308 }
309 }
310 };
311
312 let encryption_sync = EncryptionSyncService::new(
313 self.client.clone(),
314 Some((Duration::from_secs(3), Duration::from_secs(4))),
315 )
316 .await;
317
318 match encryption_sync {
323 Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
324 Ok(()) => match room.decrypt_event(raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(), push_ctx.as_ref()).await {
328 Ok(new_event) => match new_event.kind {
329 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
330 utd_info, ..
331 } => {
332 trace!(
333 "Encryption sync failed to decrypt the event: {:?}",
334 utd_info.reason
335 );
336 Ok(None)
337 }
338 _ => {
339 trace!("Encryption sync managed to decrypt the event.");
340 Ok(Some(new_event))
341 }
342 },
343 Err(err) => {
344 trace!("Encryption sync failed to decrypt the event: {err}");
345 Ok(None)
346 }
347 },
348 Err(err) => {
349 warn!("Encryption sync error: {err:#}");
350 Ok(None)
351 }
352 },
353 Err(err) => {
354 warn!("Encryption sync build error: {err:#}",);
355 Ok(None)
356 }
357 }
358 }
359
360 #[instrument(skip_all)]
379 async fn try_sliding_sync(
380 &self,
381 requests: &[NotificationItemsRequest],
382 ) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
383 const MAX_SLIDING_SYNC_ATTEMPTS: u64 = 3;
384 let _guard = self.notification_sync_mutex.lock().await;
387
388 let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
393 let handler_raw_notification = raw_notifications.clone();
394
395 let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
396 let handler_raw_invites = raw_invites.clone();
397
398 let user_id = self.client.user_id().unwrap().to_owned();
399 let room_ids = requests.iter().map(|req| req.room_id.clone()).collect::<Vec<_>>();
400
401 let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
402
403 let timeline_event_handler = self.client.add_event_handler({
404 let requests = requests.clone();
405 move |raw: Raw<AnySyncTimelineEvent>| async move {
406 match &raw.get_field::<OwnedEventId>("event_id") {
407 Ok(Some(event_id)) => {
408 let Some(request) =
409 &requests.iter().find(|request| request.event_ids.contains(event_id))
410 else {
411 return;
412 };
413
414 let room_id = request.room_id.clone();
415
416 handler_raw_notification.lock().unwrap().insert(
420 event_id.to_owned(),
421 (room_id, Some(RawNotificationEvent::Timeline(raw))),
422 );
423 }
424 Ok(None) => {
425 warn!("a sync event had no event id");
426 }
427 Err(err) => {
428 warn!("failed to deserialize sync event id: {err}");
429 }
430 }
431 }
432 });
433
434 let handler_raw_notifications = raw_notifications.clone();
435 let stripped_member_handler = self.client.add_event_handler({
436 let requests = requests.clone();
437 let room_ids: Vec<_> = room_ids.clone();
438 move |raw: Raw<StrippedRoomMemberEvent>, room: Room| async move {
439 if !room_ids.contains(&room.room_id().to_owned()) {
440 return;
441 }
442
443 let deserialized = match raw.deserialize() {
444 Ok(d) => d,
445 Err(err) => {
446 warn!("failed to deserialize raw stripped room member event: {err}");
447 return;
448 }
449 };
450
451 trace!("received a stripped room member event");
452
453 match &raw.get_field::<OwnedEventId>("event_id") {
456 Ok(Some(event_id)) => {
457 let request =
458 &requests.iter().find(|request| request.event_ids.contains(event_id));
459 if request.is_none() {
460 return;
461 }
462 let room_id = request.unwrap().room_id.clone();
463
464 handler_raw_notifications.lock().unwrap().insert(
468 event_id.to_owned(),
469 (room_id, Some(RawNotificationEvent::Invite(raw))),
470 );
471 return;
472 }
473 Ok(None) => {
474 warn!("a room member event had no id");
475 }
476 Err(err) => {
477 warn!("failed to deserialize room member event id: {err}");
478 }
479 }
480
481 if deserialized.content.membership == MembershipState::Invite
483 && deserialized.state_key == user_id
484 {
485 trace!("found an invite event for the current user");
486 handler_raw_invites
490 .lock()
491 .unwrap()
492 .insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
493 } else {
494 trace!("not an invite event, or not for the current user");
495 }
496 }
497 });
498
499 let required_state = vec![
501 (StateEventType::RoomEncryption, "".to_owned()),
502 (StateEventType::RoomMember, "$LAZY".to_owned()),
503 (StateEventType::RoomMember, "$ME".to_owned()),
504 (StateEventType::RoomCanonicalAlias, "".to_owned()),
505 (StateEventType::RoomName, "".to_owned()),
506 (StateEventType::RoomAvatar, "".to_owned()),
507 (StateEventType::RoomPowerLevels, "".to_owned()),
508 (StateEventType::RoomJoinRules, "".to_owned()),
509 (StateEventType::CallMember, "*".to_owned()),
510 (StateEventType::RoomCreate, "".to_owned()),
511 (StateEventType::MemberHints, "".to_owned()),
512 ];
513
514 let invites = SlidingSyncList::builder("invites")
515 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
516 .timeline_limit(8)
517 .required_state(required_state.clone())
518 .filters(Some(assign!(http::request::ListFilters::default(), {
519 is_invite: Some(true),
520 })));
521
522 let sync = self
523 .client
524 .sliding_sync(Self::CONNECTION_ID)?
525 .poll_timeout(Duration::from_secs(1))
526 .network_timeout(Duration::from_secs(3))
527 .with_account_data_extension(
528 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
529 )
530 .add_list(invites)
531 .build()
532 .await?;
533
534 sync.subscribe_to_rooms(
535 &room_ids.iter().map(|id| id.deref()).collect::<Vec<&RoomId>>(),
536 Some(assign!(http::request::RoomSubscription::default(), {
537 required_state,
538 timeline_limit: uint!(16)
539 })),
540 true,
541 );
542
543 let mut remaining_attempts = MAX_SLIDING_SYNC_ATTEMPTS;
544
545 let stream = sync.sync();
546 pin_mut!(stream);
547
548 let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
550
551 loop {
552 if stream.next().await.is_none() {
553 break;
555 }
556
557 let event_count = raw_notifications.lock().unwrap().len();
558 let invite_count = raw_invites.lock().unwrap().len();
559
560 let current_attempt = 1 + MAX_SLIDING_SYNC_ATTEMPTS - remaining_attempts;
561 trace!(
562 "Attempt #{current_attempt}: \
563 Found {event_count} notification(s), \
564 {invite_count} invite event(s), \
565 expected {expected_event_count} total",
566 );
567
568 if event_count + invite_count == expected_event_count {
573 break;
575 }
576
577 remaining_attempts -= 1;
578 warn!("There are some missing notifications, remaining attempts: {remaining_attempts}");
579 if remaining_attempts == 0 {
580 break;
582 }
583 }
584
585 self.client.remove_event_handler(stripped_member_handler);
586 self.client.remove_event_handler(timeline_event_handler);
587
588 let mut notifications = raw_notifications.clone().lock().unwrap().clone();
589 let mut missing_event_ids = Vec::new();
590
591 for request in requests.iter() {
593 for event_id in &request.event_ids {
594 if !notifications.contains_key(event_id) {
595 missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
596 }
597 }
598 }
599
600 for (room_id, missing_event_id) in missing_event_ids {
602 trace!("we didn't have a non-invite event, looking for invited room now");
603 if let Some(room) = self.client.get_room(&room_id) {
604 if room.state() == RoomState::Invited {
605 if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
606 notifications.insert(
607 missing_event_id.to_owned(),
608 (room_id.to_owned(), stripped_event),
609 );
610 }
611 } else {
612 debug!("the room isn't in the invited state");
613 }
614 } else {
615 warn!(%room_id, "unknown room, can't check for invite events");
616 }
617 }
618
619 let found = if notifications.len() == expected_event_count { "" } else { "not " };
620 trace!("all notification events have{found} been found");
621
622 Ok(notifications)
623 }
624
625 pub async fn get_notification_with_sliding_sync(
626 &self,
627 room_id: &RoomId,
628 event_id: &EventId,
629 ) -> Result<NotificationStatus, Error> {
630 info!("fetching notification event with a sliding sync");
631
632 let request = NotificationItemsRequest {
633 room_id: room_id.to_owned(),
634 event_ids: vec![event_id.to_owned()],
635 };
636
637 let mut get_notifications_result =
638 self.get_notifications_with_sliding_sync(&[request]).await?;
639
640 get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
641 }
642
643 async fn compute_status(
648 &self,
649 room: &Room,
650 push_actions: Option<&[Action]>,
651 raw_event: RawNotificationEvent,
652 state_events: Vec<Raw<AnyStateEvent>>,
653 ) -> Result<NotificationStatus, Error> {
654 if let Some(actions) = push_actions
655 && !actions.iter().any(|a| a.should_notify())
656 {
657 return Ok(NotificationStatus::EventFilteredOut);
659 }
660
661 let notification_item =
662 NotificationItem::new(room, raw_event, push_actions, state_events).await?;
663
664 if self.client.is_user_ignored(notification_item.event.sender()).await {
665 Ok(NotificationStatus::EventFilteredOut)
666 } else {
667 Ok(NotificationStatus::Event(Box::new(notification_item)))
668 }
669 }
670
671 pub async fn get_notifications_with_sliding_sync(
676 &self,
677 requests: &[NotificationItemsRequest],
678 ) -> Result<BatchNotificationFetchingResult, Error> {
679 let raw_events = self.try_sliding_sync(requests).await?;
680
681 let mut batch_result = BatchNotificationFetchingResult::new();
682
683 for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
684 let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
686
687 let Some(raw_event) = raw_event else {
688 batch_result.insert(event_id, Ok(NotificationStatus::EventNotFound));
690 continue;
691 };
692
693 let (raw_event, push_actions) = match &raw_event {
694 RawNotificationEvent::Timeline(timeline_event) => {
695 let event_for_redaction_check: AnySyncTimelineEvent =
697 match timeline_event.deserialize() {
698 Ok(event) => event,
699 Err(_) => {
700 batch_result.insert(event_id, Err(Error::InvalidRumaEvent));
701 continue;
702 }
703 };
704
705 if is_event_redacted(&event_for_redaction_check) {
706 batch_result.insert(event_id, Ok(NotificationStatus::EventRedacted));
707 continue;
708 }
709
710 match self.retry_decryption(&room, timeline_event).await {
712 Ok(Some(timeline_event)) => {
713 let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
714 (
715 RawNotificationEvent::Timeline(timeline_event.into_raw()),
716 push_actions,
717 )
718 }
719
720 Ok(None) => {
721 match room.event_push_actions(timeline_event).await {
724 Ok(push_actions) => (raw_event.clone(), push_actions),
725 Err(err) => {
726 batch_result.insert(event_id, Err(err.into()));
728 continue;
729 }
730 }
731 }
732
733 Err(err) => {
734 batch_result.insert(event_id, Err(err));
735 continue;
736 }
737 }
738 }
739
740 RawNotificationEvent::Invite(invite_event) => {
741 match room.event_push_actions(invite_event).await {
743 Ok(push_actions) => {
744 (RawNotificationEvent::Invite(invite_event.clone()), push_actions)
745 }
746 Err(err) => {
747 batch_result.insert(event_id, Err(err.into()));
748 continue;
749 }
750 }
751 }
752 };
753
754 let notification_status_result =
755 self.compute_status(&room, push_actions.as_deref(), raw_event, Vec::new()).await;
756
757 batch_result.insert(event_id, notification_status_result);
758 }
759
760 Ok(batch_result)
761 }
762
763 pub async fn get_notification_with_context(
776 &self,
777 room_id: &RoomId,
778 event_id: &EventId,
779 ) -> Result<NotificationStatus, Error> {
780 info!("fetching notification event with a /context query");
781
782 let Some(room) = self.parent_client.get_room(room_id) else {
784 return Err(Error::UnknownRoom);
785 };
786
787 let response = room.event_with_context(event_id, true, uint!(0), None).await?;
788
789 let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
790 let state_events = response.state;
791
792 let event_for_redaction_check: AnySyncTimelineEvent =
794 timeline_event.raw().deserialize().map_err(|_| Error::InvalidRumaEvent)?;
795
796 if is_event_redacted(&event_for_redaction_check) {
797 return Ok(NotificationStatus::EventRedacted);
798 }
799
800 if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
801 timeline_event = decrypted_event;
802 }
803
804 let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
805
806 self.compute_status(
807 &room,
808 push_actions.as_deref(),
809 RawNotificationEvent::Timeline(timeline_event.into_raw()),
810 state_events,
811 )
812 .await
813 }
814}
815
816fn is_event_encrypted(event_type: TimelineEventType) -> bool {
817 let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
818
819 #[cfg(feature = "unstable-msc3956")]
820 let is_still_encrypted =
821 is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
822
823 is_still_encrypted
824}
825
826fn is_event_redacted(event: &AnySyncTimelineEvent) -> bool {
827 match event {
830 AnySyncTimelineEvent::MessageLike(msg) => msg.is_redacted(),
831 _ => false,
832 }
833}
834
835#[derive(Debug)]
836pub enum NotificationStatus {
837 Event(Box<NotificationItem>),
839 EventNotFound,
841 EventFilteredOut,
845 EventRedacted,
847}
848
849#[derive(Debug, Clone)]
850pub struct NotificationItemsRequest {
851 pub room_id: OwnedRoomId,
852 pub event_ids: Vec<OwnedEventId>,
853}
854
855type BatchNotificationFetchingResult = BTreeMap<OwnedEventId, Result<NotificationStatus, Error>>;
856
857#[derive(Debug, Clone)]
862pub enum RawNotificationEvent {
863 Timeline(Raw<AnySyncTimelineEvent>),
865 Invite(Raw<StrippedRoomMemberEvent>),
868}
869
870#[derive(Debug)]
873pub enum NotificationEvent {
874 Timeline(Box<AnySyncTimelineEvent>),
876 Invite(Box<StrippedRoomMemberEvent>),
878}
879
880impl NotificationEvent {
881 pub fn sender(&self) -> &UserId {
882 match self {
883 NotificationEvent::Timeline(ev) => ev.sender(),
884 NotificationEvent::Invite(ev) => &ev.sender,
885 }
886 }
887
888 fn thread_id(&self) -> Option<OwnedEventId> {
891 let NotificationEvent::Timeline(sync_timeline_event) = &self else {
892 return None;
893 };
894 let AnySyncTimelineEvent::MessageLike(event) = sync_timeline_event.as_ref() else {
895 return None;
896 };
897 let content = event.original_content()?;
898 match content {
899 AnyMessageLikeEventContent::RoomMessage(content) => match content.relates_to? {
900 Relation::Thread(thread) => Some(thread.event_id),
901 _ => None,
902 },
903 _ => None,
904 }
905 }
906}
907
908#[derive(Debug)]
910pub struct NotificationItem {
911 pub event: NotificationEvent,
913
914 pub raw_event: RawNotificationEvent,
916
917 pub sender_display_name: Option<String>,
919 pub sender_avatar_url: Option<String>,
921 pub is_sender_name_ambiguous: bool,
923
924 pub room_computed_display_name: String,
926 pub room_avatar_url: Option<String>,
928 pub room_canonical_alias: Option<String>,
930 pub room_topic: Option<String>,
932 pub room_join_rule: Option<JoinRule>,
936 pub is_room_encrypted: Option<bool>,
938 pub is_direct_message_room: bool,
940 pub joined_members_count: u64,
942 pub service_members: Vec<String>,
944 pub active_service_members_count: u64,
945 pub is_space: bool,
947
948 pub is_noisy: Option<bool>,
953 pub has_mention: Option<bool>,
954 pub thread_id: Option<OwnedEventId>,
955
956 pub actions: Option<Vec<Action>>,
958
959 pub room_is_dm: bool,
961}
962
963impl NotificationItem {
964 async fn new(
965 room: &Room,
966 raw_event: RawNotificationEvent,
967 push_actions: Option<&[Action]>,
968 state_events: Vec<Raw<AnyStateEvent>>,
969 ) -> Result<Self, Error> {
970 let event = match &raw_event {
971 RawNotificationEvent::Timeline(raw_event) => {
972 let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
973 if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
974 SyncRoomMessageEvent::Original(ev),
975 )) = &mut event
976 {
977 ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
978 }
979 NotificationEvent::Timeline(Box::new(event))
980 }
981 RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(Box::new(
982 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
983 )),
984 };
985
986 let sender = match room.state() {
987 RoomState::Invited => room.invite_details().await?.inviter,
988 _ => room.get_member_no_sync(event.sender()).await?,
989 };
990
991 let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
992 match &sender {
993 Some(sender) => (
994 sender.display_name().map(|s| s.to_owned()),
995 sender.avatar_url().map(|s| s.to_string()),
996 sender.name_ambiguous(),
997 ),
998 None => (None, None, false),
999 };
1000
1001 if sender_display_name.is_none() || sender_avatar_url.is_none() {
1002 let sender_id = event.sender();
1003 for ev in state_events {
1004 let ev = match ev.deserialize() {
1005 Ok(ev) => ev,
1006 Err(err) => {
1007 warn!("Failed to deserialize a state event: {err}");
1008 continue;
1009 }
1010 };
1011 if ev.sender() != sender_id {
1012 continue;
1013 }
1014 if let AnyStateEventContentChange::RoomMember(StateEventContentChange::Original {
1015 content,
1016 ..
1017 }) = ev.content_change()
1018 {
1019 if sender_display_name.is_none() {
1020 sender_display_name = content.displayname;
1021 }
1022 if sender_avatar_url.is_none() {
1023 sender_avatar_url = content.avatar_url.map(|url| url.to_string());
1024 }
1025 }
1026 }
1027 }
1028
1029 let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
1030 let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
1031 let thread_id = event.thread_id().clone();
1032 let service_members = room
1033 .service_members()
1034 .unwrap_or_default()
1035 .iter()
1036 .map(ToString::to_string)
1037 .collect_vec();
1038
1039 let active_service_members_count =
1040 room.update_active_service_members().await?.unwrap_or_default().len() as u64;
1041
1042 let item = NotificationItem {
1043 event,
1044 raw_event,
1045 sender_display_name,
1046 sender_avatar_url,
1047 is_sender_name_ambiguous,
1048 room_computed_display_name: room.display_name().await?.to_string(),
1049 room_avatar_url: room.avatar_url().map(|s| s.to_string()),
1050 room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
1051 room_topic: room.topic(),
1052 room_join_rule: room.join_rule(),
1053 is_direct_message_room: room.is_direct().await?,
1054 is_room_encrypted: room
1055 .latest_encryption_state()
1056 .await
1057 .map(|state| state.is_encrypted())
1058 .ok(),
1059 joined_members_count: room.joined_members_count(),
1060 service_members,
1061 active_service_members_count,
1062 is_space: room.is_space(),
1063 is_noisy,
1064 has_mention,
1065 thread_id,
1066 actions: push_actions.map(|actions| actions.to_vec()),
1067 room_is_dm: room.compute_is_dm().await?,
1068 };
1069
1070 Ok(item)
1071 }
1072
1073 pub fn is_public(&self) -> Option<bool> {
1077 self.room_join_rule.as_ref().map(|rule| matches!(rule, JoinRule::Public))
1078 }
1079}
1080
1081#[derive(Debug, Error)]
1083pub enum Error {
1084 #[error(transparent)]
1085 BuildingLocalClient(ClientBuildError),
1086
1087 #[error("unknown room for a notification")]
1089 UnknownRoom,
1090
1091 #[error("invalid ruma event")]
1093 InvalidRumaEvent,
1094
1095 #[error("the sliding sync response doesn't include the target room")]
1098 SlidingSyncEmptyRoom,
1099
1100 #[error("the event was missing in the `/context` query")]
1101 ContextMissingEvent,
1102
1103 #[error(transparent)]
1105 SdkError(#[from] matrix_sdk::Error),
1106
1107 #[error(transparent)]
1109 StoreError(#[from] StoreError),
1110}
1111
1112#[cfg(test)]
1113mod tests {
1114 use std::collections::BTreeMap;
1115
1116 use assert_matches2::assert_let;
1117 use matrix_sdk::test_utils::mocks::MatrixMockServer;
1118 use matrix_sdk_test::{ALICE, async_test, event_factory::EventFactory};
1119 use ruma::{
1120 api::client::sync::sync_events::v5,
1121 assign, event_id,
1122 events::room::{member::MembershipState, message::RedactedRoomMessageEventContent},
1123 owned_event_id, owned_room_id, room_id, user_id,
1124 };
1125
1126 use crate::notification_client::{
1127 NotificationClient, NotificationItem, NotificationItemsRequest, NotificationProcessSetup,
1128 NotificationStatus, RawNotificationEvent,
1129 };
1130
1131 #[async_test]
1132 async fn test_notification_item_returns_thread_id() {
1133 let server = MatrixMockServer::new().await;
1134 let client = server.client_builder().build().await;
1135
1136 let room_id = room_id!("!a:b.c");
1137 let thread_root_event_id = event_id!("$root:b.c");
1138 let message = EventFactory::new()
1139 .room(room_id)
1140 .sender(user_id!("@sender:b.c"))
1141 .text_msg("Threaded")
1142 .in_thread(thread_root_event_id, event_id!("$prev:b.c"))
1143 .into_raw_sync();
1144 let room = server.sync_joined_room(&client, room_id).await;
1145
1146 let raw_notification_event = RawNotificationEvent::Timeline(message);
1147 let notification_item =
1148 NotificationItem::new(&room, raw_notification_event, None, Vec::new())
1149 .await
1150 .expect("Could not create notification item");
1151
1152 assert_let!(Some(thread_id) = notification_item.thread_id);
1153 assert_eq!(thread_id, thread_root_event_id);
1154 }
1155
1156 #[async_test]
1157 async fn test_try_sliding_sync_ignores_invites_for_non_subscribed_rooms() {
1158 let server = MatrixMockServer::new().await;
1159 let client = server.client_builder().build().await;
1160
1161 let user_id = client.user_id().unwrap();
1162 let room_id = room_id!("!a:b.c");
1163 let invite = EventFactory::new()
1164 .room(room_id)
1165 .member(user_id)
1166 .membership(MembershipState::Invite)
1167 .no_event_id()
1168 .into_raw_sync_state();
1169 let mut room = v5::response::Room::new();
1170 room.invite_state = Some(vec![invite.cast_unchecked()]);
1171 let rooms = BTreeMap::from_iter([(room_id.to_owned(), room)]);
1172 server
1173 .mock_sliding_sync()
1174 .ok(assign!(v5::Response::new("1".to_owned()), {
1175 rooms: rooms,
1176 }))
1177 .mount()
1178 .await;
1179
1180 let notification_client =
1181 NotificationClient::new(client.clone(), NotificationProcessSetup::MultipleProcesses)
1182 .await
1183 .expect("Could not create a notification client");
1184
1185 let event_id = owned_event_id!("$a:b.c");
1188 let result = notification_client
1189 .try_sliding_sync(&[NotificationItemsRequest {
1190 room_id: owned_room_id!("!other:b.c"),
1191 event_ids: vec![event_id.clone()],
1192 }])
1193 .await
1194 .expect("Could not run sliding sync");
1195
1196 assert!(result.is_empty());
1197
1198 let result = notification_client
1200 .try_sliding_sync(&[NotificationItemsRequest {
1201 room_id: room_id.to_owned(),
1202 event_ids: vec![event_id.clone()],
1203 }])
1204 .await
1205 .expect("Could not run sliding sync");
1206
1207 assert!(!result.is_empty());
1209
1210 let (in_room_id, event) = &result[&event_id];
1213 assert_eq!(room_id, in_room_id);
1214 assert_let!(Some(RawNotificationEvent::Invite(raw_invite)) = event);
1215
1216 let invite = raw_invite.deserialize().expect("Could not deserialize invite event");
1217 assert_eq!(invite.state_key, user_id.to_string());
1218 assert_eq!(invite.content.membership, MembershipState::Invite);
1219 }
1220
1221 #[async_test]
1222 async fn test_redacted_event_returns_event_redacted_status() {
1223 let server = MatrixMockServer::new().await;
1224 let client = server.client_builder().build().await;
1225
1226 let room_id = room_id!("!a:b.c");
1227
1228 let event_id = owned_event_id!("$redacted:b.c");
1230 let redacted_event = EventFactory::new()
1231 .room(room_id)
1232 .sender(user_id!("@sender:b.c"))
1233 .redacted(&ALICE, RedactedRoomMessageEventContent::new())
1234 .event_id(&event_id)
1235 .into_raw();
1236 let mut room = v5::response::Room::new();
1237 room.timeline = vec![redacted_event];
1238
1239 let mut rooms = BTreeMap::new();
1240 rooms.insert(room_id.to_owned(), room);
1241
1242 server
1243 .mock_sliding_sync()
1244 .ok(assign!(v5::Response::new("1".to_owned()), {
1245 rooms: rooms,
1246 }))
1247 .mount()
1248 .await;
1249
1250 let notification_client =
1251 NotificationClient::new(client.clone(), NotificationProcessSetup::MultipleProcesses)
1252 .await
1253 .expect("Could not create a notification client");
1254
1255 let result: NotificationStatus = notification_client
1256 .get_notification_with_sliding_sync(room_id, &event_id)
1257 .await
1258 .expect("Could not get notification");
1259
1260 match result {
1261 NotificationStatus::EventRedacted => {
1262 }
1264 other => panic!("Expected EventRedacted, got {:?}", other),
1265 }
1266 }
1267}