1use std::{
16 sync::{Arc, Mutex},
17 time::Duration,
18};
19
20use futures_util::{pin_mut, StreamExt as _};
21use matrix_sdk::{
22 room::Room, sleep::sleep, Client, ClientBuildError, SlidingSyncList, SlidingSyncMode,
23};
24use matrix_sdk_base::{deserialized_responses::TimelineEvent, RoomState, StoreError};
25use ruma::{
26 api::client::sync::sync_events::v5 as http,
27 assign,
28 directory::RoomTypeFilter,
29 events::{
30 room::{
31 member::{MembershipState, StrippedRoomMemberEvent},
32 message::SyncRoomMessageEvent,
33 },
34 AnyFullStateEventContent, AnyStateEvent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
35 FullStateEventContent, StateEventType, TimelineEventType,
36 },
37 html::RemoveReplyFallback,
38 push::Action,
39 serde::Raw,
40 uint, EventId, OwnedEventId, RoomId, UserId,
41};
42use thiserror::Error;
43use tokio::sync::Mutex as AsyncMutex;
44use tracing::{debug, info, instrument, trace, warn};
45
46use crate::{
47 encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService, WithLocking},
48 sync_service::SyncService,
49 DEFAULT_SANITIZER_MODE,
50};
51
52#[derive(Clone)]
54pub enum NotificationProcessSetup {
55 MultipleProcesses,
64
65 SingleProcess { sync_service: Arc<SyncService> },
73}
74
75pub struct NotificationClient {
81 client: Client,
83
84 parent_client: Client,
86
87 process_setup: NotificationProcessSetup,
89
90 notification_sync_mutex: AsyncMutex<()>,
98
99 encryption_sync_mutex: AsyncMutex<()>,
104}
105
106impl NotificationClient {
107 const CONNECTION_ID: &'static str = "notifications";
108 const LOCK_ID: &'static str = "notifications";
109
110 pub async fn new(
112 parent_client: Client,
113 process_setup: NotificationProcessSetup,
114 ) -> Result<Self, Error> {
115 let client = parent_client.notification_client(Self::LOCK_ID.to_owned()).await?;
116
117 Ok(NotificationClient {
118 client,
119 parent_client,
120 notification_sync_mutex: AsyncMutex::new(()),
121 encryption_sync_mutex: AsyncMutex::new(()),
122 process_setup,
123 })
124 }
125
126 #[instrument(skip(self))]
137 pub async fn get_notification(
138 &self,
139 room_id: &RoomId,
140 event_id: &EventId,
141 ) -> Result<Option<NotificationItem>, Error> {
142 match self.get_notification_with_sliding_sync(room_id, event_id).await? {
143 NotificationStatus::Event(event) => Ok(Some(event)),
144 NotificationStatus::EventFilteredOut => Ok(None),
145 NotificationStatus::EventNotFound => {
146 self.get_notification_with_context(room_id, event_id).await
147 }
148 }
149 }
150
151 #[instrument(skip_all)]
159 async fn retry_decryption(
160 &self,
161 room: &Room,
162 raw_event: &Raw<AnySyncTimelineEvent>,
163 ) -> Result<Option<TimelineEvent>, Error> {
164 let event: AnySyncTimelineEvent =
165 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
166
167 if !is_event_encrypted(event.event_type()) {
168 return Ok(None);
169 }
170
171 let _guard = self.encryption_sync_mutex.lock().await;
173
174 let with_locking = WithLocking::from(matches!(
185 self.process_setup,
186 NotificationProcessSetup::MultipleProcesses
187 ));
188
189 let sync_permit_guard = match &self.process_setup {
190 NotificationProcessSetup::MultipleProcesses => {
191 let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
195 sync_permit.lock_owned().await
196 }
197
198 NotificationProcessSetup::SingleProcess { sync_service } => {
199 if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
200 permit_guard
201 } else {
202 let mut wait = 200;
211
212 debug!("Encryption sync running in background");
213 for _ in 0..3 {
214 trace!("waiting for decryption…");
215
216 sleep(Duration::from_millis(wait)).await;
217
218 let new_event = room.decrypt_event(raw_event.cast_ref()).await?;
219
220 match new_event.kind {
221 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
222 utd_info, ..} => {
223 if utd_info.reason.is_missing_room_key() {
224 wait *= 2;
227 } else {
228 debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
229 return Ok(None);
230 }
231 }
232 _ => {
233 trace!("Waiting succeeded and event could be decrypted!");
234 return Ok(Some(new_event));
235 }
236 }
237 }
238
239 debug!("Timeout waiting for the encryption sync to decrypt notification.");
241 return Ok(None);
242 }
243 }
244 };
245
246 let encryption_sync = EncryptionSyncService::new(
247 self.client.clone(),
248 Some((Duration::from_secs(3), Duration::from_secs(4))),
249 with_locking,
250 )
251 .await;
252
253 match encryption_sync {
258 Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
259 Ok(()) => match room.decrypt_event(raw_event.cast_ref()).await {
260 Ok(new_event) => match new_event.kind {
261 matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
262 utd_info, ..
263 } => {
264 trace!(
265 "Encryption sync failed to decrypt the event: {:?}",
266 utd_info.reason
267 );
268 Ok(None)
269 }
270 _ => {
271 trace!("Encryption sync managed to decrypt the event.");
272 Ok(Some(new_event))
273 }
274 },
275 Err(err) => {
276 trace!("Encryption sync failed to decrypt the event: {err}");
277 Ok(None)
278 }
279 },
280 Err(err) => {
281 warn!("Encryption sync error: {err:#}");
282 Ok(None)
283 }
284 },
285 Err(err) => {
286 warn!("Encryption sync build error: {err:#}",);
287 Ok(None)
288 }
289 }
290 }
291
292 #[instrument(skip_all)]
311 async fn try_sliding_sync(
312 &self,
313 room_id: &RoomId,
314 event_id: &EventId,
315 ) -> Result<Option<RawNotificationEvent>, Error> {
316 let _guard = self.notification_sync_mutex.lock().await;
319
320 let raw_notification = Arc::new(Mutex::new(None));
325
326 let handler_raw_notification = raw_notification.clone();
327 let target_event_id = event_id.to_owned();
328
329 let timeline_event_handler =
330 self.client.add_event_handler(move |raw: Raw<AnySyncTimelineEvent>| async move {
331 match raw.get_field::<OwnedEventId>("event_id") {
332 Ok(Some(event_id)) => {
333 if event_id == target_event_id {
334 *handler_raw_notification.lock().unwrap() =
337 Some(RawNotificationEvent::Timeline(raw));
338 }
339 }
340 Ok(None) => {
341 warn!("a sync event had no event id");
342 }
343 Err(err) => {
344 warn!("a sync event id couldn't be decoded: {err}");
345 }
346 }
347 });
348
349 let raw_invite = Arc::new(Mutex::new(None));
351
352 let target_event_id = event_id.to_owned();
353 let user_id = self.client.user_id().unwrap().to_owned();
354 let handler_raw_invite = raw_invite.clone();
355 let handler_raw_notification = raw_notification.clone();
356 let stripped_member_handler =
357 self.client.add_event_handler(move |raw: Raw<StrippedRoomMemberEvent>| async move {
358 let deserialized = match raw.deserialize() {
359 Ok(d) => d,
360 Err(err) => {
361 warn!("failed to deserialize raw stripped room member event: {err}");
362 return;
363 }
364 };
365
366 trace!("received a stripped room member event");
367
368 match raw.get_field::<OwnedEventId>("event_id") {
371 Ok(Some(event_id)) => {
372 if event_id == target_event_id {
373 *handler_raw_notification.lock().unwrap() =
376 Some(RawNotificationEvent::Invite(raw));
377 return;
378 }
379 }
380 Ok(None) => {
381 debug!("a room member event had no id");
382 }
383 Err(err) => {
384 debug!("a room member event id couldn't be decoded: {err}");
385 }
386 }
387
388 if deserialized.content.membership == MembershipState::Invite
390 && deserialized.state_key == user_id
391 {
392 debug!("found an invite event for the current user");
393 *handler_raw_invite.lock().unwrap() = Some(RawNotificationEvent::Invite(raw));
397 } else {
398 debug!("not an invite event, or not for the current user");
399 }
400 });
401
402 let required_state = vec![
404 (StateEventType::RoomEncryption, "".to_owned()),
405 (StateEventType::RoomMember, "$LAZY".to_owned()),
406 (StateEventType::RoomMember, "$ME".to_owned()),
407 (StateEventType::RoomCanonicalAlias, "".to_owned()),
408 (StateEventType::RoomName, "".to_owned()),
409 (StateEventType::RoomPowerLevels, "".to_owned()),
410 ];
411
412 let invites = SlidingSyncList::builder("invites")
413 .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
414 .timeline_limit(8)
415 .required_state(required_state.clone())
416 .filters(Some(assign!(http::request::ListFilters::default(), {
417 is_invite: Some(true),
418 not_room_types: vec![RoomTypeFilter::Space],
419 })));
420
421 let sync = self
422 .client
423 .sliding_sync(Self::CONNECTION_ID)?
424 .poll_timeout(Duration::from_secs(1))
425 .network_timeout(Duration::from_secs(3))
426 .with_account_data_extension(
427 assign!(http::request::AccountData::default(), { enabled: Some(true) }),
428 )
429 .add_list(invites)
430 .build()
431 .await?;
432
433 sync.subscribe_to_rooms(
434 &[room_id],
435 Some(assign!(http::request::RoomSubscription::default(), {
436 required_state,
437 timeline_limit: uint!(16)
438 })),
439 true,
440 );
441
442 let mut remaining_attempts = 3;
443
444 let stream = sync.sync();
445 pin_mut!(stream);
446
447 loop {
448 if stream.next().await.is_none() {
449 break;
451 }
452
453 if raw_notification.lock().unwrap().is_some() || raw_invite.lock().unwrap().is_some() {
454 break;
456 }
457
458 remaining_attempts -= 1;
459 if remaining_attempts == 0 {
460 break;
462 }
463 }
464
465 self.client.remove_event_handler(stripped_member_handler);
466 self.client.remove_event_handler(timeline_event_handler);
467
468 let mut maybe_event = raw_notification.lock().unwrap().take();
469
470 if maybe_event.is_none() {
471 trace!("we didn't have a non-invite event, looking for invited room now");
472 if let Some(room) = self.client.get_room(room_id) {
473 if room.state() == RoomState::Invited {
474 maybe_event = raw_invite.lock().unwrap().take();
475 } else {
476 debug!("the room isn't in the invited state");
477 }
478 } else {
479 debug!("the room isn't an invite");
480 }
481 }
482
483 let found = if maybe_event.is_some() { "" } else { "not " };
484 trace!("the notification event has been {found}found");
485
486 Ok(maybe_event)
487 }
488
489 pub async fn get_notification_with_sliding_sync(
494 &self,
495 room_id: &RoomId,
496 event_id: &EventId,
497 ) -> Result<NotificationStatus, Error> {
498 let Some(mut raw_event) = self.try_sliding_sync(room_id, event_id).await? else {
499 return Ok(NotificationStatus::EventNotFound);
500 };
501
502 let Some(room) = self.client.get_room(room_id) else { return Err(Error::UnknownRoom) };
504
505 let push_actions = match &raw_event {
506 RawNotificationEvent::Timeline(timeline_event) => {
507 if let Some(mut timeline_event) =
509 self.retry_decryption(&room, timeline_event).await?
510 {
511 let push_actions = timeline_event.push_actions.take();
512 raw_event = RawNotificationEvent::Timeline(timeline_event.into_raw());
513 push_actions
514 } else {
515 room.event_push_actions(timeline_event).await?
516 }
517 }
518 RawNotificationEvent::Invite(invite_event) => {
519 room.event_push_actions(invite_event).await?
521 }
522 };
523
524 if let Some(push_actions) = &push_actions {
525 if !push_actions.iter().any(|a| a.should_notify()) {
526 return Ok(NotificationStatus::EventFilteredOut);
527 }
528 }
529
530 Ok(NotificationStatus::Event(
531 NotificationItem::new(&room, raw_event, push_actions.as_deref(), Vec::new()).await?,
532 ))
533 }
534
535 pub async fn get_notification_with_context(
548 &self,
549 room_id: &RoomId,
550 event_id: &EventId,
551 ) -> Result<Option<NotificationItem>, Error> {
552 info!("fetching notification event with a /context query");
553
554 let Some(room) = self.parent_client.get_room(room_id) else {
556 return Err(Error::UnknownRoom);
557 };
558
559 let response = room.event_with_context(event_id, true, uint!(0), None).await?;
560
561 let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
562 let state_events = response.state;
563
564 if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
565 timeline_event = decrypted_event;
566 }
567
568 if let Some(actions) = timeline_event.push_actions.as_ref() {
569 if !actions.iter().any(|a| a.should_notify()) {
570 return Ok(None);
571 }
572 }
573
574 let push_actions = timeline_event.push_actions.take();
575 Ok(Some(
576 NotificationItem::new(
577 &room,
578 RawNotificationEvent::Timeline(timeline_event.into_raw()),
579 push_actions.as_deref(),
580 state_events,
581 )
582 .await?,
583 ))
584 }
585}
586
587fn is_event_encrypted(event_type: TimelineEventType) -> bool {
588 let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
589
590 #[cfg(feature = "unstable-msc3956")]
591 let is_still_encrypted =
592 is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
593
594 is_still_encrypted
595}
596
597#[derive(Debug)]
598pub enum NotificationStatus {
599 Event(NotificationItem),
600 EventNotFound,
601 EventFilteredOut,
602}
603
604#[derive(Debug)]
609pub enum RawNotificationEvent {
610 Timeline(Raw<AnySyncTimelineEvent>),
612 Invite(Raw<StrippedRoomMemberEvent>),
615}
616
617#[derive(Debug)]
620pub enum NotificationEvent {
621 Timeline(AnySyncTimelineEvent),
623 Invite(StrippedRoomMemberEvent),
625}
626
627impl NotificationEvent {
628 pub fn sender(&self) -> &UserId {
629 match self {
630 NotificationEvent::Timeline(ev) => ev.sender(),
631 NotificationEvent::Invite(ev) => &ev.sender,
632 }
633 }
634}
635
636#[derive(Debug)]
638pub struct NotificationItem {
639 pub event: NotificationEvent,
641
642 pub raw_event: RawNotificationEvent,
644
645 pub sender_display_name: Option<String>,
647 pub sender_avatar_url: Option<String>,
649 pub is_sender_name_ambiguous: bool,
651
652 pub room_computed_display_name: String,
654 pub room_avatar_url: Option<String>,
656 pub room_canonical_alias: Option<String>,
658 pub is_room_encrypted: Option<bool>,
660 pub is_direct_message_room: bool,
662 pub joined_members_count: u64,
664
665 pub is_noisy: Option<bool>,
670 pub has_mention: Option<bool>,
671}
672
673impl NotificationItem {
674 async fn new(
675 room: &Room,
676 raw_event: RawNotificationEvent,
677 push_actions: Option<&[Action]>,
678 state_events: Vec<Raw<AnyStateEvent>>,
679 ) -> Result<Self, Error> {
680 let event = match &raw_event {
681 RawNotificationEvent::Timeline(raw_event) => {
682 let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
683 if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
684 SyncRoomMessageEvent::Original(ev),
685 )) = &mut event
686 {
687 ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
688 }
689 NotificationEvent::Timeline(event)
690 }
691 RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(
692 raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
693 ),
694 };
695
696 let sender = match room.state() {
697 RoomState::Invited => room.invite_details().await?.inviter,
698 _ => room.get_member_no_sync(event.sender()).await?,
699 };
700
701 let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
702 match &sender {
703 Some(sender) => (
704 sender.display_name().map(|s| s.to_owned()),
705 sender.avatar_url().map(|s| s.to_string()),
706 sender.name_ambiguous(),
707 ),
708 None => (None, None, false),
709 };
710
711 if sender_display_name.is_none() || sender_avatar_url.is_none() {
712 let sender_id = event.sender();
713 for ev in state_events {
714 let Ok(ev) = ev.deserialize() else {
715 continue;
716 };
717 if ev.sender() != sender_id {
718 continue;
719 }
720 if let AnyFullStateEventContent::RoomMember(FullStateEventContent::Original {
721 content,
722 ..
723 }) = ev.content()
724 {
725 if sender_display_name.is_none() {
726 sender_display_name = content.displayname;
727 }
728 if sender_avatar_url.is_none() {
729 sender_avatar_url = content.avatar_url.map(|url| url.to_string());
730 }
731 }
732 }
733 }
734
735 let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
736 let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
737
738 let item = NotificationItem {
739 event,
740 raw_event,
741 sender_display_name,
742 sender_avatar_url,
743 is_sender_name_ambiguous,
744 room_computed_display_name: room.display_name().await?.to_string(),
745 room_avatar_url: room.avatar_url().map(|s| s.to_string()),
746 room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
747 is_direct_message_room: room.is_direct().await?,
748 is_room_encrypted: room.is_encrypted().await.ok(),
749 joined_members_count: room.joined_members_count(),
750 is_noisy,
751 has_mention,
752 };
753
754 Ok(item)
755 }
756}
757
758#[derive(Debug, Error)]
760pub enum Error {
761 #[error(transparent)]
762 BuildingLocalClient(ClientBuildError),
763
764 #[error("unknown room for a notification")]
766 UnknownRoom,
767
768 #[error("invalid ruma event")]
770 InvalidRumaEvent,
771
772 #[error("the sliding sync response doesn't include the target room")]
775 SlidingSyncEmptyRoom,
776
777 #[error("the event was missing in the `/context` query")]
778 ContextMissingEvent,
779
780 #[error(transparent)]
782 SdkError(#[from] matrix_sdk::Error),
783
784 #[error(transparent)]
786 StoreError(#[from] StoreError),
787}