1use std::{
16 collections::BTreeMap,
17 sync::{Arc, Mutex},
18 time::Duration,
19};
20
21use futures_util::{StreamExt as _, pin_mut};
22use matrix_sdk::{
23 Client, ClientBuildError, SlidingSyncList, SlidingSyncMode, room::Room, sleep::sleep,
24};
25use matrix_sdk_base::{RoomState, StoreError, deserialized_responses::TimelineEvent};
26use ruma::{
27 EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
28 api::client::sync::sync_events::v5 as http,
29 assign,
30 events::{
31 AnyFullStateEventContent, AnyMessageLikeEventContent, AnyStateEvent,
32 AnySyncMessageLikeEvent, AnySyncTimelineEvent, FullStateEventContent, StateEventType,
33 TimelineEventType,
34 room::{
35 encrypted::OriginalSyncRoomEncryptedEvent,
36 join_rules::JoinRule,
37 member::{MembershipState, StrippedRoomMemberEvent},
38 message::{Relation, SyncRoomMessageEvent},
39 },
40 },
41 html::RemoveReplyFallback,
42 push::Action,
43 serde::Raw,
44 uint,
45};
46use thiserror::Error;
47use tokio::sync::Mutex as AsyncMutex;
48use tracing::{debug, info, instrument, trace, warn};
49
50use crate::{
51 DEFAULT_SANITIZER_MODE,
52 encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService, WithLocking},
53 sync_service::SyncService,
54};
55
56#[derive(Clone)]
58pub enum NotificationProcessSetup {
59 MultipleProcesses,
68
69 SingleProcess { sync_service: Arc<SyncService> },
77}
78
79pub struct NotificationClient {
85 client: Client,
87
88 parent_client: Client,
90
91 process_setup: NotificationProcessSetup,
93
94 notification_sync_mutex: AsyncMutex<()>,
102
103 encryption_sync_mutex: AsyncMutex<()>,
108}
109
110impl NotificationClient {
111 const CONNECTION_ID: &'static str = "notifications";
112 const LOCK_ID: &'static str = "notifications";
113
114 pub async fn new(
116 parent_client: Client,
117 process_setup: NotificationProcessSetup,
118 ) -> Result<Self, Error> {
119 let client = parent_client.notification_client(Self::LOCK_ID.to_owned()).await?;
120
121 Ok(NotificationClient {
122 client,
123 parent_client,
124 notification_sync_mutex: AsyncMutex::new(()),
125 encryption_sync_mutex: AsyncMutex::new(()),
126 process_setup,
127 })
128 }
129
130 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
134 self.client.get_room(room_id)
135 }
136
137 #[instrument(skip(self))]
146 pub async fn get_notification(
147 &self,
148 room_id: &RoomId,
149 event_id: &EventId,
150 ) -> Result<NotificationStatus, Error> {
151 let status = self.get_notification_with_sliding_sync(room_id, event_id).await?;
152 match status {
153 NotificationStatus::Event(..) | NotificationStatus::EventFilteredOut => Ok(status),
154 NotificationStatus::EventNotFound => {
155 self.get_notification_with_context(room_id, event_id).await
156 }
157 }
158 }
159
160 pub async fn get_notifications(
174 &self,
175 requests: &[NotificationItemsRequest],
176 ) -> Result<BatchNotificationFetchingResult, Error> {
177 let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
178
179 for request in requests {
180 for event_id in &request.event_ids {
181 match notifications.get_mut(event_id) {
182 Some(Ok(NotificationStatus::EventNotFound)) | None => {
185 notifications.insert(
186 event_id.to_owned(),
187 self.get_notification_with_context(&request.room_id, event_id).await,
188 );
189 }
190
191 _ => {}
192 }
193 }
194 }
195
196 Ok(notifications)
197 }
198
199 #[instrument(skip_all)]
209 async fn retry_decryption(
210 &self,
211 room: &Room,
212 raw_event: &Raw<AnySyncTimelineEvent>,
213 ) -> Result<Option<TimelineEvent>, Error> {
214 let event: AnySyncTimelineEvent =
215 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
216
217 if !is_event_encrypted(event.event_type()) {
218 return Ok(None);
219 }
220
221 let _guard = self.encryption_sync_mutex.lock().await;
223
224 let with_locking = WithLocking::from(matches!(
235 self.process_setup,
236 NotificationProcessSetup::MultipleProcesses
237 ));
238
239 let push_ctx = room.push_context().await?;
240 let sync_permit_guard = match &self.process_setup {
241 NotificationProcessSetup::MultipleProcesses => {
242 let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
246 sync_permit.lock_owned().await
247 }
248
249 NotificationProcessSetup::SingleProcess { sync_service } => {
250 if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
251 permit_guard
252 } else {
253 let mut wait = 200;
262
263 debug!("Encryption sync running in background");
264 for _ in 0..3 {
265 trace!("waiting for decryption…");
266
267 sleep(Duration::from_millis(wait)).await;
268
269 let new_event = room
273 .decrypt_event(
274 raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
275 push_ctx.as_ref(),
276 )
277 .await?;
278
279 match new_event.kind {
280 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
281 utd_info, ..} => {
282 if utd_info.reason.is_missing_room_key() {
283 wait *= 2;
286 } else {
287 debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
288 return Ok(None);
289 }
290 }
291 _ => {
292 trace!("Waiting succeeded and event could be decrypted!");
293 return Ok(Some(new_event));
294 }
295 }
296 }
297
298 debug!("Timeout waiting for the encryption sync to decrypt notification.");
300 return Ok(None);
301 }
302 }
303 };
304
305 let encryption_sync = EncryptionSyncService::new(
306 self.client.clone(),
307 Some((Duration::from_secs(3), Duration::from_secs(4))),
308 with_locking,
309 )
310 .await;
311
312 match encryption_sync {
317 Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
318 Ok(()) => match room.decrypt_event(raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(), push_ctx.as_ref()).await {
322 Ok(new_event) => match new_event.kind {
323 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
324 utd_info, ..
325 } => {
326 trace!(
327 "Encryption sync failed to decrypt the event: {:?}",
328 utd_info.reason
329 );
330 Ok(None)
331 }
332 _ => {
333 trace!("Encryption sync managed to decrypt the event.");
334 Ok(Some(new_event))
335 }
336 },
337 Err(err) => {
338 trace!("Encryption sync failed to decrypt the event: {err}");
339 Ok(None)
340 }
341 },
342 Err(err) => {
343 warn!("Encryption sync error: {err:#}");
344 Ok(None)
345 }
346 },
347 Err(err) => {
348 warn!("Encryption sync build error: {err:#}",);
349 Ok(None)
350 }
351 }
352 }
353
354 #[instrument(skip_all)]
373 async fn try_sliding_sync(
374 &self,
375 requests: &[NotificationItemsRequest],
376 ) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
377 let _guard = self.notification_sync_mutex.lock().await;
380
381 let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
386
387 let handler_raw_notification = raw_notifications.clone();
388
389 let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
390
391 let timeline_event_handler = self.client.add_event_handler({
392 let requests = requests.clone();
393 move |raw: Raw<AnySyncTimelineEvent>| async move {
394 match &raw.get_field::<OwnedEventId>("event_id") {
395 Ok(Some(event_id)) => {
396 let Some(request) =
397 &requests.iter().find(|request| request.event_ids.contains(event_id))
398 else {
399 return;
400 };
401
402 let room_id = request.room_id.clone();
403
404 handler_raw_notification.lock().unwrap().insert(
408 event_id.to_owned(),
409 (room_id, Some(RawNotificationEvent::Timeline(raw))),
410 );
411 }
412 Ok(None) => {
413 warn!("a sync event had no event id");
414 }
415 Err(err) => {
416 warn!("failed to deserialize sync event id: {err}");
417 }
418 }
419 }
420 });
421
422 let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
424
425 let user_id = self.client.user_id().unwrap().to_owned();
426 let handler_raw_invites = raw_invites.clone();
427 let handler_raw_notifications = raw_notifications.clone();
428 let stripped_member_handler = self.client.add_event_handler({
429 let requests = requests.clone();
430 move |raw: Raw<StrippedRoomMemberEvent>| async move {
431 let deserialized = match raw.deserialize() {
432 Ok(d) => d,
433 Err(err) => {
434 warn!("failed to deserialize raw stripped room member event: {err}");
435 return;
436 }
437 };
438
439 trace!("received a stripped room member event");
440
441 match &raw.get_field::<OwnedEventId>("event_id") {
444 Ok(Some(event_id)) => {
445 let request =
446 &requests.iter().find(|request| request.event_ids.contains(event_id));
447 if request.is_none() {
448 return;
449 }
450 let room_id = request.unwrap().room_id.clone();
451
452 handler_raw_notifications.lock().unwrap().insert(
456 event_id.to_owned(),
457 (room_id, Some(RawNotificationEvent::Invite(raw))),
458 );
459 return;
460 }
461 Ok(None) => {
462 warn!("a room member event had no id");
463 }
464 Err(err) => {
465 warn!("failed to deserialize room member event id: {err}");
466 }
467 }
468
469 if deserialized.content.membership == MembershipState::Invite
471 && deserialized.state_key == user_id
472 {
473 trace!("found an invite event for the current user");
474 handler_raw_invites
478 .lock()
479 .unwrap()
480 .insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
481 } else {
482 trace!("not an invite event, or not for the current user");
483 }
484 }
485 });
486
487 let required_state = vec![
489 (StateEventType::RoomEncryption, "".to_owned()),
490 (StateEventType::RoomMember, "$LAZY".to_owned()),
491 (StateEventType::RoomMember, "$ME".to_owned()),
492 (StateEventType::RoomCanonicalAlias, "".to_owned()),
493 (StateEventType::RoomName, "".to_owned()),
494 (StateEventType::RoomAvatar, "".to_owned()),
495 (StateEventType::RoomPowerLevels, "".to_owned()),
496 (StateEventType::RoomJoinRules, "".to_owned()),
497 (StateEventType::CallMember, "*".to_owned()),
498 (StateEventType::RoomCreate, "".to_owned()),
499 ];
500
501 let invites = SlidingSyncList::builder("invites")
502 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
503 .timeline_limit(8)
504 .required_state(required_state.clone())
505 .filters(Some(assign!(http::request::ListFilters::default(), {
506 is_invite: Some(true),
507 })));
508
509 let sync = self
510 .client
511 .sliding_sync(Self::CONNECTION_ID)?
512 .poll_timeout(Duration::from_secs(1))
513 .network_timeout(Duration::from_secs(3))
514 .with_account_data_extension(
515 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
516 )
517 .add_list(invites)
518 .build()
519 .await?;
520
521 let room_ids = requests.iter().map(|req| req.room_id.as_ref()).collect::<Vec<_>>();
522 sync.subscribe_to_rooms(
523 &room_ids,
524 Some(assign!(http::request::RoomSubscription::default(), {
525 required_state,
526 timeline_limit: uint!(16)
527 })),
528 true,
529 );
530
531 let mut remaining_attempts = 3;
532
533 let stream = sync.sync();
534 pin_mut!(stream);
535
536 let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
538
539 loop {
540 if stream.next().await.is_none() {
541 break;
543 }
544
545 if raw_notifications.lock().unwrap().len() + raw_invites.lock().unwrap().len()
546 == expected_event_count
547 {
548 break;
550 }
551
552 remaining_attempts -= 1;
553 if remaining_attempts == 0 {
554 break;
556 }
557 }
558
559 self.client.remove_event_handler(stripped_member_handler);
560 self.client.remove_event_handler(timeline_event_handler);
561
562 let mut notifications = raw_notifications.clone().lock().unwrap().clone();
563 let mut missing_event_ids = Vec::new();
564
565 for request in requests.iter() {
567 for event_id in &request.event_ids {
568 if !notifications.contains_key(event_id) {
569 missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
570 }
571 }
572 }
573
574 for (room_id, missing_event_id) in missing_event_ids {
576 trace!("we didn't have a non-invite event, looking for invited room now");
577 if let Some(room) = self.client.get_room(&room_id) {
578 if room.state() == RoomState::Invited {
579 if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
580 notifications.insert(
581 missing_event_id.to_owned(),
582 (room_id.to_owned(), stripped_event),
583 );
584 }
585 } else {
586 debug!("the room isn't in the invited state");
587 }
588 } else {
589 warn!(%room_id, "unknown room, can't check for invite events");
590 }
591 }
592
593 let found = if notifications.len() == expected_event_count { "" } else { "not " };
594 trace!("all notification events have{found} been found");
595
596 Ok(notifications)
597 }
598
599 pub async fn get_notification_with_sliding_sync(
600 &self,
601 room_id: &RoomId,
602 event_id: &EventId,
603 ) -> Result<NotificationStatus, Error> {
604 info!("fetching notification event with a sliding sync");
605
606 let request = NotificationItemsRequest {
607 room_id: room_id.to_owned(),
608 event_ids: vec![event_id.to_owned()],
609 };
610
611 let mut get_notifications_result =
612 self.get_notifications_with_sliding_sync(&[request]).await?;
613
614 get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
615 }
616
617 async fn compute_status(
622 &self,
623 room: &Room,
624 push_actions: Option<&[Action]>,
625 raw_event: RawNotificationEvent,
626 state_events: Vec<Raw<AnyStateEvent>>,
627 ) -> Result<NotificationStatus, Error> {
628 if let Some(actions) = push_actions
629 && !actions.iter().any(|a| a.should_notify())
630 {
631 return Ok(NotificationStatus::EventFilteredOut);
633 }
634
635 let notification_item =
636 NotificationItem::new(room, raw_event, push_actions, state_events).await?;
637
638 if self.client.is_user_ignored(notification_item.event.sender()).await {
639 Ok(NotificationStatus::EventFilteredOut)
640 } else {
641 Ok(NotificationStatus::Event(Box::new(notification_item)))
642 }
643 }
644
645 pub async fn get_notifications_with_sliding_sync(
650 &self,
651 requests: &[NotificationItemsRequest],
652 ) -> Result<BatchNotificationFetchingResult, Error> {
653 let raw_events = self.try_sliding_sync(requests).await?;
654
655 let mut batch_result = BatchNotificationFetchingResult::new();
656
657 for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
658 let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
660
661 let Some(raw_event) = raw_event else {
662 batch_result.insert(event_id, Ok(NotificationStatus::EventNotFound));
664 continue;
665 };
666
667 let (raw_event, push_actions) = match &raw_event {
668 RawNotificationEvent::Timeline(timeline_event) => {
669 match self.retry_decryption(&room, timeline_event).await {
671 Ok(Some(timeline_event)) => {
672 let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
673 (
674 RawNotificationEvent::Timeline(timeline_event.into_raw()),
675 push_actions,
676 )
677 }
678
679 Ok(None) => {
680 match room.event_push_actions(timeline_event).await {
683 Ok(push_actions) => (raw_event.clone(), push_actions),
684 Err(err) => {
685 batch_result.insert(event_id, Err(err.into()));
687 continue;
688 }
689 }
690 }
691
692 Err(err) => {
693 batch_result.insert(event_id, Err(err));
694 continue;
695 }
696 }
697 }
698
699 RawNotificationEvent::Invite(invite_event) => {
700 match room.event_push_actions(invite_event).await {
702 Ok(push_actions) => {
703 (RawNotificationEvent::Invite(invite_event.clone()), push_actions)
704 }
705 Err(err) => {
706 batch_result.insert(event_id, Err(err.into()));
707 continue;
708 }
709 }
710 }
711 };
712
713 let notification_status_result =
714 self.compute_status(&room, push_actions.as_deref(), raw_event, Vec::new()).await;
715
716 batch_result.insert(event_id, notification_status_result);
717 }
718
719 Ok(batch_result)
720 }
721
722 pub async fn get_notification_with_context(
735 &self,
736 room_id: &RoomId,
737 event_id: &EventId,
738 ) -> Result<NotificationStatus, Error> {
739 info!("fetching notification event with a /context query");
740
741 let Some(room) = self.parent_client.get_room(room_id) else {
743 return Err(Error::UnknownRoom);
744 };
745
746 let response = room.event_with_context(event_id, true, uint!(0), None).await?;
747
748 let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
749 let state_events = response.state;
750
751 if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
752 timeline_event = decrypted_event;
753 }
754
755 let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
756
757 self.compute_status(
758 &room,
759 push_actions.as_deref(),
760 RawNotificationEvent::Timeline(timeline_event.into_raw()),
761 state_events,
762 )
763 .await
764 }
765}
766
767fn is_event_encrypted(event_type: TimelineEventType) -> bool {
768 let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
769
770 #[cfg(feature = "unstable-msc3956")]
771 let is_still_encrypted =
772 is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
773
774 is_still_encrypted
775}
776
777#[derive(Debug)]
778pub enum NotificationStatus {
779 Event(Box<NotificationItem>),
781 EventNotFound,
783 EventFilteredOut,
787}
788
789#[derive(Debug, Clone)]
790pub struct NotificationItemsRequest {
791 pub room_id: OwnedRoomId,
792 pub event_ids: Vec<OwnedEventId>,
793}
794
795type BatchNotificationFetchingResult = BTreeMap<OwnedEventId, Result<NotificationStatus, Error>>;
796
797#[derive(Debug, Clone)]
802pub enum RawNotificationEvent {
803 Timeline(Raw<AnySyncTimelineEvent>),
805 Invite(Raw<StrippedRoomMemberEvent>),
808}
809
810#[derive(Debug)]
813pub enum NotificationEvent {
814 Timeline(Box<AnySyncTimelineEvent>),
816 Invite(Box<StrippedRoomMemberEvent>),
818}
819
820impl NotificationEvent {
821 pub fn sender(&self) -> &UserId {
822 match self {
823 NotificationEvent::Timeline(ev) => ev.sender(),
824 NotificationEvent::Invite(ev) => &ev.sender,
825 }
826 }
827
828 fn thread_id(&self) -> Option<OwnedEventId> {
831 let NotificationEvent::Timeline(sync_timeline_event) = &self else {
832 return None;
833 };
834 let AnySyncTimelineEvent::MessageLike(event) = sync_timeline_event.as_ref() else {
835 return None;
836 };
837 let content = event.original_content()?;
838 match content {
839 AnyMessageLikeEventContent::RoomMessage(content) => match content.relates_to? {
840 Relation::Thread(thread) => Some(thread.event_id),
841 _ => None,
842 },
843 _ => None,
844 }
845 }
846}
847
848#[derive(Debug)]
850pub struct NotificationItem {
851 pub event: NotificationEvent,
853
854 pub raw_event: RawNotificationEvent,
856
857 pub sender_display_name: Option<String>,
859 pub sender_avatar_url: Option<String>,
861 pub is_sender_name_ambiguous: bool,
863
864 pub room_computed_display_name: String,
866 pub room_avatar_url: Option<String>,
868 pub room_canonical_alias: Option<String>,
870 pub room_topic: Option<String>,
872 pub room_join_rule: Option<JoinRule>,
876 pub is_room_encrypted: Option<bool>,
878 pub is_direct_message_room: bool,
880 pub joined_members_count: u64,
882 pub is_space: bool,
884
885 pub is_noisy: Option<bool>,
890 pub has_mention: Option<bool>,
891 pub thread_id: Option<OwnedEventId>,
892
893 pub actions: Option<Vec<Action>>,
895}
896
897impl NotificationItem {
898 async fn new(
899 room: &Room,
900 raw_event: RawNotificationEvent,
901 push_actions: Option<&[Action]>,
902 state_events: Vec<Raw<AnyStateEvent>>,
903 ) -> Result<Self, Error> {
904 let event = match &raw_event {
905 RawNotificationEvent::Timeline(raw_event) => {
906 let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
907 if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
908 SyncRoomMessageEvent::Original(ev),
909 )) = &mut event
910 {
911 ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
912 }
913 NotificationEvent::Timeline(Box::new(event))
914 }
915 RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(Box::new(
916 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
917 )),
918 };
919
920 let sender = match room.state() {
921 RoomState::Invited => room.invite_details().await?.inviter,
922 _ => room.get_member_no_sync(event.sender()).await?,
923 };
924
925 let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
926 match &sender {
927 Some(sender) => (
928 sender.display_name().map(|s| s.to_owned()),
929 sender.avatar_url().map(|s| s.to_string()),
930 sender.name_ambiguous(),
931 ),
932 None => (None, None, false),
933 };
934
935 if sender_display_name.is_none() || sender_avatar_url.is_none() {
936 let sender_id = event.sender();
937 for ev in state_events {
938 let ev = match ev.deserialize() {
939 Ok(ev) => ev,
940 Err(err) => {
941 warn!("Failed to deserialize a state event: {err}");
942 continue;
943 }
944 };
945 if ev.sender() != sender_id {
946 continue;
947 }
948 if let AnyFullStateEventContent::RoomMember(FullStateEventContent::Original {
949 content,
950 ..
951 }) = ev.content()
952 {
953 if sender_display_name.is_none() {
954 sender_display_name = content.displayname;
955 }
956 if sender_avatar_url.is_none() {
957 sender_avatar_url = content.avatar_url.map(|url| url.to_string());
958 }
959 }
960 }
961 }
962
963 let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
964 let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
965 let thread_id = event.thread_id().clone();
966
967 let item = NotificationItem {
968 event,
969 raw_event,
970 sender_display_name,
971 sender_avatar_url,
972 is_sender_name_ambiguous,
973 room_computed_display_name: room.display_name().await?.to_string(),
974 room_avatar_url: room.avatar_url().map(|s| s.to_string()),
975 room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
976 room_topic: room.topic(),
977 room_join_rule: room.join_rule(),
978 is_direct_message_room: room.is_direct().await?,
979 is_room_encrypted: room
980 .latest_encryption_state()
981 .await
982 .map(|state| state.is_encrypted())
983 .ok(),
984 joined_members_count: room.joined_members_count(),
985 is_space: room.is_space(),
986 is_noisy,
987 has_mention,
988 thread_id,
989 actions: push_actions.map(|actions| actions.to_vec()),
990 };
991
992 Ok(item)
993 }
994
995 pub fn is_public(&self) -> Option<bool> {
999 self.room_join_rule.as_ref().map(|rule| matches!(rule, JoinRule::Public))
1000 }
1001}
1002
1003#[derive(Debug, Error)]
1005pub enum Error {
1006 #[error(transparent)]
1007 BuildingLocalClient(ClientBuildError),
1008
1009 #[error("unknown room for a notification")]
1011 UnknownRoom,
1012
1013 #[error("invalid ruma event")]
1015 InvalidRumaEvent,
1016
1017 #[error("the sliding sync response doesn't include the target room")]
1020 SlidingSyncEmptyRoom,
1021
1022 #[error("the event was missing in the `/context` query")]
1023 ContextMissingEvent,
1024
1025 #[error(transparent)]
1027 SdkError(#[from] matrix_sdk::Error),
1028
1029 #[error(transparent)]
1031 StoreError(#[from] StoreError),
1032}
1033
1034#[cfg(test)]
1035mod tests {
1036 use assert_matches2::assert_let;
1037 use matrix_sdk::test_utils::mocks::MatrixMockServer;
1038 use matrix_sdk_test::{async_test, event_factory::EventFactory};
1039 use ruma::{event_id, room_id, user_id};
1040
1041 use crate::notification_client::{NotificationItem, RawNotificationEvent};
1042
1043 #[async_test]
1044 async fn test_notification_item_returns_thread_id() {
1045 let server = MatrixMockServer::new().await;
1046 let client = server.client_builder().build().await;
1047
1048 let room_id = room_id!("!a:b.c");
1049 let thread_root_event_id = event_id!("$root:b.c");
1050 let message = EventFactory::new()
1051 .room(room_id)
1052 .sender(user_id!("@sender:b.c"))
1053 .text_msg("Threaded")
1054 .in_thread(thread_root_event_id, event_id!("$prev:b.c"))
1055 .into_raw_sync();
1056 let room = server.sync_joined_room(&client, room_id).await;
1057
1058 let raw_notification_event = RawNotificationEvent::Timeline(message);
1059 let notification_item =
1060 NotificationItem::new(&room, raw_notification_event, None, Vec::new())
1061 .await
1062 .expect("Could not create notification item");
1063
1064 assert_let!(Some(thread_id) = notification_item.thread_id);
1065 assert_eq!(thread_id, thread_root_event_id);
1066 }
1067}