1use std::{
16 collections::BTreeMap,
17 sync::{Arc, Mutex},
18 time::Duration,
19};
20
21use futures_util::{StreamExt as _, pin_mut};
22use matrix_sdk::{
23 Client, ClientBuildError, SlidingSyncList, SlidingSyncMode, room::Room, sleep::sleep,
24};
25use matrix_sdk_base::{RoomState, StoreError, deserialized_responses::TimelineEvent};
26use ruma::{
27 EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
28 api::client::sync::sync_events::v5 as http,
29 assign,
30 events::{
31 AnyFullStateEventContent, AnyMessageLikeEventContent, AnyStateEvent,
32 AnySyncMessageLikeEvent, AnySyncTimelineEvent, FullStateEventContent, StateEventType,
33 TimelineEventType,
34 room::{
35 encrypted::OriginalSyncRoomEncryptedEvent,
36 join_rules::JoinRule,
37 member::{MembershipState, StrippedRoomMemberEvent},
38 message::{Relation, SyncRoomMessageEvent},
39 },
40 },
41 html::RemoveReplyFallback,
42 push::Action,
43 serde::Raw,
44 uint,
45};
46use thiserror::Error;
47use tokio::sync::Mutex as AsyncMutex;
48use tracing::{debug, info, instrument, trace, warn};
49
50use crate::{
51 DEFAULT_SANITIZER_MODE,
52 encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService, WithLocking},
53 sync_service::SyncService,
54};
55
/// Describes how the [`NotificationClient`] obtains the encryption sync permit
/// when it has to decrypt a notification event.
///
/// NOTE(review): the variant names suggest this reflects whether the
/// notification client runs in a separate process from the main application —
/// confirm against the callers.
#[derive(Clone)]
pub enum NotificationProcessSetup {
    /// The notification client creates a fresh [`EncryptionSyncPermit`] of its
    /// own, and the encryption sync it starts is built with
    /// `WithLocking::from(true)`.
    MultipleProcesses,

    /// The encryption sync permit is requested from the given [`SyncService`].
    ///
    /// When the permit cannot be obtained, the notification client does not
    /// start an encryption sync of its own; it waits and retries decrypting
    /// the event a few times instead.
    SingleProcess { sync_service: Arc<SyncService> },
}
78
/// A client specialised in retrieving the content of notifications, given a
/// room id and an event id.
pub struct NotificationClient {
    /// Client derived from `parent_client` via `Client::notification_client`;
    /// it runs the notification sliding sync, hosts the temporary event
    /// handlers and is used for the encryption sync.
    client: Client,

    /// The client this notification client was created from; used to look up
    /// the room for `/context` queries.
    parent_client: Client,

    /// How the encryption sync permit is obtained when decrypting events.
    process_setup: NotificationProcessSetup,

    /// Held for the whole duration of `try_sliding_sync`, so that at most one
    /// notification sliding sync runs at a time for this instance.
    notification_sync_mutex: AsyncMutex<()>,

    /// Held while `retry_decryption` attempts to decrypt an encrypted event,
    /// so that at most one decryption attempt runs at a time for this
    /// instance.
    encryption_sync_mutex: AsyncMutex<()>,
}
109
110impl NotificationClient {
    /// Connection id passed to the sliding sync builder in `try_sliding_sync`.
    const CONNECTION_ID: &'static str = "notifications";
    /// Lock id passed to `Client::notification_client` when deriving the
    /// internal client in `new`.
    const LOCK_ID: &'static str = "notifications";
113
114 pub async fn new(
116 parent_client: Client,
117 process_setup: NotificationProcessSetup,
118 ) -> Result<Self, Error> {
119 let client = parent_client.notification_client(Self::LOCK_ID.to_owned()).await?;
120
121 Ok(NotificationClient {
122 client,
123 parent_client,
124 notification_sync_mutex: AsyncMutex::new(()),
125 encryption_sync_mutex: AsyncMutex::new(()),
126 process_setup,
127 })
128 }
129
    /// Looks up a room by id in the internal notification client (not in the
    /// parent client).
    ///
    /// Returns `None` if that client does not know the room.
    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
        self.client.get_room(room_id)
    }
136
137 #[instrument(skip(self))]
146 pub async fn get_notification(
147 &self,
148 room_id: &RoomId,
149 event_id: &EventId,
150 ) -> Result<NotificationStatus, Error> {
151 let status = self.get_notification_with_sliding_sync(room_id, event_id).await?;
152 match status {
153 NotificationStatus::Event(..) | NotificationStatus::EventFilteredOut => Ok(status),
154 NotificationStatus::EventNotFound => {
155 self.get_notification_with_context(room_id, event_id).await
156 }
157 }
158 }
159
160 pub async fn get_notifications(
174 &self,
175 requests: &[NotificationItemsRequest],
176 ) -> Result<BatchNotificationFetchingResult, Error> {
177 let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
178
179 for request in requests {
180 for event_id in &request.event_ids {
181 match notifications.get_mut(event_id) {
182 Some(Ok(NotificationStatus::EventNotFound)) | None => {
185 notifications.insert(
186 event_id.to_owned(),
187 self.get_notification_with_context(&request.room_id, event_id).await,
188 );
189 }
190
191 _ => {}
192 }
193 }
194 }
195
196 Ok(notifications)
197 }
198
    /// Tries to decrypt `raw_event`, running or waiting for an encryption sync
    /// if needed.
    ///
    /// Returns:
    /// - `Ok(None)` if the event is not encrypted, or if it could not be
    ///   decrypted (including when the encryption sync could not be built or
    ///   failed — those failures are only logged),
    /// - `Ok(Some(event))` with the decrypted event on success.
    ///
    /// # Errors
    ///
    /// Fails if the raw event cannot be deserialized, if the room's push
    /// context cannot be computed, or if decryption itself errors while
    /// waiting for a background encryption sync.
    #[instrument(skip_all)]
    async fn retry_decryption(
        &self,
        room: &Room,
        raw_event: &Raw<AnySyncTimelineEvent>,
    ) -> Result<Option<TimelineEvent>, Error> {
        let event: AnySyncTimelineEvent =
            raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;

        // Nothing to do for events that are not encrypted.
        if !is_event_encrypted(event.event_type()) {
            return Ok(None);
        }

        // Serialize decryption attempts made through this notification client.
        let _guard = self.encryption_sync_mutex.lock().await;

        // Locking is requested only in the `MultipleProcesses` setup.
        let with_locking = WithLocking::from(matches!(
            self.process_setup,
            NotificationProcessSetup::MultipleProcesses
        ));

        let push_ctx = room.push_context().await?;
        let sync_permit_guard = match &self.process_setup {
            NotificationProcessSetup::MultipleProcesses => {
                // Create a brand new permit, owned by this call only.
                let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
                sync_permit.lock_owned().await
            }

            NotificationProcessSetup::SingleProcess { sync_service } => {
                if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
                    permit_guard
                } else {
                    // The permit is not available: instead of starting another
                    // encryption sync, wait and retry decryption up to three
                    // times, sleeping 200ms, 400ms and then 800ms.
                    let mut wait = 200;

                    debug!("Encryption sync running in background");
                    for _ in 0..3 {
                        trace!("waiting for decryption…");

                        sleep(Duration::from_millis(wait)).await;

                        let new_event = room
                            .decrypt_event(
                                raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
                                push_ctx.as_ref(),
                            )
                            .await?;

                        match new_event.kind {
                            matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
                                utd_info, ..} => {
                                if utd_info.reason.is_missing_room_key() {
                                    // The key may still arrive: back off and
                                    // try again.
                                    wait *= 2;
                                } else {
                                    debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
                                    return Ok(None);
                                }
                            }
                            _ => {
                                trace!("Waiting succeeded and event could be decrypted!");
                                return Ok(Some(new_event));
                            }
                        }
                    }

                    // All retries were used up without getting the room key.
                    debug!("Timeout waiting for the encryption sync to decrypt notification.");
                    return Ok(None);
                }
            }
        };

        // NOTE(review): the two durations are presumably the poll and network
        // timeouts of the encryption sync — confirm against
        // `EncryptionSyncService::new`.
        let encryption_sync = EncryptionSyncService::new(
            self.client.clone(),
            Some((Duration::from_secs(3), Duration::from_secs(4))),
            with_locking,
        )
        .await;

        // Every failure from here on is logged and reported as "could not
        // decrypt" (`Ok(None)`), never as an error.
        match encryption_sync {
            Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
                Ok(()) => match room.decrypt_event(raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(), push_ctx.as_ref()).await {
                    Ok(new_event) => match new_event.kind {
                        matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
                            utd_info, ..
                        } => {
                            trace!(
                                "Encryption sync failed to decrypt the event: {:?}",
                                utd_info.reason
                            );
                            Ok(None)
                        }
                        _ => {
                            trace!("Encryption sync managed to decrypt the event.");
                            Ok(Some(new_event))
                        }
                    },
                    Err(err) => {
                        trace!("Encryption sync failed to decrypt the event: {err}");
                        Ok(None)
                    }
                },
                Err(err) => {
                    warn!("Encryption sync error: {err:#}");
                    Ok(None)
                }
            },
            Err(err) => {
                warn!("Encryption sync build error: {err:#}",);
                Ok(None)
            }
        }
    }
353
354 #[instrument(skip_all)]
373 async fn try_sliding_sync(
374 &self,
375 requests: &[NotificationItemsRequest],
376 ) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
377 let _guard = self.notification_sync_mutex.lock().await;
380
381 let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
386
387 let handler_raw_notification = raw_notifications.clone();
388
389 let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
390
391 let timeline_event_handler = self.client.add_event_handler({
392 let requests = requests.clone();
393 move |raw: Raw<AnySyncTimelineEvent>| async move {
394 match &raw.get_field::<OwnedEventId>("event_id") {
395 Ok(Some(event_id)) => {
396 let Some(request) =
397 &requests.iter().find(|request| request.event_ids.contains(event_id))
398 else {
399 return;
400 };
401
402 let room_id = request.room_id.clone();
403
404 handler_raw_notification.lock().unwrap().insert(
408 event_id.to_owned(),
409 (room_id, Some(RawNotificationEvent::Timeline(raw))),
410 );
411 }
412 Ok(None) => {
413 warn!("a sync event had no event id");
414 }
415 Err(err) => {
416 warn!("failed to deserialize sync event id: {err}");
417 }
418 }
419 }
420 });
421
422 let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
424
425 let user_id = self.client.user_id().unwrap().to_owned();
426 let handler_raw_invites = raw_invites.clone();
427 let handler_raw_notifications = raw_notifications.clone();
428 let stripped_member_handler = self.client.add_event_handler({
429 let requests = requests.clone();
430 move |raw: Raw<StrippedRoomMemberEvent>| async move {
431 let deserialized = match raw.deserialize() {
432 Ok(d) => d,
433 Err(err) => {
434 warn!("failed to deserialize raw stripped room member event: {err}");
435 return;
436 }
437 };
438
439 trace!("received a stripped room member event");
440
441 match &raw.get_field::<OwnedEventId>("event_id") {
444 Ok(Some(event_id)) => {
445 let request =
446 &requests.iter().find(|request| request.event_ids.contains(event_id));
447 if request.is_none() {
448 return;
449 }
450 let room_id = request.unwrap().room_id.clone();
451
452 handler_raw_notifications.lock().unwrap().insert(
456 event_id.to_owned(),
457 (room_id, Some(RawNotificationEvent::Invite(raw))),
458 );
459 return;
460 }
461 Ok(None) => {
462 warn!("a room member event had no id");
463 }
464 Err(err) => {
465 warn!("failed to deserialize room member event id: {err}");
466 }
467 }
468
469 if deserialized.content.membership == MembershipState::Invite
471 && deserialized.state_key == user_id
472 {
473 trace!("found an invite event for the current user");
474 handler_raw_invites
478 .lock()
479 .unwrap()
480 .insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
481 } else {
482 trace!("not an invite event, or not for the current user");
483 }
484 }
485 });
486
487 let required_state = vec![
489 (StateEventType::RoomEncryption, "".to_owned()),
490 (StateEventType::RoomMember, "$LAZY".to_owned()),
491 (StateEventType::RoomMember, "$ME".to_owned()),
492 (StateEventType::RoomCanonicalAlias, "".to_owned()),
493 (StateEventType::RoomName, "".to_owned()),
494 (StateEventType::RoomAvatar, "".to_owned()),
495 (StateEventType::RoomPowerLevels, "".to_owned()),
496 (StateEventType::RoomJoinRules, "".to_owned()),
497 (StateEventType::CallMember, "*".to_owned()),
498 (StateEventType::RoomCreate, "".to_owned()),
499 (StateEventType::MemberHints, "".to_owned()),
500 ];
501
502 let invites = SlidingSyncList::builder("invites")
503 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
504 .timeline_limit(8)
505 .required_state(required_state.clone())
506 .filters(Some(assign!(http::request::ListFilters::default(), {
507 is_invite: Some(true),
508 })));
509
510 let sync = self
511 .client
512 .sliding_sync(Self::CONNECTION_ID)?
513 .poll_timeout(Duration::from_secs(1))
514 .network_timeout(Duration::from_secs(3))
515 .with_account_data_extension(
516 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
517 )
518 .add_list(invites)
519 .build()
520 .await?;
521
522 let room_ids = requests.iter().map(|req| req.room_id.as_ref()).collect::<Vec<_>>();
523 sync.subscribe_to_rooms(
524 &room_ids,
525 Some(assign!(http::request::RoomSubscription::default(), {
526 required_state,
527 timeline_limit: uint!(16)
528 })),
529 true,
530 );
531
532 let mut remaining_attempts = 3;
533
534 let stream = sync.sync();
535 pin_mut!(stream);
536
537 let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
539
540 loop {
541 if stream.next().await.is_none() {
542 break;
544 }
545
546 if raw_notifications.lock().unwrap().len() + raw_invites.lock().unwrap().len()
547 == expected_event_count
548 {
549 break;
551 }
552
553 remaining_attempts -= 1;
554 if remaining_attempts == 0 {
555 break;
557 }
558 }
559
560 self.client.remove_event_handler(stripped_member_handler);
561 self.client.remove_event_handler(timeline_event_handler);
562
563 let mut notifications = raw_notifications.clone().lock().unwrap().clone();
564 let mut missing_event_ids = Vec::new();
565
566 for request in requests.iter() {
568 for event_id in &request.event_ids {
569 if !notifications.contains_key(event_id) {
570 missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
571 }
572 }
573 }
574
575 for (room_id, missing_event_id) in missing_event_ids {
577 trace!("we didn't have a non-invite event, looking for invited room now");
578 if let Some(room) = self.client.get_room(&room_id) {
579 if room.state() == RoomState::Invited {
580 if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
581 notifications.insert(
582 missing_event_id.to_owned(),
583 (room_id.to_owned(), stripped_event),
584 );
585 }
586 } else {
587 debug!("the room isn't in the invited state");
588 }
589 } else {
590 warn!(%room_id, "unknown room, can't check for invite events");
591 }
592 }
593
594 let found = if notifications.len() == expected_event_count { "" } else { "not " };
595 trace!("all notification events have{found} been found");
596
597 Ok(notifications)
598 }
599
600 pub async fn get_notification_with_sliding_sync(
601 &self,
602 room_id: &RoomId,
603 event_id: &EventId,
604 ) -> Result<NotificationStatus, Error> {
605 info!("fetching notification event with a sliding sync");
606
607 let request = NotificationItemsRequest {
608 room_id: room_id.to_owned(),
609 event_ids: vec![event_id.to_owned()],
610 };
611
612 let mut get_notifications_result =
613 self.get_notifications_with_sliding_sync(&[request]).await?;
614
615 get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
616 }
617
618 async fn compute_status(
623 &self,
624 room: &Room,
625 push_actions: Option<&[Action]>,
626 raw_event: RawNotificationEvent,
627 state_events: Vec<Raw<AnyStateEvent>>,
628 ) -> Result<NotificationStatus, Error> {
629 if let Some(actions) = push_actions
630 && !actions.iter().any(|a| a.should_notify())
631 {
632 return Ok(NotificationStatus::EventFilteredOut);
634 }
635
636 let notification_item =
637 NotificationItem::new(room, raw_event, push_actions, state_events).await?;
638
639 if self.client.is_user_ignored(notification_item.event.sender()).await {
640 Ok(NotificationStatus::EventFilteredOut)
641 } else {
642 Ok(NotificationStatus::Event(Box::new(notification_item)))
643 }
644 }
645
646 pub async fn get_notifications_with_sliding_sync(
651 &self,
652 requests: &[NotificationItemsRequest],
653 ) -> Result<BatchNotificationFetchingResult, Error> {
654 let raw_events = self.try_sliding_sync(requests).await?;
655
656 let mut batch_result = BatchNotificationFetchingResult::new();
657
658 for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
659 let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
661
662 let Some(raw_event) = raw_event else {
663 batch_result.insert(event_id, Ok(NotificationStatus::EventNotFound));
665 continue;
666 };
667
668 let (raw_event, push_actions) = match &raw_event {
669 RawNotificationEvent::Timeline(timeline_event) => {
670 match self.retry_decryption(&room, timeline_event).await {
672 Ok(Some(timeline_event)) => {
673 let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
674 (
675 RawNotificationEvent::Timeline(timeline_event.into_raw()),
676 push_actions,
677 )
678 }
679
680 Ok(None) => {
681 match room.event_push_actions(timeline_event).await {
684 Ok(push_actions) => (raw_event.clone(), push_actions),
685 Err(err) => {
686 batch_result.insert(event_id, Err(err.into()));
688 continue;
689 }
690 }
691 }
692
693 Err(err) => {
694 batch_result.insert(event_id, Err(err));
695 continue;
696 }
697 }
698 }
699
700 RawNotificationEvent::Invite(invite_event) => {
701 match room.event_push_actions(invite_event).await {
703 Ok(push_actions) => {
704 (RawNotificationEvent::Invite(invite_event.clone()), push_actions)
705 }
706 Err(err) => {
707 batch_result.insert(event_id, Err(err.into()));
708 continue;
709 }
710 }
711 }
712 };
713
714 let notification_status_result =
715 self.compute_status(&room, push_actions.as_deref(), raw_event, Vec::new()).await;
716
717 batch_result.insert(event_id, notification_status_result);
718 }
719
720 Ok(batch_result)
721 }
722
723 pub async fn get_notification_with_context(
736 &self,
737 room_id: &RoomId,
738 event_id: &EventId,
739 ) -> Result<NotificationStatus, Error> {
740 info!("fetching notification event with a /context query");
741
742 let Some(room) = self.parent_client.get_room(room_id) else {
744 return Err(Error::UnknownRoom);
745 };
746
747 let response = room.event_with_context(event_id, true, uint!(0), None).await?;
748
749 let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
750 let state_events = response.state;
751
752 if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
753 timeline_event = decrypted_event;
754 }
755
756 let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
757
758 self.compute_status(
759 &room,
760 push_actions.as_deref(),
761 RawNotificationEvent::Timeline(timeline_event.into_raw()),
762 state_events,
763 )
764 .await
765 }
766}
767
768fn is_event_encrypted(event_type: TimelineEventType) -> bool {
769 let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
770
771 #[cfg(feature = "unstable-msc3956")]
772 let is_still_encrypted =
773 is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
774
775 is_still_encrypted
776}
777
/// Outcome of trying to resolve a notification.
#[derive(Debug)]
pub enum NotificationStatus {
    /// The event was found and should be shown; the item holds its details.
    Event(Box<NotificationItem>),
    /// The event could not be found.
    EventNotFound,
    /// The event was found but must not be shown: either its push actions say
    /// it should not notify, or its sender is ignored by the user.
    EventFilteredOut,
}
789
/// A request for the notification items of several events of one room.
#[derive(Debug, Clone)]
pub struct NotificationItemsRequest {
    /// The room the events belong to.
    pub room_id: OwnedRoomId,
    /// The ids of the events to resolve.
    pub event_ids: Vec<OwnedEventId>,
}
795
/// Per-event outcome of a batch fetch: each event id maps to its status, or to
/// the error hit while resolving that particular event.
type BatchNotificationFetchingResult = BTreeMap<OwnedEventId, Result<NotificationStatus, Error>>;
797
/// The raw (not yet deserialized) event behind a notification.
#[derive(Debug, Clone)]
pub enum RawNotificationEvent {
    /// A raw sync timeline event.
    Timeline(Raw<AnySyncTimelineEvent>),
    /// A raw stripped room member event; the sliding sync path stores invite
    /// events for the current user in this variant.
    Invite(Raw<StrippedRoomMemberEvent>),
}
810
/// The deserialized event behind a notification.
#[derive(Debug)]
pub enum NotificationEvent {
    /// A deserialized sync timeline event.
    Timeline(Box<AnySyncTimelineEvent>),
    /// A deserialized stripped room member event, the counterpart of
    /// [`RawNotificationEvent::Invite`].
    Invite(Box<StrippedRoomMemberEvent>),
}
820
821impl NotificationEvent {
822 pub fn sender(&self) -> &UserId {
823 match self {
824 NotificationEvent::Timeline(ev) => ev.sender(),
825 NotificationEvent::Invite(ev) => &ev.sender,
826 }
827 }
828
829 fn thread_id(&self) -> Option<OwnedEventId> {
832 let NotificationEvent::Timeline(sync_timeline_event) = &self else {
833 return None;
834 };
835 let AnySyncTimelineEvent::MessageLike(event) = sync_timeline_event.as_ref() else {
836 return None;
837 };
838 let content = event.original_content()?;
839 match content {
840 AnyMessageLikeEventContent::RoomMessage(content) => match content.relates_to? {
841 Relation::Thread(thread) => Some(thread.event_id),
842 _ => None,
843 },
844 _ => None,
845 }
846 }
847}
848
/// Everything needed to display a notification for one event.
#[derive(Debug)]
pub struct NotificationItem {
    /// The deserialized event that triggered the notification.
    pub event: NotificationEvent,

    /// The raw event the item was built from.
    pub raw_event: RawNotificationEvent,

    /// Display name of the sender, if known.
    pub sender_display_name: Option<String>,
    /// Avatar URL of the sender, if known.
    pub sender_avatar_url: Option<String>,
    /// Whether the sender's display name is ambiguous in the room; `false`
    /// when the sender's member information is not available.
    pub is_sender_name_ambiguous: bool,

    /// Computed display name of the room.
    pub room_computed_display_name: String,
    /// Avatar URL of the room, if any.
    pub room_avatar_url: Option<String>,
    /// Canonical alias of the room, if any.
    pub room_canonical_alias: Option<String>,
    /// Topic of the room, if any.
    pub room_topic: Option<String>,
    /// Join rule of the room, if known.
    pub room_join_rule: Option<JoinRule>,
    /// Whether the room is encrypted; `None` when the encryption state could
    /// not be retrieved.
    pub is_room_encrypted: Option<bool>,
    /// Whether the room is a direct message room.
    pub is_direct_message_room: bool,
    /// Number of joined members in the room.
    pub joined_members_count: u64,
    /// Whether the room is a space.
    pub is_space: bool,

    /// Whether any push action carries a sound; `None` when the push actions
    /// are unknown.
    pub is_noisy: Option<bool>,
    /// Whether any push action is a highlight; `None` when the push actions
    /// are unknown.
    pub has_mention: Option<bool>,
    /// Id of the thread root, when the event is a room message sent in a
    /// thread.
    pub thread_id: Option<OwnedEventId>,

    /// The push actions computed for the event, if known.
    pub actions: Option<Vec<Action>>,
}
897
898impl NotificationItem {
899 async fn new(
900 room: &Room,
901 raw_event: RawNotificationEvent,
902 push_actions: Option<&[Action]>,
903 state_events: Vec<Raw<AnyStateEvent>>,
904 ) -> Result<Self, Error> {
905 let event = match &raw_event {
906 RawNotificationEvent::Timeline(raw_event) => {
907 let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
908 if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
909 SyncRoomMessageEvent::Original(ev),
910 )) = &mut event
911 {
912 ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
913 }
914 NotificationEvent::Timeline(Box::new(event))
915 }
916 RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(Box::new(
917 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
918 )),
919 };
920
921 let sender = match room.state() {
922 RoomState::Invited => room.invite_details().await?.inviter,
923 _ => room.get_member_no_sync(event.sender()).await?,
924 };
925
926 let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
927 match &sender {
928 Some(sender) => (
929 sender.display_name().map(|s| s.to_owned()),
930 sender.avatar_url().map(|s| s.to_string()),
931 sender.name_ambiguous(),
932 ),
933 None => (None, None, false),
934 };
935
936 if sender_display_name.is_none() || sender_avatar_url.is_none() {
937 let sender_id = event.sender();
938 for ev in state_events {
939 let ev = match ev.deserialize() {
940 Ok(ev) => ev,
941 Err(err) => {
942 warn!("Failed to deserialize a state event: {err}");
943 continue;
944 }
945 };
946 if ev.sender() != sender_id {
947 continue;
948 }
949 if let AnyFullStateEventContent::RoomMember(FullStateEventContent::Original {
950 content,
951 ..
952 }) = ev.content()
953 {
954 if sender_display_name.is_none() {
955 sender_display_name = content.displayname;
956 }
957 if sender_avatar_url.is_none() {
958 sender_avatar_url = content.avatar_url.map(|url| url.to_string());
959 }
960 }
961 }
962 }
963
964 let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
965 let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
966 let thread_id = event.thread_id().clone();
967
968 let item = NotificationItem {
969 event,
970 raw_event,
971 sender_display_name,
972 sender_avatar_url,
973 is_sender_name_ambiguous,
974 room_computed_display_name: room.display_name().await?.to_string(),
975 room_avatar_url: room.avatar_url().map(|s| s.to_string()),
976 room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
977 room_topic: room.topic(),
978 room_join_rule: room.join_rule(),
979 is_direct_message_room: room.is_direct().await?,
980 is_room_encrypted: room
981 .latest_encryption_state()
982 .await
983 .map(|state| state.is_encrypted())
984 .ok(),
985 joined_members_count: room.joined_members_count(),
986 is_space: room.is_space(),
987 is_noisy,
988 has_mention,
989 thread_id,
990 actions: push_actions.map(|actions| actions.to_vec()),
991 };
992
993 Ok(item)
994 }
995
996 pub fn is_public(&self) -> Option<bool> {
1000 self.room_join_rule.as_ref().map(|rule| matches!(rule, JoinRule::Public))
1001 }
1002}
1003
/// Errors that can happen while resolving a notification.
#[derive(Debug, Error)]
pub enum Error {
    /// NOTE(review): presumably raised when building the internal client
    /// fails — this variant is not constructed in this file; confirm.
    #[error(transparent)]
    BuildingLocalClient(ClientBuildError),

    /// The room of the notification is not known to the client.
    #[error("unknown room for a notification")]
    UnknownRoom,

    /// The raw event could not be deserialized.
    #[error("invalid ruma event")]
    InvalidRumaEvent,

    /// The sliding sync response did not include the target room.
    #[error("the sliding sync response doesn't include the target room")]
    SlidingSyncEmptyRoom,

    /// The `/context` response did not contain the requested event.
    #[error("the event was missing in the `/context` query")]
    ContextMissingEvent,

    /// An error forwarded from the SDK.
    #[error(transparent)]
    SdkError(#[from] matrix_sdk::Error),

    /// An error forwarded from the store.
    #[error(transparent)]
    StoreError(#[from] StoreError),
}
1034
#[cfg(test)]
mod tests {
    use assert_matches2::assert_let;
    use matrix_sdk::test_utils::mocks::MatrixMockServer;
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{event_id, room_id, user_id};

    use crate::notification_client::{NotificationItem, RawNotificationEvent};

    /// A message sent in a thread must expose the thread root as `thread_id`.
    #[async_test]
    async fn test_notification_item_returns_thread_id() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        let room_id = room_id!("!a:b.c");
        let thread_root = event_id!("$root:b.c");

        // Build a threaded text message pointing at the thread root.
        let threaded_message = EventFactory::new()
            .room(room_id)
            .sender(user_id!("@sender:b.c"))
            .text_msg("Threaded")
            .in_thread(thread_root, event_id!("$prev:b.c"))
            .into_raw_sync();

        let room = server.sync_joined_room(&client, room_id).await;

        let item = NotificationItem::new(
            &room,
            RawNotificationEvent::Timeline(threaded_message),
            None,
            Vec::new(),
        )
        .await
        .expect("Could not create notification item");

        assert_let!(Some(thread_id) = item.thread_id);
        assert_eq!(thread_id, thread_root);
    }
}