Skip to main content

matrix_sdk_ui/
notification_client.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::BTreeMap,
17    ops::Deref,
18    sync::{Arc, Mutex},
19    time::Duration,
20};
21
22use futures_util::{StreamExt as _, pin_mut};
23use matrix_sdk::{
24    Client, ClientBuildError, SlidingSyncList, SlidingSyncMode, room::Room, sleep::sleep,
25};
26use matrix_sdk_base::{RoomState, StoreError, deserialized_responses::TimelineEvent};
27use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
28use ruma::{
29    EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
30    api::client::sync::sync_events::v5 as http,
31    assign,
32    events::{
33        AnyFullStateEventContent, AnyMessageLikeEventContent, AnyStateEvent,
34        AnySyncMessageLikeEvent, AnySyncTimelineEvent, FullStateEventContent, StateEventType,
35        TimelineEventType,
36        room::{
37            encrypted::OriginalSyncRoomEncryptedEvent,
38            join_rules::JoinRule,
39            member::{MembershipState, StrippedRoomMemberEvent},
40            message::{Relation, SyncRoomMessageEvent},
41        },
42    },
43    html::RemoveReplyFallback,
44    push::Action,
45    serde::Raw,
46    uint,
47};
48use thiserror::Error;
49use tokio::sync::Mutex as AsyncMutex;
50use tracing::{debug, info, instrument, trace, warn};
51
52use crate::{
53    DEFAULT_SANITIZER_MODE,
54    encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService},
55    sync_service::SyncService,
56};
57
58/// What kind of process setup do we have for this notification client?
59#[derive(Clone)]
60pub enum NotificationProcessSetup {
61    /// The notification client may run on a separate process than the rest of
62    /// the app.
63    ///
64    /// For instance, this is the case on iOS, where notifications are handled
65    /// in a separate process (the Notification Service Extension, aka NSE).
66    ///
67    /// In that case, a cross-process lock will be used to coordinate writes
68    /// into the stores handled by the SDK.
69    MultipleProcesses,
70
71    /// The notification client runs in the same process as the rest of the
72    /// `Client` performing syncs.
73    ///
74    /// For instance, this is the case on Android, where a notification will
75    /// wake up the main app process.
76    ///
77    /// In that case, a smart reference to the [`SyncService`] must be provided.
78    SingleProcess { sync_service: Arc<SyncService> },
79}
80
81/// A client specialized for handling push notifications received over the
82/// network, for an app.
83///
84/// In particular, it takes care of running a full decryption sync, in case the
85/// event in the notification was impossible to decrypt beforehand.
86pub struct NotificationClient {
87    /// SDK client that uses an in-memory state store.
88    client: Client,
89
90    /// SDK client that uses the same state store as the caller's context.
91    parent_client: Client,
92
93    /// Is the notification client running on its own process or not?
94    process_setup: NotificationProcessSetup,
95
96    /// A mutex to serialize requests to the notifications sliding sync.
97    ///
98    /// If several notifications come in at the same time (e.g. network was
99    /// unreachable because of airplane mode or something similar), then we
100    /// need to make sure that repeated calls to `get_notification` won't
101    /// cause multiple requests with the same `conn_id` we're using for
102    /// notifications. This mutex solves this by sequentializing the requests.
103    notification_sync_mutex: AsyncMutex<()>,
104
105    /// A mutex to serialize requests to the encryption sliding sync that's used
106    /// in case we didn't have the keys to decipher an event.
107    ///
108    /// Same reasoning as [`Self::notification_sync_mutex`].
109    encryption_sync_mutex: AsyncMutex<()>,
110}
111
112impl NotificationClient {
113    const CONNECTION_ID: &'static str = "notifications";
114    const LOCK_ID: &'static str = "notifications";
115
116    /// Create a new notification client.
117    pub async fn new(
118        parent_client: Client,
119        process_setup: NotificationProcessSetup,
120    ) -> Result<Self, Error> {
121        // Only create the lock id if cross process lock is needed (multiple processes)
122        let cross_process_store_config = match process_setup {
123            NotificationProcessSetup::MultipleProcesses => {
124                CrossProcessLockConfig::multi_process(Self::LOCK_ID)
125            }
126            NotificationProcessSetup::SingleProcess { .. } => CrossProcessLockConfig::SingleProcess,
127        };
128        let client = parent_client.notification_client(cross_process_store_config).await?;
129
130        Ok(NotificationClient {
131            client,
132            parent_client,
133            notification_sync_mutex: AsyncMutex::new(()),
134            encryption_sync_mutex: AsyncMutex::new(()),
135            process_setup,
136        })
137    }
138
139    /// Fetches a room by its ID using the in-memory state store backed client.
140    /// Useful to retrieve room information after running the limited
141    /// notification client sliding sync loop.
142    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
143        self.client.get_room(room_id)
144    }
145
146    /// Fetches the content of a notification.
147    ///
148    /// This will first try to get the notification using a short-lived sliding
149    /// sync, and if the sliding-sync can't find the event, then it'll use a
150    /// `/context` query to find the event with associated member information.
151    ///
152    /// An error result means that we couldn't resolve the notification; in that
153    /// case, a dummy notification may be displayed instead.
154    #[instrument(skip(self))]
155    pub async fn get_notification(
156        &self,
157        room_id: &RoomId,
158        event_id: &EventId,
159    ) -> Result<NotificationStatus, Error> {
160        let status = self.get_notification_with_sliding_sync(room_id, event_id).await?;
161        match status {
162            NotificationStatus::Event(..)
163            | NotificationStatus::EventFilteredOut
164            | NotificationStatus::EventRedacted => Ok(status),
165            NotificationStatus::EventNotFound => {
166                self.get_notification_with_context(room_id, event_id).await
167            }
168        }
169    }
170
171    /// Fetches the content of several notifications.
172    ///
173    /// This will first try to get the notifications using a short-lived sliding
174    /// sync, and if the sliding-sync can't find the events, then it'll use a
175    /// `/context` query to find the events with associated member information.
176    ///
177    /// An error result at the top level means that something failed when trying
178    /// to set up the notification fetching.
179    ///
180    /// For each notification item you can also receive an error, which means
181    /// something failed when trying to fetch that particular notification
182    /// (decryption, fetching push actions, etc.); in that case, a dummy
183    /// notification may be displayed instead.
184    pub async fn get_notifications(
185        &self,
186        requests: &[NotificationItemsRequest],
187    ) -> Result<BatchNotificationFetchingResult, Error> {
188        let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
189
190        for request in requests {
191            for event_id in &request.event_ids {
192                match notifications.get_mut(event_id) {
193                    // If the notification for a given event wasn't found with sliding sync, try
194                    // with a /context for each event.
195                    Some(Ok(NotificationStatus::EventNotFound)) | None => {
196                        notifications.insert(
197                            event_id.to_owned(),
198                            self.get_notification_with_context(&request.room_id, event_id).await,
199                        );
200                    }
201
202                    _ => {}
203                }
204            }
205        }
206
207        Ok(notifications)
208    }
209
210    /// Run an encryption sync loop, in case an event is still encrypted.
211    ///
212    /// Will return `Ok(Some)` if and only if:
213    /// - the event was encrypted,
214    /// - we successfully ran an encryption sync or waited long enough for an
215    ///   existing encryption sync to decrypt the event.
216    ///
217    /// Otherwise, if the event was not encrypted, or couldn't be decrypted
218    /// (without causing a fatal error), will return `Ok(None)`.
219    #[instrument(skip_all)]
220    async fn retry_decryption(
221        &self,
222        room: &Room,
223        raw_event: &Raw<AnySyncTimelineEvent>,
224    ) -> Result<Option<TimelineEvent>, Error> {
225        let event: AnySyncTimelineEvent =
226            raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
227
228        if !is_event_encrypted(event.event_type()) {
229            return Ok(None);
230        }
231
232        // Serialize calls to this function.
233        let _guard = self.encryption_sync_mutex.lock().await;
234
235        // The message is still encrypted, and the client is configured to retry
236        // decryption.
237        //
238        // Spawn an `EncryptionSync` that runs two iterations of the sliding sync loop:
239        // - the first iteration allows to get SS events as well as send e2ee requests.
240        // - the second one let the SS homeserver forward events triggered by the
241        //   sending of e2ee requests.
242        //
243        // Keep timeouts small for both, since we might be short on time.
244
245        let push_ctx = room.push_context().await?;
246        let sync_permit_guard = match &self.process_setup {
247            NotificationProcessSetup::MultipleProcesses => {
248                // We're running on our own process, dedicated for notifications. In that case,
249                // create a dummy sync permit; we're guaranteed there's at most one since we've
250                // acquired the `encryption_sync_mutex' lock here.
251                let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
252                sync_permit.lock_owned().await
253            }
254
255            NotificationProcessSetup::SingleProcess { sync_service } => {
256                if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
257                    permit_guard
258                } else {
259                    // There's already a sync service active, thus the encryption sync is already
260                    // running elsewhere. As a matter of fact, if the event was encrypted, that
261                    // means we were racing against the encryption sync. Wait a bit, attempt to
262                    // decrypt, and carry on.
263
264                    // We repeat the sleep 3 times at most, each iteration we
265                    // double the amount of time waited, so overall we may wait up to 7 times this
266                    // amount.
267                    let mut wait = 200;
268
269                    debug!("Encryption sync running in background");
270                    for _ in 0..3 {
271                        trace!("waiting for decryption…");
272
273                        sleep(Duration::from_millis(wait)).await;
274
275                        // Note: We specify the cast type in case the
276                        // `experimental-encrypted-state-events` feature is enabled, which provides
277                        // multiple cast implementations.
278                        let new_event = room
279                            .decrypt_event(
280                                raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
281                                push_ctx.as_ref(),
282                            )
283                            .await?;
284
285                        match new_event.kind {
286                            matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
287                                utd_info, ..} => {
288                                if utd_info.reason.is_missing_room_key() {
289                                    // Decryption error that could be caused by a missing room
290                                    // key; retry in a few.
291                                    wait *= 2;
292                                } else {
293                                    debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
294                                    return Ok(None);
295                                }
296                            }
297                            _ => {
298                                trace!("Waiting succeeded and event could be decrypted!");
299                                return Ok(Some(new_event));
300                            }
301                        }
302                    }
303
304                    // We couldn't decrypt the event after waiting a few times, abort.
305                    debug!("Timeout waiting for the encryption sync to decrypt notification.");
306                    return Ok(None);
307                }
308            }
309        };
310
311        let encryption_sync = EncryptionSyncService::new(
312            self.client.clone(),
313            Some((Duration::from_secs(3), Duration::from_secs(4))),
314        )
315        .await;
316
317        // Just log out errors, but don't have them abort the notification processing:
318        // an undecrypted notification is still better than no
319        // notifications.
320
321        match encryption_sync {
322            Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
323                // Note: We specify the cast type in case the
324                // `experimental-encrypted-state-events` feature is enabled, which provides
325                // multiple cast implementations.
326                Ok(()) => match room.decrypt_event(raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(), push_ctx.as_ref()).await {
327                    Ok(new_event) => match new_event.kind {
328                        matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
329                            utd_info, ..
330                        } => {
331                            trace!(
332                                "Encryption sync failed to decrypt the event: {:?}",
333                                utd_info.reason
334                            );
335                            Ok(None)
336                        }
337                        _ => {
338                            trace!("Encryption sync managed to decrypt the event.");
339                            Ok(Some(new_event))
340                        }
341                    },
342                    Err(err) => {
343                        trace!("Encryption sync failed to decrypt the event: {err}");
344                        Ok(None)
345                    }
346                },
347                Err(err) => {
348                    warn!("Encryption sync error: {err:#}");
349                    Ok(None)
350                }
351            },
352            Err(err) => {
353                warn!("Encryption sync build error: {err:#}",);
354                Ok(None)
355            }
356        }
357    }
358
359    /// Try to run a sliding sync (without encryption) to retrieve the events
360    /// from the notification.
361    ///
362    /// An event can either be:
363    /// - an invite event,
364    /// - or a non-invite event.
365    ///
366    /// In case it's a non-invite event, it's rather easy: we'll request
367    /// explicit state that'll be useful for building the
368    /// `NotificationItem`, and subscribe to the room which the notification
369    /// relates to.
370    ///
371    /// In case it's an invite-event, it's trickier because the stripped event
372    /// may not contain the event id, so we can't just match on it. Rather,
373    /// we look at stripped room member events that may be fitting (i.e.
374    /// match the current user and are invites), and if the SDK concludes the
375    /// room was in the invited state, and we didn't find the event by id,
376    /// *then* we'll use that stripped room member event.
377    #[instrument(skip_all)]
378    async fn try_sliding_sync(
379        &self,
380        requests: &[NotificationItemsRequest],
381    ) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
382        const MAX_SLIDING_SYNC_ATTEMPTS: u64 = 3;
383        // Serialize all the calls to this method by taking a lock at the beginning,
384        // that will be dropped later.
385        let _guard = self.notification_sync_mutex.lock().await;
386
387        // Set up a sliding sync that only subscribes to the room that had the
388        // notification, so we can figure out the full event and associated
389        // information.
390
391        let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
392        let handler_raw_notification = raw_notifications.clone();
393
394        let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
395        let handler_raw_invites = raw_invites.clone();
396
397        let user_id = self.client.user_id().unwrap().to_owned();
398        let room_ids = requests.iter().map(|req| req.room_id.clone()).collect::<Vec<_>>();
399
400        let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
401
402        let timeline_event_handler = self.client.add_event_handler({
403            let requests = requests.clone();
404            move |raw: Raw<AnySyncTimelineEvent>| async move {
405                match &raw.get_field::<OwnedEventId>("event_id") {
406                    Ok(Some(event_id)) => {
407                        let Some(request) =
408                            &requests.iter().find(|request| request.event_ids.contains(event_id))
409                        else {
410                            return;
411                        };
412
413                        let room_id = request.room_id.clone();
414
415                        // found it! There shouldn't be a previous event before, but if
416                        // there is, that should be ok to
417                        // just replace it.
418                        handler_raw_notification.lock().unwrap().insert(
419                            event_id.to_owned(),
420                            (room_id, Some(RawNotificationEvent::Timeline(raw))),
421                        );
422                    }
423                    Ok(None) => {
424                        warn!("a sync event had no event id");
425                    }
426                    Err(err) => {
427                        warn!("failed to deserialize sync event id: {err}");
428                    }
429                }
430            }
431        });
432
433        let handler_raw_notifications = raw_notifications.clone();
434        let stripped_member_handler = self.client.add_event_handler({
435            let requests = requests.clone();
436            let room_ids: Vec<_> = room_ids.clone();
437            move |raw: Raw<StrippedRoomMemberEvent>, room: Room| async move {
438                if !room_ids.contains(&room.room_id().to_owned()) {
439                    return;
440                }
441
442                let deserialized = match raw.deserialize() {
443                    Ok(d) => d,
444                    Err(err) => {
445                        warn!("failed to deserialize raw stripped room member event: {err}");
446                        return;
447                    }
448                };
449
450                trace!("received a stripped room member event");
451
452                // Try to match the event by event_id, as it's the most precise. In theory, we
453                // shouldn't receive it, so that's a first attempt.
454                match &raw.get_field::<OwnedEventId>("event_id") {
455                    Ok(Some(event_id)) => {
456                        let request =
457                            &requests.iter().find(|request| request.event_ids.contains(event_id));
458                        if request.is_none() {
459                            return;
460                        }
461                        let room_id = request.unwrap().room_id.clone();
462
463                        // found it! There shouldn't be a previous event before, but if
464                        // there is, that should be ok to
465                        // just replace it.
466                        handler_raw_notifications.lock().unwrap().insert(
467                            event_id.to_owned(),
468                            (room_id, Some(RawNotificationEvent::Invite(raw))),
469                        );
470                        return;
471                    }
472                    Ok(None) => {
473                        warn!("a room member event had no id");
474                    }
475                    Err(err) => {
476                        warn!("failed to deserialize room member event id: {err}");
477                    }
478                }
479
480                // Try to match the event by membership and state_key for the current user.
481                if deserialized.content.membership == MembershipState::Invite
482                    && deserialized.state_key == user_id
483                {
484                    trace!("found an invite event for the current user");
485                    // This could be it! There might be several of these following each other, so
486                    // assume it's the latest one (in sync ordering), and override a previous one if
487                    // present.
488                    handler_raw_invites
489                        .lock()
490                        .unwrap()
491                        .insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
492                } else {
493                    trace!("not an invite event, or not for the current user");
494                }
495            }
496        });
497
498        // Room power levels are necessary to build the push context.
499        let required_state = vec![
500            (StateEventType::RoomEncryption, "".to_owned()),
501            (StateEventType::RoomMember, "$LAZY".to_owned()),
502            (StateEventType::RoomMember, "$ME".to_owned()),
503            (StateEventType::RoomCanonicalAlias, "".to_owned()),
504            (StateEventType::RoomName, "".to_owned()),
505            (StateEventType::RoomAvatar, "".to_owned()),
506            (StateEventType::RoomPowerLevels, "".to_owned()),
507            (StateEventType::RoomJoinRules, "".to_owned()),
508            (StateEventType::CallMember, "*".to_owned()),
509            (StateEventType::RoomCreate, "".to_owned()),
510            (StateEventType::MemberHints, "".to_owned()),
511        ];
512
513        let invites = SlidingSyncList::builder("invites")
514            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
515            .timeline_limit(8)
516            .required_state(required_state.clone())
517            .filters(Some(assign!(http::request::ListFilters::default(), {
518                is_invite: Some(true),
519            })));
520
521        let sync = self
522            .client
523            .sliding_sync(Self::CONNECTION_ID)?
524            .poll_timeout(Duration::from_secs(1))
525            .network_timeout(Duration::from_secs(3))
526            .with_account_data_extension(
527                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
528            )
529            .add_list(invites)
530            .build()
531            .await?;
532
533        sync.subscribe_to_rooms(
534            &room_ids.iter().map(|id| id.deref()).collect::<Vec<&RoomId>>(),
535            Some(assign!(http::request::RoomSubscription::default(), {
536                required_state,
537                timeline_limit: uint!(16)
538            })),
539            true,
540        );
541
542        let mut remaining_attempts = MAX_SLIDING_SYNC_ATTEMPTS;
543
544        let stream = sync.sync();
545        pin_mut!(stream);
546
547        // Sum the expected event count for each room
548        let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
549
550        loop {
551            if stream.next().await.is_none() {
552                // Sliding sync aborted early.
553                break;
554            }
555
556            let event_count = raw_notifications.lock().unwrap().len();
557            let invite_count = raw_invites.lock().unwrap().len();
558
559            let current_attempt = 1 + MAX_SLIDING_SYNC_ATTEMPTS - remaining_attempts;
560            trace!(
561                "Attempt #{current_attempt}: \
562                Found {event_count} notification(s), \
563                {invite_count} invite event(s), \
564                expected {expected_event_count} total",
565            );
566
567            // We can stop looking once we've received the expected number of events from
568            // the sync. Since we can receive only events or invites for rooms but not both,
569            // and we're not taking into account invites from not subscribed rooms, this
570            // check should be accurate.
571            if event_count + invite_count == expected_event_count {
572                // We got the events.
573                break;
574            }
575
576            remaining_attempts -= 1;
577            warn!("There are some missing notifications, remaining attempts: {remaining_attempts}");
578            if remaining_attempts == 0 {
579                // We're out of luck.
580                break;
581            }
582        }
583
584        self.client.remove_event_handler(stripped_member_handler);
585        self.client.remove_event_handler(timeline_event_handler);
586
587        let mut notifications = raw_notifications.clone().lock().unwrap().clone();
588        let mut missing_event_ids = Vec::new();
589
590        // Create the list of missing event ids after the syncs.
591        for request in requests.iter() {
592            for event_id in &request.event_ids {
593                if !notifications.contains_key(event_id) {
594                    missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
595                }
596            }
597        }
598
599        // Try checking if the missing notifications could be invites.
600        for (room_id, missing_event_id) in missing_event_ids {
601            trace!("we didn't have a non-invite event, looking for invited room now");
602            if let Some(room) = self.client.get_room(&room_id) {
603                if room.state() == RoomState::Invited {
604                    if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
605                        notifications.insert(
606                            missing_event_id.to_owned(),
607                            (room_id.to_owned(), stripped_event),
608                        );
609                    }
610                } else {
611                    debug!("the room isn't in the invited state");
612                }
613            } else {
614                warn!(%room_id, "unknown room, can't check for invite events");
615            }
616        }
617
618        let found = if notifications.len() == expected_event_count { "" } else { "not " };
619        trace!("all notification events have{found} been found");
620
621        Ok(notifications)
622    }
623
624    pub async fn get_notification_with_sliding_sync(
625        &self,
626        room_id: &RoomId,
627        event_id: &EventId,
628    ) -> Result<NotificationStatus, Error> {
629        info!("fetching notification event with a sliding sync");
630
631        let request = NotificationItemsRequest {
632            room_id: room_id.to_owned(),
633            event_ids: vec![event_id.to_owned()],
634        };
635
636        let mut get_notifications_result =
637            self.get_notifications_with_sliding_sync(&[request]).await?;
638
639        get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
640    }
641
642    /// Given a (decrypted or not) event, figure out whether it should be
643    /// filtered out for other client-side reasons (such as the sender being
644    /// ignored, for instance), and returns the corresponding
645    /// [`NotificationStatus`].
646    async fn compute_status(
647        &self,
648        room: &Room,
649        push_actions: Option<&[Action]>,
650        raw_event: RawNotificationEvent,
651        state_events: Vec<Raw<AnyStateEvent>>,
652    ) -> Result<NotificationStatus, Error> {
653        if let Some(actions) = push_actions
654            && !actions.iter().any(|a| a.should_notify())
655        {
656            // The event shouldn't notify: return early.
657            return Ok(NotificationStatus::EventFilteredOut);
658        }
659
660        let notification_item =
661            NotificationItem::new(room, raw_event, push_actions, state_events).await?;
662
663        if self.client.is_user_ignored(notification_item.event.sender()).await {
664            Ok(NotificationStatus::EventFilteredOut)
665        } else {
666            Ok(NotificationStatus::Event(Box::new(notification_item)))
667        }
668    }
669
670    /// Get a list of full notifications, given a room id and event ids.
671    ///
672    /// This will run a small sliding sync to retrieve the content of the
673    /// events, along with extra data to form a rich notification context.
674    pub async fn get_notifications_with_sliding_sync(
675        &self,
676        requests: &[NotificationItemsRequest],
677    ) -> Result<BatchNotificationFetchingResult, Error> {
678        let raw_events = self.try_sliding_sync(requests).await?;
679
680        let mut batch_result = BatchNotificationFetchingResult::new();
681
682        for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
683            // At this point it should have been added by the sync, if it's not, give up.
684            let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
685
686            let Some(raw_event) = raw_event else {
687                // The event was not found, so we can't build a notification.
688                batch_result.insert(event_id, Ok(NotificationStatus::EventNotFound));
689                continue;
690            };
691
692            let (raw_event, push_actions) = match &raw_event {
693                RawNotificationEvent::Timeline(timeline_event) => {
694                    // Check if the event is redacted first
695                    let event_for_redaction_check: AnySyncTimelineEvent =
696                        match timeline_event.deserialize() {
697                            Ok(event) => event,
698                            Err(_) => {
699                                batch_result.insert(event_id, Err(Error::InvalidRumaEvent));
700                                continue;
701                            }
702                        };
703
704                    if is_event_redacted(&event_for_redaction_check) {
705                        batch_result.insert(event_id, Ok(NotificationStatus::EventRedacted));
706                        continue;
707                    }
708
709                    // Timeline events may be encrypted, so make sure they get decrypted first.
710                    match self.retry_decryption(&room, timeline_event).await {
711                        Ok(Some(timeline_event)) => {
712                            let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
713                            (
714                                RawNotificationEvent::Timeline(timeline_event.into_raw()),
715                                push_actions,
716                            )
717                        }
718
719                        Ok(None) => {
720                            // The event was either not encrypted in the first place, or we
721                            // couldn't decrypt it after retrying. Use the raw event as is.
722                            match room.event_push_actions(timeline_event).await {
723                                Ok(push_actions) => (raw_event.clone(), push_actions),
724                                Err(err) => {
725                                    // Could not get push actions.
726                                    batch_result.insert(event_id, Err(err.into()));
727                                    continue;
728                                }
729                            }
730                        }
731
732                        Err(err) => {
733                            batch_result.insert(event_id, Err(err));
734                            continue;
735                        }
736                    }
737                }
738
739                RawNotificationEvent::Invite(invite_event) => {
740                    // Invite events can't be encrypted, so they should be in clear text.
741                    match room.event_push_actions(invite_event).await {
742                        Ok(push_actions) => {
743                            (RawNotificationEvent::Invite(invite_event.clone()), push_actions)
744                        }
745                        Err(err) => {
746                            batch_result.insert(event_id, Err(err.into()));
747                            continue;
748                        }
749                    }
750                }
751            };
752
753            let notification_status_result =
754                self.compute_status(&room, push_actions.as_deref(), raw_event, Vec::new()).await;
755
756            batch_result.insert(event_id, notification_status_result);
757        }
758
759        Ok(batch_result)
760    }
761
762    /// Retrieve a notification using a `/context` query.
763    ///
764    /// This is for clients that are already running other sliding syncs in the
765    /// same process, so that most of the contextual information for the
766    /// notification should already be there. In particular, the room containing
767    /// the event MUST be known (via a sliding sync for invites, or another
768    /// sliding sync).
769    ///
770    /// An error result means that we couldn't resolve the notification; in that
771    /// case, a dummy notification may be displayed instead. A `None` result
772    /// means the notification has been filtered out by the user's push
773    /// rules.
774    pub async fn get_notification_with_context(
775        &self,
776        room_id: &RoomId,
777        event_id: &EventId,
778    ) -> Result<NotificationStatus, Error> {
779        info!("fetching notification event with a /context query");
780
781        // See above comment.
782        let Some(room) = self.parent_client.get_room(room_id) else {
783            return Err(Error::UnknownRoom);
784        };
785
786        let response = room.event_with_context(event_id, true, uint!(0), None).await?;
787
788        let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
789        let state_events = response.state;
790
791        // Check if the event is redacted
792        let event_for_redaction_check: AnySyncTimelineEvent =
793            timeline_event.raw().deserialize().map_err(|_| Error::InvalidRumaEvent)?;
794
795        if is_event_redacted(&event_for_redaction_check) {
796            return Ok(NotificationStatus::EventRedacted);
797        }
798
799        if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
800            timeline_event = decrypted_event;
801        }
802
803        let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
804
805        self.compute_status(
806            &room,
807            push_actions.as_deref(),
808            RawNotificationEvent::Timeline(timeline_event.into_raw()),
809            state_events,
810        )
811        .await
812    }
813}
814
815fn is_event_encrypted(event_type: TimelineEventType) -> bool {
816    let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
817
818    #[cfg(feature = "unstable-msc3956")]
819    let is_still_encrypted =
820        is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
821
822    is_still_encrypted
823}
824
825fn is_event_redacted(event: &AnySyncTimelineEvent) -> bool {
826    // Check if the event is a message-like event but has no original content (i.e.,
827    // redacted)
828    match event {
829        AnySyncTimelineEvent::MessageLike(msg) => msg.is_redacted(),
830        _ => false,
831    }
832}
833
834#[derive(Debug)]
835pub enum NotificationStatus {
836    /// The event has been found and was not filtered out.
837    Event(Box<NotificationItem>),
838    /// The event couldn't be found in the network queries used to find it.
839    EventNotFound,
840    /// The event has been filtered out, either because of the user's push
841    /// rules, or because the user which triggered it is ignored by the
842    /// current user.
843    EventFilteredOut,
844    /// The event has been redacted and has no meaningful content.
845    EventRedacted,
846}
847
848#[derive(Debug, Clone)]
849pub struct NotificationItemsRequest {
850    pub room_id: OwnedRoomId,
851    pub event_ids: Vec<OwnedEventId>,
852}
853
854type BatchNotificationFetchingResult = BTreeMap<OwnedEventId, Result<NotificationStatus, Error>>;
855
856/// The Notification event as it was fetched from remote for the
857/// given `event_id`, represented as Raw but decrypted, thus only
858/// whether it is an invite or regular Timeline event has been
859/// determined.
860#[derive(Debug, Clone)]
861pub enum RawNotificationEvent {
862    /// The raw event for a timeline event
863    Timeline(Raw<AnySyncTimelineEvent>),
864    /// The notification contains an invitation with the given
865    /// StrippedRoomMemberEvent (in raw here)
866    Invite(Raw<StrippedRoomMemberEvent>),
867}
868
869/// The deserialized Event as it was fetched from remote for the
870/// given `event_id` and after decryption (if possible).
871#[derive(Debug)]
872pub enum NotificationEvent {
873    /// The Notification was for a TimelineEvent
874    Timeline(Box<AnySyncTimelineEvent>),
875    /// The Notification is an invite with the given stripped room event data
876    Invite(Box<StrippedRoomMemberEvent>),
877}
878
879impl NotificationEvent {
880    pub fn sender(&self) -> &UserId {
881        match self {
882            NotificationEvent::Timeline(ev) => ev.sender(),
883            NotificationEvent::Invite(ev) => &ev.sender,
884        }
885    }
886
887    /// Returns the root event id of the thread the notification event is in, if
888    /// any.
889    fn thread_id(&self) -> Option<OwnedEventId> {
890        let NotificationEvent::Timeline(sync_timeline_event) = &self else {
891            return None;
892        };
893        let AnySyncTimelineEvent::MessageLike(event) = sync_timeline_event.as_ref() else {
894            return None;
895        };
896        let content = event.original_content()?;
897        match content {
898            AnyMessageLikeEventContent::RoomMessage(content) => match content.relates_to? {
899                Relation::Thread(thread) => Some(thread.event_id),
900                _ => None,
901            },
902            _ => None,
903        }
904    }
905}
906
907/// A notification with its full content.
908#[derive(Debug)]
909pub struct NotificationItem {
910    /// Underlying Ruma event.
911    pub event: NotificationEvent,
912
913    /// The raw of the underlying event.
914    pub raw_event: RawNotificationEvent,
915
916    /// Display name of the sender.
917    pub sender_display_name: Option<String>,
918    /// Avatar URL of the sender.
919    pub sender_avatar_url: Option<String>,
920    /// Is the sender's name ambiguous?
921    pub is_sender_name_ambiguous: bool,
922
923    /// Room computed display name.
924    pub room_computed_display_name: String,
925    /// Room avatar URL.
926    pub room_avatar_url: Option<String>,
927    /// Room canonical alias.
928    pub room_canonical_alias: Option<String>,
929    /// Room topic.
930    pub room_topic: Option<String>,
931    /// Room join rule.
932    ///
933    /// Set to `None` if the join rule for this room is not available.
934    pub room_join_rule: Option<JoinRule>,
935    /// Is this room encrypted?
936    pub is_room_encrypted: Option<bool>,
937    /// Is this room considered a direct message?
938    pub is_direct_message_room: bool,
939    /// Numbers of members who joined the room.
940    pub joined_members_count: u64,
941    /// Is the room a space?
942    pub is_space: bool,
943
944    /// Is it a noisy notification? (i.e. does any push action contain a sound
945    /// action)
946    ///
947    /// It is set if and only if the push actions could be determined.
948    pub is_noisy: Option<bool>,
949    pub has_mention: Option<bool>,
950    pub thread_id: Option<OwnedEventId>,
951
952    /// The push actions for this notification (notify, sound, highlight, etc.).
953    pub actions: Option<Vec<Action>>,
954}
955
956impl NotificationItem {
957    async fn new(
958        room: &Room,
959        raw_event: RawNotificationEvent,
960        push_actions: Option<&[Action]>,
961        state_events: Vec<Raw<AnyStateEvent>>,
962    ) -> Result<Self, Error> {
963        let event = match &raw_event {
964            RawNotificationEvent::Timeline(raw_event) => {
965                let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
966                if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
967                    SyncRoomMessageEvent::Original(ev),
968                )) = &mut event
969                {
970                    ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
971                }
972                NotificationEvent::Timeline(Box::new(event))
973            }
974            RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(Box::new(
975                raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
976            )),
977        };
978
979        let sender = match room.state() {
980            RoomState::Invited => room.invite_details().await?.inviter,
981            _ => room.get_member_no_sync(event.sender()).await?,
982        };
983
984        let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
985            match &sender {
986                Some(sender) => (
987                    sender.display_name().map(|s| s.to_owned()),
988                    sender.avatar_url().map(|s| s.to_string()),
989                    sender.name_ambiguous(),
990                ),
991                None => (None, None, false),
992            };
993
994        if sender_display_name.is_none() || sender_avatar_url.is_none() {
995            let sender_id = event.sender();
996            for ev in state_events {
997                let ev = match ev.deserialize() {
998                    Ok(ev) => ev,
999                    Err(err) => {
1000                        warn!("Failed to deserialize a state event: {err}");
1001                        continue;
1002                    }
1003                };
1004                if ev.sender() != sender_id {
1005                    continue;
1006                }
1007                if let AnyFullStateEventContent::RoomMember(FullStateEventContent::Original {
1008                    content,
1009                    ..
1010                }) = ev.content()
1011                {
1012                    if sender_display_name.is_none() {
1013                        sender_display_name = content.displayname;
1014                    }
1015                    if sender_avatar_url.is_none() {
1016                        sender_avatar_url = content.avatar_url.map(|url| url.to_string());
1017                    }
1018                }
1019            }
1020        }
1021
1022        let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
1023        let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
1024        let thread_id = event.thread_id().clone();
1025
1026        let item = NotificationItem {
1027            event,
1028            raw_event,
1029            sender_display_name,
1030            sender_avatar_url,
1031            is_sender_name_ambiguous,
1032            room_computed_display_name: room.display_name().await?.to_string(),
1033            room_avatar_url: room.avatar_url().map(|s| s.to_string()),
1034            room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
1035            room_topic: room.topic(),
1036            room_join_rule: room.join_rule(),
1037            is_direct_message_room: room.is_direct().await?,
1038            is_room_encrypted: room
1039                .latest_encryption_state()
1040                .await
1041                .map(|state| state.is_encrypted())
1042                .ok(),
1043            joined_members_count: room.joined_members_count(),
1044            is_space: room.is_space(),
1045            is_noisy,
1046            has_mention,
1047            thread_id,
1048            actions: push_actions.map(|actions| actions.to_vec()),
1049        };
1050
1051        Ok(item)
1052    }
1053
1054    /// Returns whether this room is public or not, based on the join rule.
1055    ///
1056    /// Maybe return `None` if the join rule is not available.
1057    pub fn is_public(&self) -> Option<bool> {
1058        self.room_join_rule.as_ref().map(|rule| matches!(rule, JoinRule::Public))
1059    }
1060}
1061
1062/// An error for the [`NotificationClient`].
1063#[derive(Debug, Error)]
1064pub enum Error {
1065    #[error(transparent)]
1066    BuildingLocalClient(ClientBuildError),
1067
1068    /// The room associated to this event wasn't found.
1069    #[error("unknown room for a notification")]
1070    UnknownRoom,
1071
1072    /// The Ruma event contained within this notification couldn't be parsed.
1073    #[error("invalid ruma event")]
1074    InvalidRumaEvent,
1075
1076    /// When calling `get_notification_with_sliding_sync`, the room was missing
1077    /// in the response.
1078    #[error("the sliding sync response doesn't include the target room")]
1079    SlidingSyncEmptyRoom,
1080
1081    #[error("the event was missing in the `/context` query")]
1082    ContextMissingEvent,
1083
1084    /// An error forwarded from the client.
1085    #[error(transparent)]
1086    SdkError(#[from] matrix_sdk::Error),
1087
1088    /// An error forwarded from the underlying state store.
1089    #[error(transparent)]
1090    StoreError(#[from] StoreError),
1091}
1092
1093#[cfg(test)]
1094mod tests {
1095    use std::collections::BTreeMap;
1096
1097    use assert_matches2::assert_let;
1098    use matrix_sdk::test_utils::mocks::MatrixMockServer;
1099    use matrix_sdk_test::{ALICE, async_test, event_factory::EventFactory};
1100    use ruma::{
1101        api::client::sync::sync_events::v5,
1102        assign, event_id,
1103        events::room::{member::MembershipState, message::RedactedRoomMessageEventContent},
1104        owned_event_id, owned_room_id, room_id, user_id,
1105    };
1106
1107    use crate::notification_client::{
1108        NotificationClient, NotificationItem, NotificationItemsRequest, NotificationProcessSetup,
1109        NotificationStatus, RawNotificationEvent,
1110    };
1111
1112    #[async_test]
1113    async fn test_notification_item_returns_thread_id() {
1114        let server = MatrixMockServer::new().await;
1115        let client = server.client_builder().build().await;
1116
1117        let room_id = room_id!("!a:b.c");
1118        let thread_root_event_id = event_id!("$root:b.c");
1119        let message = EventFactory::new()
1120            .room(room_id)
1121            .sender(user_id!("@sender:b.c"))
1122            .text_msg("Threaded")
1123            .in_thread(thread_root_event_id, event_id!("$prev:b.c"))
1124            .into_raw_sync();
1125        let room = server.sync_joined_room(&client, room_id).await;
1126
1127        let raw_notification_event = RawNotificationEvent::Timeline(message);
1128        let notification_item =
1129            NotificationItem::new(&room, raw_notification_event, None, Vec::new())
1130                .await
1131                .expect("Could not create notification item");
1132
1133        assert_let!(Some(thread_id) = notification_item.thread_id);
1134        assert_eq!(thread_id, thread_root_event_id);
1135    }
1136
1137    #[async_test]
1138    async fn test_try_sliding_sync_ignores_invites_for_non_subscribed_rooms() {
1139        let server = MatrixMockServer::new().await;
1140        let client = server.client_builder().build().await;
1141
1142        let user_id = client.user_id().unwrap();
1143        let room_id = room_id!("!a:b.c");
1144        let invite = EventFactory::new()
1145            .room(room_id)
1146            .member(user_id)
1147            .membership(MembershipState::Invite)
1148            .no_event_id()
1149            .into_raw_sync_state();
1150        let mut room = v5::response::Room::new();
1151        room.invite_state = Some(vec![invite.cast_unchecked()]);
1152        let rooms = BTreeMap::from_iter([(room_id.to_owned(), room)]);
1153        server
1154            .mock_sliding_sync()
1155            .ok(assign!(v5::Response::new("1".to_owned()), {
1156                rooms: rooms,
1157            }))
1158            .mount()
1159            .await;
1160
1161        let notification_client =
1162            NotificationClient::new(client.clone(), NotificationProcessSetup::MultipleProcesses)
1163                .await
1164                .expect("Could not create a notification client");
1165
1166        // Check we don't receive the invite for a different room, even if it was
1167        // included in the sync response
1168        let event_id = owned_event_id!("$a:b.c");
1169        let result = notification_client
1170            .try_sliding_sync(&[NotificationItemsRequest {
1171                room_id: owned_room_id!("!other:b.c"),
1172                event_ids: vec![event_id.clone()],
1173            }])
1174            .await
1175            .expect("Could not run sliding sync");
1176
1177        assert!(result.is_empty());
1178
1179        // Now try fetching the invite for the previously ignored room
1180        let result = notification_client
1181            .try_sliding_sync(&[NotificationItemsRequest {
1182                room_id: room_id.to_owned(),
1183                event_ids: vec![event_id.clone()],
1184            }])
1185            .await
1186            .expect("Could not run sliding sync");
1187
1188        // Check we did receive an event
1189        assert!(!result.is_empty());
1190
1191        // Try to assert it's the same event (since we don't have an event id)
1192        // We can check its room, sender and membership state
1193        let (in_room_id, event) = &result[&event_id];
1194        assert_eq!(room_id, in_room_id);
1195        assert_let!(Some(RawNotificationEvent::Invite(raw_invite)) = event);
1196
1197        let invite = raw_invite.deserialize().expect("Could not deserialize invite event");
1198        assert_eq!(invite.state_key, user_id.to_string());
1199        assert_eq!(invite.content.membership, MembershipState::Invite);
1200    }
1201
1202    #[async_test]
1203    async fn test_redacted_event_returns_event_redacted_status() {
1204        let server = MatrixMockServer::new().await;
1205        let client = server.client_builder().build().await;
1206
1207        let room_id = room_id!("!a:b.c");
1208
1209        // Create a redacted message event (no content)
1210        let event_id = owned_event_id!("$redacted:b.c");
1211        let redacted_event = EventFactory::new()
1212            .room(room_id)
1213            .sender(user_id!("@sender:b.c"))
1214            .redacted(&ALICE, RedactedRoomMessageEventContent::new())
1215            .event_id(&event_id)
1216            .into_raw();
1217        let mut room = v5::response::Room::new();
1218        room.timeline = vec![redacted_event];
1219
1220        let mut rooms = BTreeMap::new();
1221        rooms.insert(room_id.to_owned(), room);
1222
1223        server
1224            .mock_sliding_sync()
1225            .ok(assign!(v5::Response::new("1".to_owned()), {
1226                rooms: rooms,
1227            }))
1228            .mount()
1229            .await;
1230
1231        let notification_client =
1232            NotificationClient::new(client.clone(), NotificationProcessSetup::MultipleProcesses)
1233                .await
1234                .expect("Could not create a notification client");
1235
1236        let result: NotificationStatus = notification_client
1237            .get_notification_with_sliding_sync(room_id, &event_id)
1238            .await
1239            .expect("Could not get notification");
1240
1241        match result {
1242            NotificationStatus::EventRedacted => {
1243                // Success - redacted event was properly detected
1244            }
1245            other => panic!("Expected EventRedacted, got {:?}", other),
1246        }
1247    }
1248}