1use std::{
16 collections::{
17 btree_map::{IntoIter, Iter},
18 BTreeMap,
19 },
20 sync::{Arc, Mutex},
21 time::Duration,
22};
23
24use futures_util::{pin_mut, StreamExt as _};
25use matrix_sdk::{
26 room::Room, sleep::sleep, Client, ClientBuildError, SlidingSyncList, SlidingSyncMode,
27};
28use matrix_sdk_base::{deserialized_responses::TimelineEvent, RoomState, StoreError};
29use ruma::{
30 api::client::sync::sync_events::v5 as http,
31 assign,
32 directory::RoomTypeFilter,
33 events::{
34 room::{
35 join_rules::JoinRule,
36 member::{MembershipState, StrippedRoomMemberEvent},
37 message::{Relation, SyncRoomMessageEvent},
38 },
39 AnyFullStateEventContent, AnyMessageLikeEventContent, AnyStateEvent,
40 AnySyncMessageLikeEvent, AnySyncTimelineEvent, FullStateEventContent, StateEventType,
41 TimelineEventType,
42 },
43 html::RemoveReplyFallback,
44 push::Action,
45 serde::Raw,
46 uint, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
47};
48use thiserror::Error;
49use tokio::sync::Mutex as AsyncMutex;
50use tracing::{debug, info, instrument, trace, warn};
51
52use crate::{
53 encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService, WithLocking},
54 sync_service::SyncService,
55 DEFAULT_SANITIZER_MODE,
56};
57
58#[derive(Clone)]
60pub enum NotificationProcessSetup {
61 MultipleProcesses,
70
71 SingleProcess { sync_service: Arc<SyncService> },
79}
80
81pub struct NotificationClient {
87 client: Client,
89
90 parent_client: Client,
92
93 process_setup: NotificationProcessSetup,
95
96 notification_sync_mutex: AsyncMutex<()>,
104
105 encryption_sync_mutex: AsyncMutex<()>,
110}
111
112impl NotificationClient {
113 const CONNECTION_ID: &'static str = "notifications";
114 const LOCK_ID: &'static str = "notifications";
115
116 pub async fn new(
118 parent_client: Client,
119 process_setup: NotificationProcessSetup,
120 ) -> Result<Self, Error> {
121 let client = parent_client.notification_client(Self::LOCK_ID.to_owned()).await?;
122
123 Ok(NotificationClient {
124 client,
125 parent_client,
126 notification_sync_mutex: AsyncMutex::new(()),
127 encryption_sync_mutex: AsyncMutex::new(()),
128 process_setup,
129 })
130 }
131
132 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
136 self.client.get_room(room_id)
137 }
138
139 #[instrument(skip(self))]
150 pub async fn get_notification(
151 &self,
152 room_id: &RoomId,
153 event_id: &EventId,
154 ) -> Result<Option<NotificationItem>, Error> {
155 match self.get_notification_with_sliding_sync(room_id, event_id).await? {
156 NotificationStatus::Event(event) => Ok(Some(*event)),
157 NotificationStatus::EventFilteredOut => Ok(None),
158 NotificationStatus::EventNotFound => {
159 self.get_notification_with_context(room_id, event_id).await
160 }
161 }
162 }
163
164 pub async fn get_notifications(
178 &self,
179 requests: &[NotificationItemsRequest],
180 ) -> Result<BatchNotificationFetchingResult<NotificationItem>, Error> {
181 let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
182 let mut notification_items = BatchNotificationFetchingResult::new();
183
184 for request in requests {
185 for event_id in &request.event_ids {
186 match notifications.remove(event_id) {
187 Some(Ok(NotificationStatus::Event(item))) => {
188 notification_items.add_notification(event_id.to_owned(), *item);
189 }
190 Some(Ok(NotificationStatus::EventNotFound)) | None => {
191 match self.get_notification_with_context(&request.room_id, event_id).await {
192 Ok(Some(item)) => {
193 notification_items.add_notification(event_id.to_owned(), item)
194 }
195 Ok(None) => (),
197 Err(error) => notification_items
198 .mark_fetching_notification_failed(event_id.to_owned(), error),
199 }
200 }
201 Some(Ok(NotificationStatus::EventFilteredOut)) => (),
203 Some(Err(e)) => {
204 notification_items
205 .mark_fetching_notification_failed(event_id.to_owned(), e);
206 }
207 }
208 }
209 }
210
211 Ok(notification_items)
212 }
213
214 #[instrument(skip_all)]
222 async fn retry_decryption(
223 &self,
224 room: &Room,
225 raw_event: &Raw<AnySyncTimelineEvent>,
226 ) -> Result<Option<TimelineEvent>, Error> {
227 let event: AnySyncTimelineEvent =
228 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
229
230 if !is_event_encrypted(event.event_type()) {
231 return Ok(None);
232 }
233
234 let _guard = self.encryption_sync_mutex.lock().await;
236
237 let with_locking = WithLocking::from(matches!(
248 self.process_setup,
249 NotificationProcessSetup::MultipleProcesses
250 ));
251
252 let push_ctx = room.push_context().await?;
253 let sync_permit_guard = match &self.process_setup {
254 NotificationProcessSetup::MultipleProcesses => {
255 let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
259 sync_permit.lock_owned().await
260 }
261
262 NotificationProcessSetup::SingleProcess { sync_service } => {
263 if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
264 permit_guard
265 } else {
266 let mut wait = 200;
275
276 debug!("Encryption sync running in background");
277 for _ in 0..3 {
278 trace!("waiting for decryption…");
279
280 sleep(Duration::from_millis(wait)).await;
281
282 let new_event =
283 room.decrypt_event(raw_event.cast_ref(), push_ctx.as_ref()).await?;
284
285 match new_event.kind {
286 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
287 utd_info, ..} => {
288 if utd_info.reason.is_missing_room_key() {
289 wait *= 2;
292 } else {
293 debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
294 return Ok(None);
295 }
296 }
297 _ => {
298 trace!("Waiting succeeded and event could be decrypted!");
299 return Ok(Some(new_event));
300 }
301 }
302 }
303
304 debug!("Timeout waiting for the encryption sync to decrypt notification.");
306 return Ok(None);
307 }
308 }
309 };
310
311 let encryption_sync = EncryptionSyncService::new(
312 self.client.clone(),
313 Some((Duration::from_secs(3), Duration::from_secs(4))),
314 with_locking,
315 )
316 .await;
317
318 match encryption_sync {
323 Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
324 Ok(()) => match room.decrypt_event(raw_event.cast_ref(), push_ctx.as_ref()).await {
325 Ok(new_event) => match new_event.kind {
326 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
327 utd_info, ..
328 } => {
329 trace!(
330 "Encryption sync failed to decrypt the event: {:?}",
331 utd_info.reason
332 );
333 Ok(None)
334 }
335 _ => {
336 trace!("Encryption sync managed to decrypt the event.");
337 Ok(Some(new_event))
338 }
339 },
340 Err(err) => {
341 trace!("Encryption sync failed to decrypt the event: {err}");
342 Ok(None)
343 }
344 },
345 Err(err) => {
346 warn!("Encryption sync error: {err:#}");
347 Ok(None)
348 }
349 },
350 Err(err) => {
351 warn!("Encryption sync build error: {err:#}",);
352 Ok(None)
353 }
354 }
355 }
356
357 #[instrument(skip_all)]
376 async fn try_sliding_sync(
377 &self,
378 requests: &[NotificationItemsRequest],
379 ) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
380 let _guard = self.notification_sync_mutex.lock().await;
383
384 let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
389
390 let handler_raw_notification = raw_notifications.clone();
391
392 let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
393
394 let timeline_event_handler = self.client.add_event_handler({
395 let requests = requests.clone();
396 move |raw: Raw<AnySyncTimelineEvent>| async move {
397 match &raw.get_field::<OwnedEventId>("event_id") {
398 Ok(Some(event_id)) => {
399 let request =
400 &requests.iter().find(|request| request.event_ids.contains(event_id));
401 if request.is_none() {
402 return;
403 }
404 let room_id = request.unwrap().room_id.clone();
405 for request in requests.iter() {
406 if request.event_ids.contains(event_id) {
407 handler_raw_notification.lock().unwrap().insert(
411 event_id.to_owned(),
412 (room_id, Some(RawNotificationEvent::Timeline(raw))),
413 );
414 return;
415 }
416 }
417 }
418 Ok(None) => {
419 warn!("a sync event had no event id");
420 }
421 Err(err) => {
422 warn!("a sync event id couldn't be decoded: {err}");
423 }
424 }
425 }
426 });
427
428 let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
430
431 let user_id = self.client.user_id().unwrap().to_owned();
432 let handler_raw_invites = raw_invites.clone();
433 let handler_raw_notifications = raw_notifications.clone();
434 let stripped_member_handler = self.client.add_event_handler({
435 let requests = requests.clone();
436 move |raw: Raw<StrippedRoomMemberEvent>| async move {
437 let deserialized = match raw.deserialize() {
438 Ok(d) => d,
439 Err(err) => {
440 warn!("failed to deserialize raw stripped room member event: {err}");
441 return;
442 }
443 };
444
445 trace!("received a stripped room member event");
446
447 match &raw.get_field::<OwnedEventId>("event_id") {
450 Ok(Some(event_id)) => {
451 let request =
452 &requests.iter().find(|request| request.event_ids.contains(event_id));
453 if request.is_none() {
454 return;
455 }
456 let room_id = request.unwrap().room_id.clone();
457
458 handler_raw_notifications.lock().unwrap().insert(
462 event_id.to_owned(),
463 (room_id, Some(RawNotificationEvent::Invite(raw))),
464 );
465 return;
466 }
467 Ok(None) => {
468 debug!("a room member event had no id");
469 }
470 Err(err) => {
471 debug!("a room member event id couldn't be decoded: {err}");
472 }
473 }
474
475 if deserialized.content.membership == MembershipState::Invite
477 && deserialized.state_key == user_id
478 {
479 debug!("found an invite event for the current user");
480 handler_raw_invites
484 .lock()
485 .unwrap()
486 .insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
487 } else {
488 debug!("not an invite event, or not for the current user");
489 }
490 }
491 });
492
493 let required_state = vec![
495 (StateEventType::RoomEncryption, "".to_owned()),
496 (StateEventType::RoomMember, "$LAZY".to_owned()),
497 (StateEventType::RoomMember, "$ME".to_owned()),
498 (StateEventType::RoomCanonicalAlias, "".to_owned()),
499 (StateEventType::RoomName, "".to_owned()),
500 (StateEventType::RoomPowerLevels, "".to_owned()),
501 (StateEventType::CallMember, "*".to_owned()),
502 ];
503
504 let invites = SlidingSyncList::builder("invites")
505 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
506 .timeline_limit(8)
507 .required_state(required_state.clone())
508 .filters(Some(assign!(http::request::ListFilters::default(), {
509 is_invite: Some(true),
510 not_room_types: vec![RoomTypeFilter::Space],
511 })));
512
513 let sync = self
514 .client
515 .sliding_sync(Self::CONNECTION_ID)?
516 .poll_timeout(Duration::from_secs(1))
517 .network_timeout(Duration::from_secs(3))
518 .with_account_data_extension(
519 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
520 )
521 .add_list(invites)
522 .build()
523 .await?;
524
525 let room_ids = requests.iter().map(|req| req.room_id.as_ref()).collect::<Vec<_>>();
526 sync.subscribe_to_rooms(
527 &room_ids,
528 Some(assign!(http::request::RoomSubscription::default(), {
529 required_state,
530 timeline_limit: uint!(16)
531 })),
532 true,
533 );
534
535 let mut remaining_attempts = 3;
536
537 let stream = sync.sync();
538 pin_mut!(stream);
539
540 let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
542
543 loop {
544 if stream.next().await.is_none() {
545 break;
547 }
548
549 if raw_notifications.lock().unwrap().len() + raw_invites.lock().unwrap().len()
550 == expected_event_count
551 {
552 break;
554 }
555
556 remaining_attempts -= 1;
557 if remaining_attempts == 0 {
558 break;
560 }
561 }
562
563 self.client.remove_event_handler(stripped_member_handler);
564 self.client.remove_event_handler(timeline_event_handler);
565
566 let mut notifications = raw_notifications.clone().lock().unwrap().clone();
567 let mut missing_event_ids = Vec::new();
568
569 for request in requests.iter() {
571 for event_id in &request.event_ids {
572 if !notifications.contains_key(event_id) {
573 missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
574 }
575 }
576 }
577
578 for (room_id, missing_event_id) in missing_event_ids {
580 trace!("we didn't have a non-invite event, looking for invited room now");
581 if let Some(room) = self.client.get_room(&room_id) {
582 if room.state() == RoomState::Invited {
583 if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
584 notifications.insert(
585 missing_event_id.to_owned(),
586 (room_id.to_owned(), stripped_event),
587 );
588 }
589 } else {
590 debug!("the room isn't in the invited state");
591 }
592 } else {
593 debug!("the room isn't an invite");
594 }
595 }
596
597 let found = if notifications.len() == expected_event_count { "" } else { "not " };
598 trace!("all notification events have{found} been found");
599
600 Ok(notifications)
601 }
602
603 pub async fn get_notification_with_sliding_sync(
604 &self,
605 room_id: &RoomId,
606 event_id: &EventId,
607 ) -> Result<NotificationStatus, Error> {
608 let event_ids = vec![event_id.to_owned()];
609 let request = NotificationItemsRequest { room_id: room_id.to_owned(), event_ids };
610 let mut get_notifications_result =
611 self.get_notifications_with_sliding_sync(&[request]).await?;
612 get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
613 }
614
615 pub async fn get_notifications_with_sliding_sync(
620 &self,
621 requests: &[NotificationItemsRequest],
622 ) -> Result<BatchNotificationFetchingResult<NotificationStatus>, Error> {
623 let raw_events = self.try_sliding_sync(requests).await?;
624
625 let mut result = BatchNotificationFetchingResult::new();
626
627 for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
628 let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
630
631 if let Some(raw_event) = raw_event {
632 let (raw_event, push_actions) = match &raw_event {
633 RawNotificationEvent::Timeline(timeline_event) => {
634 match self.retry_decryption(&room, timeline_event).await {
636 Ok(Some(mut timeline_event)) => {
637 let push_actions = timeline_event.push_actions.take();
638 (
639 RawNotificationEvent::Timeline(timeline_event.into_raw()),
640 push_actions,
641 )
642 }
643 Ok(None) => {
644 match room.event_push_actions(timeline_event).await {
645 Ok(push_actions) => (raw_event.clone(), push_actions),
646 Err(error) => {
647 result.mark_fetching_notification_failed(
649 event_id,
650 error.into(),
651 );
652 continue;
653 }
654 }
655 }
656 Err(error) => {
657 result.mark_fetching_notification_failed(event_id, error);
658 continue;
659 }
660 }
661 }
662 RawNotificationEvent::Invite(invite_event) => {
663 match room.event_push_actions(invite_event).await {
665 Ok(push_actions) => {
666 (RawNotificationEvent::Invite(invite_event.clone()), push_actions)
667 }
668 Err(error) => {
669 result.mark_fetching_notification_failed(event_id, error.into());
670 continue;
671 }
672 }
673 }
674 };
675
676 let should_notify = push_actions
677 .as_ref()
678 .map(|actions| actions.iter().any(|a| a.should_notify()))
679 .unwrap_or(false);
680
681 if !should_notify {
682 result.add_notification(event_id, NotificationStatus::EventFilteredOut);
683 } else {
684 let notification_status = NotificationItem::new(
685 &room,
686 raw_event,
687 push_actions.as_deref(),
688 Vec::new(),
689 )
690 .await
691 .map(|event| NotificationStatus::Event(Box::new(event)));
692
693 match notification_status {
694 Ok(notification_item) => {
695 result.add_notification(event_id, notification_item);
696 }
697 Err(error) => {
698 result.mark_fetching_notification_failed(event_id, error);
699 }
700 }
701 }
702 } else {
703 result.add_notification(event_id, NotificationStatus::EventNotFound);
704 }
705 }
706
707 Ok(result)
708 }
709
710 pub async fn get_notification_with_context(
723 &self,
724 room_id: &RoomId,
725 event_id: &EventId,
726 ) -> Result<Option<NotificationItem>, Error> {
727 info!("fetching notification event with a /context query");
728
729 let Some(room) = self.parent_client.get_room(room_id) else {
731 return Err(Error::UnknownRoom);
732 };
733
734 let response = room.event_with_context(event_id, true, uint!(0), None).await?;
735
736 let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
737 let state_events = response.state;
738
739 if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
740 timeline_event = decrypted_event;
741 }
742
743 if let Some(actions) = timeline_event.push_actions.as_ref() {
744 if !actions.iter().any(|a| a.should_notify()) {
745 return Ok(None);
746 }
747 }
748
749 let push_actions = timeline_event.push_actions.take();
750 Ok(Some(
751 NotificationItem::new(
752 &room,
753 RawNotificationEvent::Timeline(timeline_event.into_raw()),
754 push_actions.as_deref(),
755 state_events,
756 )
757 .await?,
758 ))
759 }
760}
761
762fn is_event_encrypted(event_type: TimelineEventType) -> bool {
763 let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
764
765 #[cfg(feature = "unstable-msc3956")]
766 let is_still_encrypted =
767 is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
768
769 is_still_encrypted
770}
771
772#[derive(Debug)]
773pub enum NotificationStatus {
774 Event(Box<NotificationItem>),
775 EventNotFound,
776 EventFilteredOut,
777}
778
779#[derive(Debug, Clone)]
780pub struct NotificationItemsRequest {
781 pub room_id: OwnedRoomId,
782 pub event_ids: Vec<OwnedEventId>,
783}
784
785#[derive(Default)]
786pub struct BatchNotificationFetchingResult<T> {
787 notifications: BTreeMap<OwnedEventId, Result<T, Error>>,
788}
789
790impl<T> BatchNotificationFetchingResult<T> {
791 pub fn new() -> Self {
792 Self { notifications: BTreeMap::new() }
793 }
794
795 fn add_notification(&mut self, event_id: OwnedEventId, notification: T) {
796 self.notifications.insert(event_id, Ok(notification));
797 }
798
799 fn mark_fetching_notification_failed(&mut self, event_id: OwnedEventId, error: Error) {
800 self.notifications.insert(event_id, Err(error));
801 }
802
803 pub fn remove(&mut self, id: &EventId) -> Option<Result<T, Error>> {
804 self.notifications.remove(id)
805 }
806
807 pub fn iter(&self) -> Iter<'_, OwnedEventId, Result<T, Error>> {
808 self.notifications.iter()
809 }
810}
811
812impl<T> IntoIterator for BatchNotificationFetchingResult<T> {
813 type Item = (OwnedEventId, Result<T, Error>);
814 type IntoIter = IntoIter<OwnedEventId, Result<T, Error>>;
815 fn into_iter(self) -> Self::IntoIter {
816 self.notifications.into_iter()
817 }
818}
819
820#[derive(Debug, Clone)]
825pub enum RawNotificationEvent {
826 Timeline(Raw<AnySyncTimelineEvent>),
828 Invite(Raw<StrippedRoomMemberEvent>),
831}
832
833#[derive(Debug)]
836pub enum NotificationEvent {
837 Timeline(Box<AnySyncTimelineEvent>),
839 Invite(Box<StrippedRoomMemberEvent>),
841}
842
843impl NotificationEvent {
844 pub fn sender(&self) -> &UserId {
845 match self {
846 NotificationEvent::Timeline(ev) => ev.sender(),
847 NotificationEvent::Invite(ev) => &ev.sender,
848 }
849 }
850
851 fn thread_id(&self) -> Option<OwnedEventId> {
854 let NotificationEvent::Timeline(sync_timeline_event) = &self else {
855 return None;
856 };
857 let AnySyncTimelineEvent::MessageLike(event) = sync_timeline_event.as_ref() else {
858 return None;
859 };
860 let content = event.original_content()?;
861 match content {
862 AnyMessageLikeEventContent::RoomMessage(content) => match content.relates_to? {
863 Relation::Thread(thread) => Some(thread.event_id),
864 _ => None,
865 },
866 _ => None,
867 }
868 }
869}
870
871#[derive(Debug)]
873pub struct NotificationItem {
874 pub event: NotificationEvent,
876
877 pub raw_event: RawNotificationEvent,
879
880 pub sender_display_name: Option<String>,
882 pub sender_avatar_url: Option<String>,
884 pub is_sender_name_ambiguous: bool,
886
887 pub room_computed_display_name: String,
889 pub room_avatar_url: Option<String>,
891 pub room_canonical_alias: Option<String>,
893 pub room_join_rule: JoinRule,
895 pub is_room_encrypted: Option<bool>,
897 pub is_room_public: bool,
899 pub is_direct_message_room: bool,
901 pub joined_members_count: u64,
903
904 pub is_noisy: Option<bool>,
909 pub has_mention: Option<bool>,
910 pub thread_id: Option<OwnedEventId>,
911}
912
913impl NotificationItem {
914 async fn new(
915 room: &Room,
916 raw_event: RawNotificationEvent,
917 push_actions: Option<&[Action]>,
918 state_events: Vec<Raw<AnyStateEvent>>,
919 ) -> Result<Self, Error> {
920 let event = match &raw_event {
921 RawNotificationEvent::Timeline(raw_event) => {
922 let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
923 if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
924 SyncRoomMessageEvent::Original(ev),
925 )) = &mut event
926 {
927 ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
928 }
929 NotificationEvent::Timeline(Box::new(event))
930 }
931 RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(Box::new(
932 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
933 )),
934 };
935
936 let sender = match room.state() {
937 RoomState::Invited => room.invite_details().await?.inviter,
938 _ => room.get_member_no_sync(event.sender()).await?,
939 };
940
941 let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
942 match &sender {
943 Some(sender) => (
944 sender.display_name().map(|s| s.to_owned()),
945 sender.avatar_url().map(|s| s.to_string()),
946 sender.name_ambiguous(),
947 ),
948 None => (None, None, false),
949 };
950
951 if sender_display_name.is_none() || sender_avatar_url.is_none() {
952 let sender_id = event.sender();
953 for ev in state_events {
954 let Ok(ev) = ev.deserialize() else {
955 continue;
956 };
957 if ev.sender() != sender_id {
958 continue;
959 }
960 if let AnyFullStateEventContent::RoomMember(FullStateEventContent::Original {
961 content,
962 ..
963 }) = ev.content()
964 {
965 if sender_display_name.is_none() {
966 sender_display_name = content.displayname;
967 }
968 if sender_avatar_url.is_none() {
969 sender_avatar_url = content.avatar_url.map(|url| url.to_string());
970 }
971 }
972 }
973 }
974
975 let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
976 let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
977 let thread_id = event.thread_id().clone();
978
979 let item = NotificationItem {
980 event,
981 raw_event,
982 sender_display_name,
983 sender_avatar_url,
984 is_sender_name_ambiguous,
985 room_computed_display_name: room.display_name().await?.to_string(),
986 room_avatar_url: room.avatar_url().map(|s| s.to_string()),
987 room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
988 room_join_rule: room.join_rule(),
989 is_direct_message_room: room.is_direct().await?,
990 is_room_public: room.is_public(),
991 is_room_encrypted: room
992 .latest_encryption_state()
993 .await
994 .map(|state| state.is_encrypted())
995 .ok(),
996 joined_members_count: room.joined_members_count(),
997 is_noisy,
998 has_mention,
999 thread_id,
1000 };
1001
1002 Ok(item)
1003 }
1004}
1005
1006#[derive(Debug, Error)]
1008pub enum Error {
1009 #[error(transparent)]
1010 BuildingLocalClient(ClientBuildError),
1011
1012 #[error("unknown room for a notification")]
1014 UnknownRoom,
1015
1016 #[error("invalid ruma event")]
1018 InvalidRumaEvent,
1019
1020 #[error("the sliding sync response doesn't include the target room")]
1023 SlidingSyncEmptyRoom,
1024
1025 #[error("the event was missing in the `/context` query")]
1026 ContextMissingEvent,
1027
1028 #[error(transparent)]
1030 SdkError(#[from] matrix_sdk::Error),
1031
1032 #[error(transparent)]
1034 StoreError(#[from] StoreError),
1035}
1036
#[cfg(test)]
mod tests {
    use assert_matches2::assert_let;
    use matrix_sdk::test_utils::mocks::MatrixMockServer;
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{event_id, room_id, user_id};

    use super::{NotificationItem, RawNotificationEvent};

    /// A threaded message must expose the id of its thread root on the
    /// resulting notification item.
    #[async_test]
    async fn test_notification_item_returns_thread_id() {
        let room_id = room_id!("!a:b.c");
        let thread_root_event_id = event_id!("$root:b.c");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        // A text message sent in the thread rooted at `thread_root_event_id`.
        let threaded_message = EventFactory::new()
            .room(room_id)
            .sender(user_id!("@sender:b.c"))
            .text_msg("Threaded")
            .in_thread(thread_root_event_id, event_id!("$prev:b.c"))
            .into_raw_sync();

        let room = server.sync_joined_room(&client, room_id).await;

        let notification_item = NotificationItem::new(
            &room,
            RawNotificationEvent::Timeline(threaded_message),
            None,
            Vec::new(),
        )
        .await
        .expect("Could not create notification item");

        assert_let!(Some(thread_id) = notification_item.thread_id);
        assert_eq!(thread_id, thread_root_event_id);
    }
}