1use std::{
16 collections::BTreeMap,
17 ops::Deref,
18 sync::{Arc, Mutex},
19 time::Duration,
20};
21
22use futures_util::{StreamExt as _, pin_mut};
23use matrix_sdk::{
24 Client, ClientBuildError, SlidingSyncList, SlidingSyncMode, room::Room, sleep::sleep,
25};
26use matrix_sdk_base::{RoomState, StoreError, deserialized_responses::TimelineEvent};
27use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
28use ruma::{
29 EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
30 api::client::sync::sync_events::v5 as http,
31 assign,
32 events::{
33 AnyFullStateEventContent, AnyMessageLikeEventContent, AnyStateEvent,
34 AnySyncMessageLikeEvent, AnySyncTimelineEvent, FullStateEventContent, StateEventType,
35 TimelineEventType,
36 room::{
37 encrypted::OriginalSyncRoomEncryptedEvent,
38 join_rules::JoinRule,
39 member::{MembershipState, StrippedRoomMemberEvent},
40 message::{Relation, SyncRoomMessageEvent},
41 },
42 },
43 html::RemoveReplyFallback,
44 push::Action,
45 serde::Raw,
46 uint,
47};
48use thiserror::Error;
49use tokio::sync::Mutex as AsyncMutex;
50use tracing::{debug, info, instrument, trace, warn};
51
52use crate::{
53 DEFAULT_SANITIZER_MODE,
54 encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService},
55 sync_service::SyncService,
56};
57
58#[derive(Clone)]
60pub enum NotificationProcessSetup {
61 MultipleProcesses,
70
71 SingleProcess { sync_service: Arc<SyncService> },
79}
80
81pub struct NotificationClient {
87 client: Client,
89
90 parent_client: Client,
92
93 process_setup: NotificationProcessSetup,
95
96 notification_sync_mutex: AsyncMutex<()>,
104
105 encryption_sync_mutex: AsyncMutex<()>,
110}
111
112impl NotificationClient {
113 const CONNECTION_ID: &'static str = "notifications";
114 const LOCK_ID: &'static str = "notifications";
115
116 pub async fn new(
118 parent_client: Client,
119 process_setup: NotificationProcessSetup,
120 ) -> Result<Self, Error> {
121 let cross_process_store_config = match process_setup {
123 NotificationProcessSetup::MultipleProcesses => {
124 CrossProcessLockConfig::multi_process(Self::LOCK_ID)
125 }
126 NotificationProcessSetup::SingleProcess { .. } => CrossProcessLockConfig::SingleProcess,
127 };
128 let client = parent_client.notification_client(cross_process_store_config).await?;
129
130 Ok(NotificationClient {
131 client,
132 parent_client,
133 notification_sync_mutex: AsyncMutex::new(()),
134 encryption_sync_mutex: AsyncMutex::new(()),
135 process_setup,
136 })
137 }
138
139 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
143 self.client.get_room(room_id)
144 }
145
146 #[instrument(skip(self))]
155 pub async fn get_notification(
156 &self,
157 room_id: &RoomId,
158 event_id: &EventId,
159 ) -> Result<NotificationStatus, Error> {
160 let status = self.get_notification_with_sliding_sync(room_id, event_id).await?;
161 match status {
162 NotificationStatus::Event(..)
163 | NotificationStatus::EventFilteredOut
164 | NotificationStatus::EventRedacted => Ok(status),
165 NotificationStatus::EventNotFound => {
166 self.get_notification_with_context(room_id, event_id).await
167 }
168 }
169 }
170
171 pub async fn get_notifications(
185 &self,
186 requests: &[NotificationItemsRequest],
187 ) -> Result<BatchNotificationFetchingResult, Error> {
188 let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
189
190 for request in requests {
191 for event_id in &request.event_ids {
192 match notifications.get_mut(event_id) {
193 Some(Ok(NotificationStatus::EventNotFound)) | None => {
196 notifications.insert(
197 event_id.to_owned(),
198 self.get_notification_with_context(&request.room_id, event_id).await,
199 );
200 }
201
202 _ => {}
203 }
204 }
205 }
206
207 Ok(notifications)
208 }
209
210 #[instrument(skip_all)]
220 async fn retry_decryption(
221 &self,
222 room: &Room,
223 raw_event: &Raw<AnySyncTimelineEvent>,
224 ) -> Result<Option<TimelineEvent>, Error> {
225 let event: AnySyncTimelineEvent =
226 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
227
228 if !is_event_encrypted(event.event_type()) {
229 return Ok(None);
230 }
231
232 let _guard = self.encryption_sync_mutex.lock().await;
234
235 let push_ctx = room.push_context().await?;
246 let sync_permit_guard = match &self.process_setup {
247 NotificationProcessSetup::MultipleProcesses => {
248 let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
252 sync_permit.lock_owned().await
253 }
254
255 NotificationProcessSetup::SingleProcess { sync_service } => {
256 if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
257 permit_guard
258 } else {
259 let mut wait = 200;
268
269 debug!("Encryption sync running in background");
270 for _ in 0..3 {
271 trace!("waiting for decryption…");
272
273 sleep(Duration::from_millis(wait)).await;
274
275 let new_event = room
279 .decrypt_event(
280 raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
281 push_ctx.as_ref(),
282 )
283 .await?;
284
285 match new_event.kind {
286 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
287 utd_info, ..} => {
288 if utd_info.reason.is_missing_room_key() {
289 wait *= 2;
292 } else {
293 debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
294 return Ok(None);
295 }
296 }
297 _ => {
298 trace!("Waiting succeeded and event could be decrypted!");
299 return Ok(Some(new_event));
300 }
301 }
302 }
303
304 debug!("Timeout waiting for the encryption sync to decrypt notification.");
306 return Ok(None);
307 }
308 }
309 };
310
311 let encryption_sync = EncryptionSyncService::new(
312 self.client.clone(),
313 Some((Duration::from_secs(3), Duration::from_secs(4))),
314 )
315 .await;
316
317 match encryption_sync {
322 Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
323 Ok(()) => match room.decrypt_event(raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(), push_ctx.as_ref()).await {
327 Ok(new_event) => match new_event.kind {
328 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
329 utd_info, ..
330 } => {
331 trace!(
332 "Encryption sync failed to decrypt the event: {:?}",
333 utd_info.reason
334 );
335 Ok(None)
336 }
337 _ => {
338 trace!("Encryption sync managed to decrypt the event.");
339 Ok(Some(new_event))
340 }
341 },
342 Err(err) => {
343 trace!("Encryption sync failed to decrypt the event: {err}");
344 Ok(None)
345 }
346 },
347 Err(err) => {
348 warn!("Encryption sync error: {err:#}");
349 Ok(None)
350 }
351 },
352 Err(err) => {
353 warn!("Encryption sync build error: {err:#}",);
354 Ok(None)
355 }
356 }
357 }
358
359 #[instrument(skip_all)]
378 async fn try_sliding_sync(
379 &self,
380 requests: &[NotificationItemsRequest],
381 ) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
382 const MAX_SLIDING_SYNC_ATTEMPTS: u64 = 3;
383 let _guard = self.notification_sync_mutex.lock().await;
386
387 let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
392 let handler_raw_notification = raw_notifications.clone();
393
394 let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
395 let handler_raw_invites = raw_invites.clone();
396
397 let user_id = self.client.user_id().unwrap().to_owned();
398 let room_ids = requests.iter().map(|req| req.room_id.clone()).collect::<Vec<_>>();
399
400 let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
401
402 let timeline_event_handler = self.client.add_event_handler({
403 let requests = requests.clone();
404 move |raw: Raw<AnySyncTimelineEvent>| async move {
405 match &raw.get_field::<OwnedEventId>("event_id") {
406 Ok(Some(event_id)) => {
407 let Some(request) =
408 &requests.iter().find(|request| request.event_ids.contains(event_id))
409 else {
410 return;
411 };
412
413 let room_id = request.room_id.clone();
414
415 handler_raw_notification.lock().unwrap().insert(
419 event_id.to_owned(),
420 (room_id, Some(RawNotificationEvent::Timeline(raw))),
421 );
422 }
423 Ok(None) => {
424 warn!("a sync event had no event id");
425 }
426 Err(err) => {
427 warn!("failed to deserialize sync event id: {err}");
428 }
429 }
430 }
431 });
432
433 let handler_raw_notifications = raw_notifications.clone();
434 let stripped_member_handler = self.client.add_event_handler({
435 let requests = requests.clone();
436 let room_ids: Vec<_> = room_ids.clone();
437 move |raw: Raw<StrippedRoomMemberEvent>, room: Room| async move {
438 if !room_ids.contains(&room.room_id().to_owned()) {
439 return;
440 }
441
442 let deserialized = match raw.deserialize() {
443 Ok(d) => d,
444 Err(err) => {
445 warn!("failed to deserialize raw stripped room member event: {err}");
446 return;
447 }
448 };
449
450 trace!("received a stripped room member event");
451
452 match &raw.get_field::<OwnedEventId>("event_id") {
455 Ok(Some(event_id)) => {
456 let request =
457 &requests.iter().find(|request| request.event_ids.contains(event_id));
458 if request.is_none() {
459 return;
460 }
461 let room_id = request.unwrap().room_id.clone();
462
463 handler_raw_notifications.lock().unwrap().insert(
467 event_id.to_owned(),
468 (room_id, Some(RawNotificationEvent::Invite(raw))),
469 );
470 return;
471 }
472 Ok(None) => {
473 warn!("a room member event had no id");
474 }
475 Err(err) => {
476 warn!("failed to deserialize room member event id: {err}");
477 }
478 }
479
480 if deserialized.content.membership == MembershipState::Invite
482 && deserialized.state_key == user_id
483 {
484 trace!("found an invite event for the current user");
485 handler_raw_invites
489 .lock()
490 .unwrap()
491 .insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
492 } else {
493 trace!("not an invite event, or not for the current user");
494 }
495 }
496 });
497
498 let required_state = vec![
500 (StateEventType::RoomEncryption, "".to_owned()),
501 (StateEventType::RoomMember, "$LAZY".to_owned()),
502 (StateEventType::RoomMember, "$ME".to_owned()),
503 (StateEventType::RoomCanonicalAlias, "".to_owned()),
504 (StateEventType::RoomName, "".to_owned()),
505 (StateEventType::RoomAvatar, "".to_owned()),
506 (StateEventType::RoomPowerLevels, "".to_owned()),
507 (StateEventType::RoomJoinRules, "".to_owned()),
508 (StateEventType::CallMember, "*".to_owned()),
509 (StateEventType::RoomCreate, "".to_owned()),
510 (StateEventType::MemberHints, "".to_owned()),
511 ];
512
513 let invites = SlidingSyncList::builder("invites")
514 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
515 .timeline_limit(8)
516 .required_state(required_state.clone())
517 .filters(Some(assign!(http::request::ListFilters::default(), {
518 is_invite: Some(true),
519 })));
520
521 let sync = self
522 .client
523 .sliding_sync(Self::CONNECTION_ID)?
524 .poll_timeout(Duration::from_secs(1))
525 .network_timeout(Duration::from_secs(3))
526 .with_account_data_extension(
527 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
528 )
529 .add_list(invites)
530 .build()
531 .await?;
532
533 sync.subscribe_to_rooms(
534 &room_ids.iter().map(|id| id.deref()).collect::<Vec<&RoomId>>(),
535 Some(assign!(http::request::RoomSubscription::default(), {
536 required_state,
537 timeline_limit: uint!(16)
538 })),
539 true,
540 );
541
542 let mut remaining_attempts = MAX_SLIDING_SYNC_ATTEMPTS;
543
544 let stream = sync.sync();
545 pin_mut!(stream);
546
547 let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
549
550 loop {
551 if stream.next().await.is_none() {
552 break;
554 }
555
556 let event_count = raw_notifications.lock().unwrap().len();
557 let invite_count = raw_invites.lock().unwrap().len();
558
559 let current_attempt = 1 + MAX_SLIDING_SYNC_ATTEMPTS - remaining_attempts;
560 trace!(
561 "Attempt #{current_attempt}: \
562 Found {event_count} notification(s), \
563 {invite_count} invite event(s), \
564 expected {expected_event_count} total",
565 );
566
567 if event_count + invite_count == expected_event_count {
572 break;
574 }
575
576 remaining_attempts -= 1;
577 warn!("There are some missing notifications, remaining attempts: {remaining_attempts}");
578 if remaining_attempts == 0 {
579 break;
581 }
582 }
583
584 self.client.remove_event_handler(stripped_member_handler);
585 self.client.remove_event_handler(timeline_event_handler);
586
587 let mut notifications = raw_notifications.clone().lock().unwrap().clone();
588 let mut missing_event_ids = Vec::new();
589
590 for request in requests.iter() {
592 for event_id in &request.event_ids {
593 if !notifications.contains_key(event_id) {
594 missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
595 }
596 }
597 }
598
599 for (room_id, missing_event_id) in missing_event_ids {
601 trace!("we didn't have a non-invite event, looking for invited room now");
602 if let Some(room) = self.client.get_room(&room_id) {
603 if room.state() == RoomState::Invited {
604 if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
605 notifications.insert(
606 missing_event_id.to_owned(),
607 (room_id.to_owned(), stripped_event),
608 );
609 }
610 } else {
611 debug!("the room isn't in the invited state");
612 }
613 } else {
614 warn!(%room_id, "unknown room, can't check for invite events");
615 }
616 }
617
618 let found = if notifications.len() == expected_event_count { "" } else { "not " };
619 trace!("all notification events have{found} been found");
620
621 Ok(notifications)
622 }
623
624 pub async fn get_notification_with_sliding_sync(
625 &self,
626 room_id: &RoomId,
627 event_id: &EventId,
628 ) -> Result<NotificationStatus, Error> {
629 info!("fetching notification event with a sliding sync");
630
631 let request = NotificationItemsRequest {
632 room_id: room_id.to_owned(),
633 event_ids: vec![event_id.to_owned()],
634 };
635
636 let mut get_notifications_result =
637 self.get_notifications_with_sliding_sync(&[request]).await?;
638
639 get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
640 }
641
642 async fn compute_status(
647 &self,
648 room: &Room,
649 push_actions: Option<&[Action]>,
650 raw_event: RawNotificationEvent,
651 state_events: Vec<Raw<AnyStateEvent>>,
652 ) -> Result<NotificationStatus, Error> {
653 if let Some(actions) = push_actions
654 && !actions.iter().any(|a| a.should_notify())
655 {
656 return Ok(NotificationStatus::EventFilteredOut);
658 }
659
660 let notification_item =
661 NotificationItem::new(room, raw_event, push_actions, state_events).await?;
662
663 if self.client.is_user_ignored(notification_item.event.sender()).await {
664 Ok(NotificationStatus::EventFilteredOut)
665 } else {
666 Ok(NotificationStatus::Event(Box::new(notification_item)))
667 }
668 }
669
670 pub async fn get_notifications_with_sliding_sync(
675 &self,
676 requests: &[NotificationItemsRequest],
677 ) -> Result<BatchNotificationFetchingResult, Error> {
678 let raw_events = self.try_sliding_sync(requests).await?;
679
680 let mut batch_result = BatchNotificationFetchingResult::new();
681
682 for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
683 let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
685
686 let Some(raw_event) = raw_event else {
687 batch_result.insert(event_id, Ok(NotificationStatus::EventNotFound));
689 continue;
690 };
691
692 let (raw_event, push_actions) = match &raw_event {
693 RawNotificationEvent::Timeline(timeline_event) => {
694 let event_for_redaction_check: AnySyncTimelineEvent =
696 match timeline_event.deserialize() {
697 Ok(event) => event,
698 Err(_) => {
699 batch_result.insert(event_id, Err(Error::InvalidRumaEvent));
700 continue;
701 }
702 };
703
704 if is_event_redacted(&event_for_redaction_check) {
705 batch_result.insert(event_id, Ok(NotificationStatus::EventRedacted));
706 continue;
707 }
708
709 match self.retry_decryption(&room, timeline_event).await {
711 Ok(Some(timeline_event)) => {
712 let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
713 (
714 RawNotificationEvent::Timeline(timeline_event.into_raw()),
715 push_actions,
716 )
717 }
718
719 Ok(None) => {
720 match room.event_push_actions(timeline_event).await {
723 Ok(push_actions) => (raw_event.clone(), push_actions),
724 Err(err) => {
725 batch_result.insert(event_id, Err(err.into()));
727 continue;
728 }
729 }
730 }
731
732 Err(err) => {
733 batch_result.insert(event_id, Err(err));
734 continue;
735 }
736 }
737 }
738
739 RawNotificationEvent::Invite(invite_event) => {
740 match room.event_push_actions(invite_event).await {
742 Ok(push_actions) => {
743 (RawNotificationEvent::Invite(invite_event.clone()), push_actions)
744 }
745 Err(err) => {
746 batch_result.insert(event_id, Err(err.into()));
747 continue;
748 }
749 }
750 }
751 };
752
753 let notification_status_result =
754 self.compute_status(&room, push_actions.as_deref(), raw_event, Vec::new()).await;
755
756 batch_result.insert(event_id, notification_status_result);
757 }
758
759 Ok(batch_result)
760 }
761
762 pub async fn get_notification_with_context(
775 &self,
776 room_id: &RoomId,
777 event_id: &EventId,
778 ) -> Result<NotificationStatus, Error> {
779 info!("fetching notification event with a /context query");
780
781 let Some(room) = self.parent_client.get_room(room_id) else {
783 return Err(Error::UnknownRoom);
784 };
785
786 let response = room.event_with_context(event_id, true, uint!(0), None).await?;
787
788 let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
789 let state_events = response.state;
790
791 let event_for_redaction_check: AnySyncTimelineEvent =
793 timeline_event.raw().deserialize().map_err(|_| Error::InvalidRumaEvent)?;
794
795 if is_event_redacted(&event_for_redaction_check) {
796 return Ok(NotificationStatus::EventRedacted);
797 }
798
799 if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
800 timeline_event = decrypted_event;
801 }
802
803 let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
804
805 self.compute_status(
806 &room,
807 push_actions.as_deref(),
808 RawNotificationEvent::Timeline(timeline_event.into_raw()),
809 state_events,
810 )
811 .await
812 }
813}
814
815fn is_event_encrypted(event_type: TimelineEventType) -> bool {
816 let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
817
818 #[cfg(feature = "unstable-msc3956")]
819 let is_still_encrypted =
820 is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
821
822 is_still_encrypted
823}
824
825fn is_event_redacted(event: &AnySyncTimelineEvent) -> bool {
826 match event {
829 AnySyncTimelineEvent::MessageLike(msg) => msg.is_redacted(),
830 _ => false,
831 }
832}
833
834#[derive(Debug)]
835pub enum NotificationStatus {
836 Event(Box<NotificationItem>),
838 EventNotFound,
840 EventFilteredOut,
844 EventRedacted,
846}
847
848#[derive(Debug, Clone)]
849pub struct NotificationItemsRequest {
850 pub room_id: OwnedRoomId,
851 pub event_ids: Vec<OwnedEventId>,
852}
853
854type BatchNotificationFetchingResult = BTreeMap<OwnedEventId, Result<NotificationStatus, Error>>;
855
856#[derive(Debug, Clone)]
861pub enum RawNotificationEvent {
862 Timeline(Raw<AnySyncTimelineEvent>),
864 Invite(Raw<StrippedRoomMemberEvent>),
867}
868
869#[derive(Debug)]
872pub enum NotificationEvent {
873 Timeline(Box<AnySyncTimelineEvent>),
875 Invite(Box<StrippedRoomMemberEvent>),
877}
878
879impl NotificationEvent {
880 pub fn sender(&self) -> &UserId {
881 match self {
882 NotificationEvent::Timeline(ev) => ev.sender(),
883 NotificationEvent::Invite(ev) => &ev.sender,
884 }
885 }
886
887 fn thread_id(&self) -> Option<OwnedEventId> {
890 let NotificationEvent::Timeline(sync_timeline_event) = &self else {
891 return None;
892 };
893 let AnySyncTimelineEvent::MessageLike(event) = sync_timeline_event.as_ref() else {
894 return None;
895 };
896 let content = event.original_content()?;
897 match content {
898 AnyMessageLikeEventContent::RoomMessage(content) => match content.relates_to? {
899 Relation::Thread(thread) => Some(thread.event_id),
900 _ => None,
901 },
902 _ => None,
903 }
904 }
905}
906
907#[derive(Debug)]
909pub struct NotificationItem {
910 pub event: NotificationEvent,
912
913 pub raw_event: RawNotificationEvent,
915
916 pub sender_display_name: Option<String>,
918 pub sender_avatar_url: Option<String>,
920 pub is_sender_name_ambiguous: bool,
922
923 pub room_computed_display_name: String,
925 pub room_avatar_url: Option<String>,
927 pub room_canonical_alias: Option<String>,
929 pub room_topic: Option<String>,
931 pub room_join_rule: Option<JoinRule>,
935 pub is_room_encrypted: Option<bool>,
937 pub is_direct_message_room: bool,
939 pub joined_members_count: u64,
941 pub is_space: bool,
943
944 pub is_noisy: Option<bool>,
949 pub has_mention: Option<bool>,
950 pub thread_id: Option<OwnedEventId>,
951
952 pub actions: Option<Vec<Action>>,
954}
955
956impl NotificationItem {
957 async fn new(
958 room: &Room,
959 raw_event: RawNotificationEvent,
960 push_actions: Option<&[Action]>,
961 state_events: Vec<Raw<AnyStateEvent>>,
962 ) -> Result<Self, Error> {
963 let event = match &raw_event {
964 RawNotificationEvent::Timeline(raw_event) => {
965 let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
966 if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
967 SyncRoomMessageEvent::Original(ev),
968 )) = &mut event
969 {
970 ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
971 }
972 NotificationEvent::Timeline(Box::new(event))
973 }
974 RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(Box::new(
975 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
976 )),
977 };
978
979 let sender = match room.state() {
980 RoomState::Invited => room.invite_details().await?.inviter,
981 _ => room.get_member_no_sync(event.sender()).await?,
982 };
983
984 let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
985 match &sender {
986 Some(sender) => (
987 sender.display_name().map(|s| s.to_owned()),
988 sender.avatar_url().map(|s| s.to_string()),
989 sender.name_ambiguous(),
990 ),
991 None => (None, None, false),
992 };
993
994 if sender_display_name.is_none() || sender_avatar_url.is_none() {
995 let sender_id = event.sender();
996 for ev in state_events {
997 let ev = match ev.deserialize() {
998 Ok(ev) => ev,
999 Err(err) => {
1000 warn!("Failed to deserialize a state event: {err}");
1001 continue;
1002 }
1003 };
1004 if ev.sender() != sender_id {
1005 continue;
1006 }
1007 if let AnyFullStateEventContent::RoomMember(FullStateEventContent::Original {
1008 content,
1009 ..
1010 }) = ev.content()
1011 {
1012 if sender_display_name.is_none() {
1013 sender_display_name = content.displayname;
1014 }
1015 if sender_avatar_url.is_none() {
1016 sender_avatar_url = content.avatar_url.map(|url| url.to_string());
1017 }
1018 }
1019 }
1020 }
1021
1022 let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
1023 let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
1024 let thread_id = event.thread_id().clone();
1025
1026 let item = NotificationItem {
1027 event,
1028 raw_event,
1029 sender_display_name,
1030 sender_avatar_url,
1031 is_sender_name_ambiguous,
1032 room_computed_display_name: room.display_name().await?.to_string(),
1033 room_avatar_url: room.avatar_url().map(|s| s.to_string()),
1034 room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
1035 room_topic: room.topic(),
1036 room_join_rule: room.join_rule(),
1037 is_direct_message_room: room.is_direct().await?,
1038 is_room_encrypted: room
1039 .latest_encryption_state()
1040 .await
1041 .map(|state| state.is_encrypted())
1042 .ok(),
1043 joined_members_count: room.joined_members_count(),
1044 is_space: room.is_space(),
1045 is_noisy,
1046 has_mention,
1047 thread_id,
1048 actions: push_actions.map(|actions| actions.to_vec()),
1049 };
1050
1051 Ok(item)
1052 }
1053
1054 pub fn is_public(&self) -> Option<bool> {
1058 self.room_join_rule.as_ref().map(|rule| matches!(rule, JoinRule::Public))
1059 }
1060}
1061
1062#[derive(Debug, Error)]
1064pub enum Error {
1065 #[error(transparent)]
1066 BuildingLocalClient(ClientBuildError),
1067
1068 #[error("unknown room for a notification")]
1070 UnknownRoom,
1071
1072 #[error("invalid ruma event")]
1074 InvalidRumaEvent,
1075
1076 #[error("the sliding sync response doesn't include the target room")]
1079 SlidingSyncEmptyRoom,
1080
1081 #[error("the event was missing in the `/context` query")]
1082 ContextMissingEvent,
1083
1084 #[error(transparent)]
1086 SdkError(#[from] matrix_sdk::Error),
1087
1088 #[error(transparent)]
1090 StoreError(#[from] StoreError),
1091}
1092
1093#[cfg(test)]
1094mod tests {
1095 use std::collections::BTreeMap;
1096
1097 use assert_matches2::assert_let;
1098 use matrix_sdk::test_utils::mocks::MatrixMockServer;
1099 use matrix_sdk_test::{ALICE, async_test, event_factory::EventFactory};
1100 use ruma::{
1101 api::client::sync::sync_events::v5,
1102 assign, event_id,
1103 events::room::{member::MembershipState, message::RedactedRoomMessageEventContent},
1104 owned_event_id, owned_room_id, room_id, user_id,
1105 };
1106
1107 use crate::notification_client::{
1108 NotificationClient, NotificationItem, NotificationItemsRequest, NotificationProcessSetup,
1109 NotificationStatus, RawNotificationEvent,
1110 };
1111
1112 #[async_test]
1113 async fn test_notification_item_returns_thread_id() {
1114 let server = MatrixMockServer::new().await;
1115 let client = server.client_builder().build().await;
1116
1117 let room_id = room_id!("!a:b.c");
1118 let thread_root_event_id = event_id!("$root:b.c");
1119 let message = EventFactory::new()
1120 .room(room_id)
1121 .sender(user_id!("@sender:b.c"))
1122 .text_msg("Threaded")
1123 .in_thread(thread_root_event_id, event_id!("$prev:b.c"))
1124 .into_raw_sync();
1125 let room = server.sync_joined_room(&client, room_id).await;
1126
1127 let raw_notification_event = RawNotificationEvent::Timeline(message);
1128 let notification_item =
1129 NotificationItem::new(&room, raw_notification_event, None, Vec::new())
1130 .await
1131 .expect("Could not create notification item");
1132
1133 assert_let!(Some(thread_id) = notification_item.thread_id);
1134 assert_eq!(thread_id, thread_root_event_id);
1135 }
1136
1137 #[async_test]
1138 async fn test_try_sliding_sync_ignores_invites_for_non_subscribed_rooms() {
1139 let server = MatrixMockServer::new().await;
1140 let client = server.client_builder().build().await;
1141
1142 let user_id = client.user_id().unwrap();
1143 let room_id = room_id!("!a:b.c");
1144 let invite = EventFactory::new()
1145 .room(room_id)
1146 .member(user_id)
1147 .membership(MembershipState::Invite)
1148 .no_event_id()
1149 .into_raw_sync_state();
1150 let mut room = v5::response::Room::new();
1151 room.invite_state = Some(vec![invite.cast_unchecked()]);
1152 let rooms = BTreeMap::from_iter([(room_id.to_owned(), room)]);
1153 server
1154 .mock_sliding_sync()
1155 .ok(assign!(v5::Response::new("1".to_owned()), {
1156 rooms: rooms,
1157 }))
1158 .mount()
1159 .await;
1160
1161 let notification_client =
1162 NotificationClient::new(client.clone(), NotificationProcessSetup::MultipleProcesses)
1163 .await
1164 .expect("Could not create a notification client");
1165
1166 let event_id = owned_event_id!("$a:b.c");
1169 let result = notification_client
1170 .try_sliding_sync(&[NotificationItemsRequest {
1171 room_id: owned_room_id!("!other:b.c"),
1172 event_ids: vec![event_id.clone()],
1173 }])
1174 .await
1175 .expect("Could not run sliding sync");
1176
1177 assert!(result.is_empty());
1178
1179 let result = notification_client
1181 .try_sliding_sync(&[NotificationItemsRequest {
1182 room_id: room_id.to_owned(),
1183 event_ids: vec![event_id.clone()],
1184 }])
1185 .await
1186 .expect("Could not run sliding sync");
1187
1188 assert!(!result.is_empty());
1190
1191 let (in_room_id, event) = &result[&event_id];
1194 assert_eq!(room_id, in_room_id);
1195 assert_let!(Some(RawNotificationEvent::Invite(raw_invite)) = event);
1196
1197 let invite = raw_invite.deserialize().expect("Could not deserialize invite event");
1198 assert_eq!(invite.state_key, user_id.to_string());
1199 assert_eq!(invite.content.membership, MembershipState::Invite);
1200 }
1201
1202 #[async_test]
1203 async fn test_redacted_event_returns_event_redacted_status() {
1204 let server = MatrixMockServer::new().await;
1205 let client = server.client_builder().build().await;
1206
1207 let room_id = room_id!("!a:b.c");
1208
1209 let event_id = owned_event_id!("$redacted:b.c");
1211 let redacted_event = EventFactory::new()
1212 .room(room_id)
1213 .sender(user_id!("@sender:b.c"))
1214 .redacted(&ALICE, RedactedRoomMessageEventContent::new())
1215 .event_id(&event_id)
1216 .into_raw();
1217 let mut room = v5::response::Room::new();
1218 room.timeline = vec![redacted_event];
1219
1220 let mut rooms = BTreeMap::new();
1221 rooms.insert(room_id.to_owned(), room);
1222
1223 server
1224 .mock_sliding_sync()
1225 .ok(assign!(v5::Response::new("1".to_owned()), {
1226 rooms: rooms,
1227 }))
1228 .mount()
1229 .await;
1230
1231 let notification_client =
1232 NotificationClient::new(client.clone(), NotificationProcessSetup::MultipleProcesses)
1233 .await
1234 .expect("Could not create a notification client");
1235
1236 let result: NotificationStatus = notification_client
1237 .get_notification_with_sliding_sync(room_id, &event_id)
1238 .await
1239 .expect("Could not get notification");
1240
1241 match result {
1242 NotificationStatus::EventRedacted => {
1243 }
1245 other => panic!("Expected EventRedacted, got {:?}", other),
1246 }
1247 }
1248}