1use std::{
16 collections::BTreeMap,
17 sync::{Arc, Mutex},
18 time::Duration,
19};
20
21use futures_util::{StreamExt as _, pin_mut};
22use matrix_sdk::{
23 Client, ClientBuildError, SlidingSyncList, SlidingSyncMode, room::Room, sleep::sleep,
24};
25use matrix_sdk_base::{RoomState, StoreError, deserialized_responses::TimelineEvent};
26use ruma::{
27 EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
28 api::client::sync::sync_events::v5 as http,
29 assign,
30 events::{
31 AnyFullStateEventContent, AnyMessageLikeEventContent, AnyStateEvent,
32 AnySyncMessageLikeEvent, AnySyncTimelineEvent, FullStateEventContent, StateEventType,
33 TimelineEventType,
34 room::{
35 encrypted::OriginalSyncRoomEncryptedEvent,
36 join_rules::JoinRule,
37 member::{MembershipState, StrippedRoomMemberEvent},
38 message::{Relation, SyncRoomMessageEvent},
39 },
40 },
41 html::RemoveReplyFallback,
42 push::Action,
43 serde::Raw,
44 uint,
45};
46use thiserror::Error;
47use tokio::sync::Mutex as AsyncMutex;
48use tracing::{debug, info, instrument, trace, warn};
49
50use crate::{
51 DEFAULT_SANITIZER_MODE,
52 encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService, WithLocking},
53 sync_service::SyncService,
54};
55
56#[derive(Clone)]
58pub enum NotificationProcessSetup {
59 MultipleProcesses,
68
69 SingleProcess { sync_service: Arc<SyncService> },
77}
78
79pub struct NotificationClient {
85 client: Client,
87
88 parent_client: Client,
90
91 process_setup: NotificationProcessSetup,
93
94 notification_sync_mutex: AsyncMutex<()>,
102
103 encryption_sync_mutex: AsyncMutex<()>,
108}
109
110impl NotificationClient {
111 const CONNECTION_ID: &'static str = "notifications";
112 const LOCK_ID: &'static str = "notifications";
113
114 pub async fn new(
116 parent_client: Client,
117 process_setup: NotificationProcessSetup,
118 ) -> Result<Self, Error> {
119 let client = parent_client.notification_client(Self::LOCK_ID.to_owned()).await?;
120
121 Ok(NotificationClient {
122 client,
123 parent_client,
124 notification_sync_mutex: AsyncMutex::new(()),
125 encryption_sync_mutex: AsyncMutex::new(()),
126 process_setup,
127 })
128 }
129
130 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
134 self.client.get_room(room_id)
135 }
136
137 #[instrument(skip(self))]
146 pub async fn get_notification(
147 &self,
148 room_id: &RoomId,
149 event_id: &EventId,
150 ) -> Result<NotificationStatus, Error> {
151 let status = self.get_notification_with_sliding_sync(room_id, event_id).await?;
152 match status {
153 NotificationStatus::Event(..) | NotificationStatus::EventFilteredOut => Ok(status),
154 NotificationStatus::EventNotFound => {
155 self.get_notification_with_context(room_id, event_id).await
156 }
157 }
158 }
159
160 pub async fn get_notifications(
174 &self,
175 requests: &[NotificationItemsRequest],
176 ) -> Result<BatchNotificationFetchingResult, Error> {
177 let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
178
179 for request in requests {
180 for event_id in &request.event_ids {
181 match notifications.get_mut(event_id) {
182 Some(Ok(NotificationStatus::EventNotFound)) | None => {
185 notifications.insert(
186 event_id.to_owned(),
187 self.get_notification_with_context(&request.room_id, event_id).await,
188 );
189 }
190
191 _ => {}
192 }
193 }
194 }
195
196 Ok(notifications)
197 }
198
199 #[instrument(skip_all)]
209 async fn retry_decryption(
210 &self,
211 room: &Room,
212 raw_event: &Raw<AnySyncTimelineEvent>,
213 ) -> Result<Option<TimelineEvent>, Error> {
214 let event: AnySyncTimelineEvent =
215 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
216
217 if !is_event_encrypted(event.event_type()) {
218 return Ok(None);
219 }
220
221 let _guard = self.encryption_sync_mutex.lock().await;
223
224 let with_locking = WithLocking::from(matches!(
235 self.process_setup,
236 NotificationProcessSetup::MultipleProcesses
237 ));
238
239 let push_ctx = room.push_context().await?;
240 let sync_permit_guard = match &self.process_setup {
241 NotificationProcessSetup::MultipleProcesses => {
242 let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
246 sync_permit.lock_owned().await
247 }
248
249 NotificationProcessSetup::SingleProcess { sync_service } => {
250 if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
251 permit_guard
252 } else {
253 let mut wait = 200;
262
263 debug!("Encryption sync running in background");
264 for _ in 0..3 {
265 trace!("waiting for decryption…");
266
267 sleep(Duration::from_millis(wait)).await;
268
269 let new_event = room
273 .decrypt_event(
274 raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
275 push_ctx.as_ref(),
276 )
277 .await?;
278
279 match new_event.kind {
280 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
281 utd_info, ..} => {
282 if utd_info.reason.is_missing_room_key() {
283 wait *= 2;
286 } else {
287 debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
288 return Ok(None);
289 }
290 }
291 _ => {
292 trace!("Waiting succeeded and event could be decrypted!");
293 return Ok(Some(new_event));
294 }
295 }
296 }
297
298 debug!("Timeout waiting for the encryption sync to decrypt notification.");
300 return Ok(None);
301 }
302 }
303 };
304
305 let encryption_sync = EncryptionSyncService::new(
306 self.client.clone(),
307 Some((Duration::from_secs(3), Duration::from_secs(4))),
308 with_locking,
309 )
310 .await;
311
312 match encryption_sync {
317 Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
318 Ok(()) => match room.decrypt_event(raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(), push_ctx.as_ref()).await {
322 Ok(new_event) => match new_event.kind {
323 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
324 utd_info, ..
325 } => {
326 trace!(
327 "Encryption sync failed to decrypt the event: {:?}",
328 utd_info.reason
329 );
330 Ok(None)
331 }
332 _ => {
333 trace!("Encryption sync managed to decrypt the event.");
334 Ok(Some(new_event))
335 }
336 },
337 Err(err) => {
338 trace!("Encryption sync failed to decrypt the event: {err}");
339 Ok(None)
340 }
341 },
342 Err(err) => {
343 warn!("Encryption sync error: {err:#}");
344 Ok(None)
345 }
346 },
347 Err(err) => {
348 warn!("Encryption sync build error: {err:#}",);
349 Ok(None)
350 }
351 }
352 }
353
354 #[instrument(skip_all)]
373 async fn try_sliding_sync(
374 &self,
375 requests: &[NotificationItemsRequest],
376 ) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
377 let _guard = self.notification_sync_mutex.lock().await;
380
381 let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
386
387 let handler_raw_notification = raw_notifications.clone();
388
389 let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
390
391 let timeline_event_handler = self.client.add_event_handler({
392 let requests = requests.clone();
393 move |raw: Raw<AnySyncTimelineEvent>| async move {
394 match &raw.get_field::<OwnedEventId>("event_id") {
395 Ok(Some(event_id)) => {
396 let request =
397 &requests.iter().find(|request| request.event_ids.contains(event_id));
398 if request.is_none() {
399 return;
400 }
401 let room_id = request.unwrap().room_id.clone();
402 for request in requests.iter() {
403 if request.event_ids.contains(event_id) {
404 handler_raw_notification.lock().unwrap().insert(
408 event_id.to_owned(),
409 (room_id, Some(RawNotificationEvent::Timeline(raw))),
410 );
411 return;
412 }
413 }
414 }
415 Ok(None) => {
416 warn!("a sync event had no event id");
417 }
418 Err(err) => {
419 warn!("failed to deserialize sync event id: {err}");
420 }
421 }
422 }
423 });
424
425 let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
427
428 let user_id = self.client.user_id().unwrap().to_owned();
429 let handler_raw_invites = raw_invites.clone();
430 let handler_raw_notifications = raw_notifications.clone();
431 let stripped_member_handler = self.client.add_event_handler({
432 let requests = requests.clone();
433 move |raw: Raw<StrippedRoomMemberEvent>| async move {
434 let deserialized = match raw.deserialize() {
435 Ok(d) => d,
436 Err(err) => {
437 warn!("failed to deserialize raw stripped room member event: {err}");
438 return;
439 }
440 };
441
442 trace!("received a stripped room member event");
443
444 match &raw.get_field::<OwnedEventId>("event_id") {
447 Ok(Some(event_id)) => {
448 let request =
449 &requests.iter().find(|request| request.event_ids.contains(event_id));
450 if request.is_none() {
451 return;
452 }
453 let room_id = request.unwrap().room_id.clone();
454
455 handler_raw_notifications.lock().unwrap().insert(
459 event_id.to_owned(),
460 (room_id, Some(RawNotificationEvent::Invite(raw))),
461 );
462 return;
463 }
464 Ok(None) => {
465 warn!("a room member event had no id");
466 }
467 Err(err) => {
468 warn!("failed to deserialize room member event id: {err}");
469 }
470 }
471
472 if deserialized.content.membership == MembershipState::Invite
474 && deserialized.state_key == user_id
475 {
476 trace!("found an invite event for the current user");
477 handler_raw_invites
481 .lock()
482 .unwrap()
483 .insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
484 } else {
485 trace!("not an invite event, or not for the current user");
486 }
487 }
488 });
489
490 let required_state = vec![
492 (StateEventType::RoomEncryption, "".to_owned()),
493 (StateEventType::RoomMember, "$LAZY".to_owned()),
494 (StateEventType::RoomMember, "$ME".to_owned()),
495 (StateEventType::RoomCanonicalAlias, "".to_owned()),
496 (StateEventType::RoomName, "".to_owned()),
497 (StateEventType::RoomPowerLevels, "".to_owned()),
498 (StateEventType::RoomJoinRules, "".to_owned()),
499 (StateEventType::CallMember, "*".to_owned()),
500 (StateEventType::RoomCreate, "".to_owned()),
501 ];
502
503 let invites = SlidingSyncList::builder("invites")
504 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
505 .timeline_limit(8)
506 .required_state(required_state.clone())
507 .filters(Some(assign!(http::request::ListFilters::default(), {
508 is_invite: Some(true),
509 })));
510
511 let sync = self
512 .client
513 .sliding_sync(Self::CONNECTION_ID)?
514 .poll_timeout(Duration::from_secs(1))
515 .network_timeout(Duration::from_secs(3))
516 .with_account_data_extension(
517 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
518 )
519 .add_list(invites)
520 .build()
521 .await?;
522
523 let room_ids = requests.iter().map(|req| req.room_id.as_ref()).collect::<Vec<_>>();
524 sync.subscribe_to_rooms(
525 &room_ids,
526 Some(assign!(http::request::RoomSubscription::default(), {
527 required_state,
528 timeline_limit: uint!(16)
529 })),
530 true,
531 );
532
533 let mut remaining_attempts = 3;
534
535 let stream = sync.sync();
536 pin_mut!(stream);
537
538 let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
540
541 loop {
542 if stream.next().await.is_none() {
543 break;
545 }
546
547 if raw_notifications.lock().unwrap().len() + raw_invites.lock().unwrap().len()
548 == expected_event_count
549 {
550 break;
552 }
553
554 remaining_attempts -= 1;
555 if remaining_attempts == 0 {
556 break;
558 }
559 }
560
561 self.client.remove_event_handler(stripped_member_handler);
562 self.client.remove_event_handler(timeline_event_handler);
563
564 let mut notifications = raw_notifications.clone().lock().unwrap().clone();
565 let mut missing_event_ids = Vec::new();
566
567 for request in requests.iter() {
569 for event_id in &request.event_ids {
570 if !notifications.contains_key(event_id) {
571 missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
572 }
573 }
574 }
575
576 for (room_id, missing_event_id) in missing_event_ids {
578 trace!("we didn't have a non-invite event, looking for invited room now");
579 if let Some(room) = self.client.get_room(&room_id) {
580 if room.state() == RoomState::Invited {
581 if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
582 notifications.insert(
583 missing_event_id.to_owned(),
584 (room_id.to_owned(), stripped_event),
585 );
586 }
587 } else {
588 debug!("the room isn't in the invited state");
589 }
590 } else {
591 warn!(%room_id, "unknown room, can't check for invite events");
592 }
593 }
594
595 let found = if notifications.len() == expected_event_count { "" } else { "not " };
596 trace!("all notification events have{found} been found");
597
598 Ok(notifications)
599 }
600
601 pub async fn get_notification_with_sliding_sync(
602 &self,
603 room_id: &RoomId,
604 event_id: &EventId,
605 ) -> Result<NotificationStatus, Error> {
606 info!("fetching notification event with a sliding sync");
607
608 let request = NotificationItemsRequest {
609 room_id: room_id.to_owned(),
610 event_ids: vec![event_id.to_owned()],
611 };
612
613 let mut get_notifications_result =
614 self.get_notifications_with_sliding_sync(&[request]).await?;
615
616 get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
617 }
618
619 async fn compute_status(
624 &self,
625 room: &Room,
626 push_actions: Option<&[Action]>,
627 raw_event: RawNotificationEvent,
628 state_events: Vec<Raw<AnyStateEvent>>,
629 ) -> Result<NotificationStatus, Error> {
630 if let Some(actions) = push_actions
631 && !actions.iter().any(|a| a.should_notify())
632 {
633 return Ok(NotificationStatus::EventFilteredOut);
635 }
636
637 let notification_item =
638 NotificationItem::new(room, raw_event, push_actions, state_events).await?;
639
640 if self.client.is_user_ignored(notification_item.event.sender()).await {
641 Ok(NotificationStatus::EventFilteredOut)
642 } else {
643 Ok(NotificationStatus::Event(Box::new(notification_item)))
644 }
645 }
646
647 pub async fn get_notifications_with_sliding_sync(
652 &self,
653 requests: &[NotificationItemsRequest],
654 ) -> Result<BatchNotificationFetchingResult, Error> {
655 let raw_events = self.try_sliding_sync(requests).await?;
656
657 let mut batch_result = BatchNotificationFetchingResult::new();
658
659 for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
660 let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
662
663 let Some(raw_event) = raw_event else {
664 batch_result.insert(event_id, Ok(NotificationStatus::EventNotFound));
666 continue;
667 };
668
669 let (raw_event, push_actions) = match &raw_event {
670 RawNotificationEvent::Timeline(timeline_event) => {
671 match self.retry_decryption(&room, timeline_event).await {
673 Ok(Some(timeline_event)) => {
674 let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
675 (
676 RawNotificationEvent::Timeline(timeline_event.into_raw()),
677 push_actions,
678 )
679 }
680
681 Ok(None) => {
682 match room.event_push_actions(timeline_event).await {
685 Ok(push_actions) => (raw_event.clone(), push_actions),
686 Err(err) => {
687 batch_result.insert(event_id, Err(err.into()));
689 continue;
690 }
691 }
692 }
693
694 Err(err) => {
695 batch_result.insert(event_id, Err(err));
696 continue;
697 }
698 }
699 }
700
701 RawNotificationEvent::Invite(invite_event) => {
702 match room.event_push_actions(invite_event).await {
704 Ok(push_actions) => {
705 (RawNotificationEvent::Invite(invite_event.clone()), push_actions)
706 }
707 Err(err) => {
708 batch_result.insert(event_id, Err(err.into()));
709 continue;
710 }
711 }
712 }
713 };
714
715 let notification_status_result =
716 self.compute_status(&room, push_actions.as_deref(), raw_event, Vec::new()).await;
717
718 batch_result.insert(event_id, notification_status_result);
719 }
720
721 Ok(batch_result)
722 }
723
724 pub async fn get_notification_with_context(
737 &self,
738 room_id: &RoomId,
739 event_id: &EventId,
740 ) -> Result<NotificationStatus, Error> {
741 info!("fetching notification event with a /context query");
742
743 let Some(room) = self.parent_client.get_room(room_id) else {
745 return Err(Error::UnknownRoom);
746 };
747
748 let response = room.event_with_context(event_id, true, uint!(0), None).await?;
749
750 let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
751 let state_events = response.state;
752
753 if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
754 timeline_event = decrypted_event;
755 }
756
757 let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
758
759 self.compute_status(
760 &room,
761 push_actions.as_deref(),
762 RawNotificationEvent::Timeline(timeline_event.into_raw()),
763 state_events,
764 )
765 .await
766 }
767}
768
769fn is_event_encrypted(event_type: TimelineEventType) -> bool {
770 let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
771
772 #[cfg(feature = "unstable-msc3956")]
773 let is_still_encrypted =
774 is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
775
776 is_still_encrypted
777}
778
779#[derive(Debug)]
780pub enum NotificationStatus {
781 Event(Box<NotificationItem>),
783 EventNotFound,
785 EventFilteredOut,
789}
790
791#[derive(Debug, Clone)]
792pub struct NotificationItemsRequest {
793 pub room_id: OwnedRoomId,
794 pub event_ids: Vec<OwnedEventId>,
795}
796
797type BatchNotificationFetchingResult = BTreeMap<OwnedEventId, Result<NotificationStatus, Error>>;
798
799#[derive(Debug, Clone)]
804pub enum RawNotificationEvent {
805 Timeline(Raw<AnySyncTimelineEvent>),
807 Invite(Raw<StrippedRoomMemberEvent>),
810}
811
812#[derive(Debug)]
815pub enum NotificationEvent {
816 Timeline(Box<AnySyncTimelineEvent>),
818 Invite(Box<StrippedRoomMemberEvent>),
820}
821
822impl NotificationEvent {
823 pub fn sender(&self) -> &UserId {
824 match self {
825 NotificationEvent::Timeline(ev) => ev.sender(),
826 NotificationEvent::Invite(ev) => &ev.sender,
827 }
828 }
829
830 fn thread_id(&self) -> Option<OwnedEventId> {
833 let NotificationEvent::Timeline(sync_timeline_event) = &self else {
834 return None;
835 };
836 let AnySyncTimelineEvent::MessageLike(event) = sync_timeline_event.as_ref() else {
837 return None;
838 };
839 let content = event.original_content()?;
840 match content {
841 AnyMessageLikeEventContent::RoomMessage(content) => match content.relates_to? {
842 Relation::Thread(thread) => Some(thread.event_id),
843 _ => None,
844 },
845 _ => None,
846 }
847 }
848}
849
850#[derive(Debug)]
852pub struct NotificationItem {
853 pub event: NotificationEvent,
855
856 pub raw_event: RawNotificationEvent,
858
859 pub sender_display_name: Option<String>,
861 pub sender_avatar_url: Option<String>,
863 pub is_sender_name_ambiguous: bool,
865
866 pub room_computed_display_name: String,
868 pub room_avatar_url: Option<String>,
870 pub room_canonical_alias: Option<String>,
872 pub room_topic: Option<String>,
874 pub room_join_rule: Option<JoinRule>,
878 pub is_room_encrypted: Option<bool>,
880 pub is_direct_message_room: bool,
882 pub joined_members_count: u64,
884
885 pub is_noisy: Option<bool>,
890 pub has_mention: Option<bool>,
891 pub thread_id: Option<OwnedEventId>,
892}
893
894impl NotificationItem {
895 async fn new(
896 room: &Room,
897 raw_event: RawNotificationEvent,
898 push_actions: Option<&[Action]>,
899 state_events: Vec<Raw<AnyStateEvent>>,
900 ) -> Result<Self, Error> {
901 let event = match &raw_event {
902 RawNotificationEvent::Timeline(raw_event) => {
903 let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
904 if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
905 SyncRoomMessageEvent::Original(ev),
906 )) = &mut event
907 {
908 ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
909 }
910 NotificationEvent::Timeline(Box::new(event))
911 }
912 RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(Box::new(
913 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
914 )),
915 };
916
917 let sender = match room.state() {
918 RoomState::Invited => room.invite_details().await?.inviter,
919 _ => room.get_member_no_sync(event.sender()).await?,
920 };
921
922 let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
923 match &sender {
924 Some(sender) => (
925 sender.display_name().map(|s| s.to_owned()),
926 sender.avatar_url().map(|s| s.to_string()),
927 sender.name_ambiguous(),
928 ),
929 None => (None, None, false),
930 };
931
932 if sender_display_name.is_none() || sender_avatar_url.is_none() {
933 let sender_id = event.sender();
934 for ev in state_events {
935 let ev = match ev.deserialize() {
936 Ok(ev) => ev,
937 Err(err) => {
938 warn!("Failed to deserialize a state event: {err}");
939 continue;
940 }
941 };
942 if ev.sender() != sender_id {
943 continue;
944 }
945 if let AnyFullStateEventContent::RoomMember(FullStateEventContent::Original {
946 content,
947 ..
948 }) = ev.content()
949 {
950 if sender_display_name.is_none() {
951 sender_display_name = content.displayname;
952 }
953 if sender_avatar_url.is_none() {
954 sender_avatar_url = content.avatar_url.map(|url| url.to_string());
955 }
956 }
957 }
958 }
959
960 let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
961 let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
962 let thread_id = event.thread_id().clone();
963
964 let item = NotificationItem {
965 event,
966 raw_event,
967 sender_display_name,
968 sender_avatar_url,
969 is_sender_name_ambiguous,
970 room_computed_display_name: room.display_name().await?.to_string(),
971 room_avatar_url: room.avatar_url().map(|s| s.to_string()),
972 room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
973 room_topic: room.topic(),
974 room_join_rule: room.join_rule(),
975 is_direct_message_room: room.is_direct().await?,
976 is_room_encrypted: room
977 .latest_encryption_state()
978 .await
979 .map(|state| state.is_encrypted())
980 .ok(),
981 joined_members_count: room.joined_members_count(),
982 is_noisy,
983 has_mention,
984 thread_id,
985 };
986
987 Ok(item)
988 }
989
990 pub fn is_public(&self) -> Option<bool> {
994 self.room_join_rule.as_ref().map(|rule| matches!(rule, JoinRule::Public))
995 }
996}
997
998#[derive(Debug, Error)]
1000pub enum Error {
1001 #[error(transparent)]
1002 BuildingLocalClient(ClientBuildError),
1003
1004 #[error("unknown room for a notification")]
1006 UnknownRoom,
1007
1008 #[error("invalid ruma event")]
1010 InvalidRumaEvent,
1011
1012 #[error("the sliding sync response doesn't include the target room")]
1015 SlidingSyncEmptyRoom,
1016
1017 #[error("the event was missing in the `/context` query")]
1018 ContextMissingEvent,
1019
1020 #[error(transparent)]
1022 SdkError(#[from] matrix_sdk::Error),
1023
1024 #[error(transparent)]
1026 StoreError(#[from] StoreError),
1027}
1028
1029#[cfg(test)]
1030mod tests {
1031 use assert_matches2::assert_let;
1032 use matrix_sdk::test_utils::mocks::MatrixMockServer;
1033 use matrix_sdk_test::{async_test, event_factory::EventFactory};
1034 use ruma::{event_id, room_id, user_id};
1035
1036 use crate::notification_client::{NotificationItem, RawNotificationEvent};
1037
1038 #[async_test]
1039 async fn test_notification_item_returns_thread_id() {
1040 let server = MatrixMockServer::new().await;
1041 let client = server.client_builder().build().await;
1042
1043 let room_id = room_id!("!a:b.c");
1044 let thread_root_event_id = event_id!("$root:b.c");
1045 let message = EventFactory::new()
1046 .room(room_id)
1047 .sender(user_id!("@sender:b.c"))
1048 .text_msg("Threaded")
1049 .in_thread(thread_root_event_id, event_id!("$prev:b.c"))
1050 .into_raw_sync();
1051 let room = server.sync_joined_room(&client, room_id).await;
1052
1053 let raw_notification_event = RawNotificationEvent::Timeline(message);
1054 let notification_item =
1055 NotificationItem::new(&room, raw_notification_event, None, Vec::new())
1056 .await
1057 .expect("Could not create notification item");
1058
1059 assert_let!(Some(thread_id) = notification_item.thread_id);
1060 assert_eq!(thread_id, thread_root_event_id);
1061 }
1062}