Skip to main content

matrix_sdk_ui/
notification_client.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::BTreeMap,
17    ops::Deref,
18    sync::{Arc, Mutex},
19    time::Duration,
20};
21
22use futures_util::{StreamExt as _, pin_mut};
23use itertools::Itertools;
24use matrix_sdk::{
25    Client, ClientBuildError, SlidingSyncList, SlidingSyncMode, room::Room, sleep::sleep,
26};
27use matrix_sdk_base::{RoomState, StoreError, deserialized_responses::TimelineEvent};
28use matrix_sdk_common::cross_process_lock::CrossProcessLockConfig;
29use ruma::{
30    EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
31    api::client::sync::sync_events::v5 as http,
32    assign,
33    events::{
34        AnyMessageLikeEventContent, AnyStateEvent, AnyStateEventContentChange,
35        AnySyncMessageLikeEvent, AnySyncTimelineEvent, StateEventContentChange, StateEventType,
36        TimelineEventType,
37        room::{
38            encrypted::OriginalSyncRoomEncryptedEvent,
39            join_rules::JoinRule,
40            member::{MembershipState, StrippedRoomMemberEvent},
41            message::{Relation, SyncRoomMessageEvent},
42        },
43    },
44    html::RemoveReplyFallback,
45    push::Action,
46    serde::Raw,
47    uint,
48};
49use thiserror::Error;
50use tokio::sync::Mutex as AsyncMutex;
51use tracing::{debug, info, instrument, trace, warn};
52
53use crate::{
54    DEFAULT_SANITIZER_MODE,
55    encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService},
56    sync_service::SyncService,
57};
58
59/// What kind of process setup do we have for this notification client?
60#[derive(Clone)]
61pub enum NotificationProcessSetup {
62    /// The notification client may run on a separate process than the rest of
63    /// the app.
64    ///
65    /// For instance, this is the case on iOS, where notifications are handled
66    /// in a separate process (the Notification Service Extension, aka NSE).
67    ///
68    /// In that case, a cross-process lock will be used to coordinate writes
69    /// into the stores handled by the SDK.
70    MultipleProcesses,
71
72    /// The notification client runs in the same process as the rest of the
73    /// `Client` performing syncs.
74    ///
75    /// For instance, this is the case on Android, where a notification will
76    /// wake up the main app process.
77    ///
78    /// In that case, a smart reference to the [`SyncService`] must be provided.
79    SingleProcess { sync_service: Arc<SyncService> },
80}
81
82/// A client specialized for handling push notifications received over the
83/// network, for an app.
84///
85/// In particular, it takes care of running a full decryption sync, in case the
86/// event in the notification was impossible to decrypt beforehand.
87pub struct NotificationClient {
88    /// SDK client that uses an in-memory state store.
89    client: Client,
90
91    /// SDK client that uses the same state store as the caller's context.
92    parent_client: Client,
93
94    /// Is the notification client running on its own process or not?
95    process_setup: NotificationProcessSetup,
96
97    /// A mutex to serialize requests to the notifications sliding sync.
98    ///
99    /// If several notifications come in at the same time (e.g. network was
100    /// unreachable because of airplane mode or something similar), then we
101    /// need to make sure that repeated calls to `get_notification` won't
102    /// cause multiple requests with the same `conn_id` we're using for
103    /// notifications. This mutex solves this by sequentializing the requests.
104    notification_sync_mutex: AsyncMutex<()>,
105
106    /// A mutex to serialize requests to the encryption sliding sync that's used
107    /// in case we didn't have the keys to decipher an event.
108    ///
109    /// Same reasoning as [`Self::notification_sync_mutex`].
110    encryption_sync_mutex: AsyncMutex<()>,
111}
112
113impl NotificationClient {
114    const CONNECTION_ID: &'static str = "notifications";
115    const LOCK_ID: &'static str = "notifications";
116
117    /// Create a new notification client.
118    pub async fn new(
119        parent_client: Client,
120        process_setup: NotificationProcessSetup,
121    ) -> Result<Self, Error> {
122        // Only create the lock id if cross process lock is needed (multiple processes)
123        let cross_process_store_config = match process_setup {
124            NotificationProcessSetup::MultipleProcesses => {
125                CrossProcessLockConfig::multi_process(Self::LOCK_ID)
126            }
127            NotificationProcessSetup::SingleProcess { .. } => CrossProcessLockConfig::SingleProcess,
128        };
129        let client = parent_client.notification_client(cross_process_store_config).await?;
130
131        Ok(NotificationClient {
132            client,
133            parent_client,
134            notification_sync_mutex: AsyncMutex::new(()),
135            encryption_sync_mutex: AsyncMutex::new(()),
136            process_setup,
137        })
138    }
139
140    /// Fetches a room by its ID using the in-memory state store backed client.
141    /// Useful to retrieve room information after running the limited
142    /// notification client sliding sync loop.
143    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
144        self.client.get_room(room_id)
145    }
146
147    /// Fetches the content of a notification.
148    ///
149    /// This will first try to get the notification using a short-lived sliding
150    /// sync, and if the sliding-sync can't find the event, then it'll use a
151    /// `/context` query to find the event with associated member information.
152    ///
153    /// An error result means that we couldn't resolve the notification; in that
154    /// case, a dummy notification may be displayed instead.
155    #[instrument(skip(self))]
156    pub async fn get_notification(
157        &self,
158        room_id: &RoomId,
159        event_id: &EventId,
160    ) -> Result<NotificationStatus, Error> {
161        let status = self.get_notification_with_sliding_sync(room_id, event_id).await?;
162        match status {
163            NotificationStatus::Event(..)
164            | NotificationStatus::EventFilteredOut
165            | NotificationStatus::EventRedacted => Ok(status),
166            NotificationStatus::EventNotFound => {
167                self.get_notification_with_context(room_id, event_id).await
168            }
169        }
170    }
171
172    /// Fetches the content of several notifications.
173    ///
174    /// This will first try to get the notifications using a short-lived sliding
175    /// sync, and if the sliding-sync can't find the events, then it'll use a
176    /// `/context` query to find the events with associated member information.
177    ///
178    /// An error result at the top level means that something failed when trying
179    /// to set up the notification fetching.
180    ///
181    /// For each notification item you can also receive an error, which means
182    /// something failed when trying to fetch that particular notification
183    /// (decryption, fetching push actions, etc.); in that case, a dummy
184    /// notification may be displayed instead.
185    pub async fn get_notifications(
186        &self,
187        requests: &[NotificationItemsRequest],
188    ) -> Result<BatchNotificationFetchingResult, Error> {
189        let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
190
191        for request in requests {
192            for event_id in &request.event_ids {
193                match notifications.get_mut(event_id) {
194                    // If the notification for a given event wasn't found with sliding sync, try
195                    // with a /context for each event.
196                    Some(Ok(NotificationStatus::EventNotFound)) | None => {
197                        notifications.insert(
198                            event_id.to_owned(),
199                            self.get_notification_with_context(&request.room_id, event_id).await,
200                        );
201                    }
202
203                    _ => {}
204                }
205            }
206        }
207
208        Ok(notifications)
209    }
210
211    /// Run an encryption sync loop, in case an event is still encrypted.
212    ///
213    /// Will return `Ok(Some)` if and only if:
214    /// - the event was encrypted,
215    /// - we successfully ran an encryption sync or waited long enough for an
216    ///   existing encryption sync to decrypt the event.
217    ///
218    /// Otherwise, if the event was not encrypted, or couldn't be decrypted
219    /// (without causing a fatal error), will return `Ok(None)`.
220    #[instrument(skip_all)]
221    async fn retry_decryption(
222        &self,
223        room: &Room,
224        raw_event: &Raw<AnySyncTimelineEvent>,
225    ) -> Result<Option<TimelineEvent>, Error> {
226        let event: AnySyncTimelineEvent =
227            raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
228
229        if !is_event_encrypted(event.event_type()) {
230            return Ok(None);
231        }
232
233        // Serialize calls to this function.
234        let _guard = self.encryption_sync_mutex.lock().await;
235
236        // The message is still encrypted, and the client is configured to retry
237        // decryption.
238        //
239        // Spawn an `EncryptionSync` that runs two iterations of the sliding sync loop:
240        // - the first iteration allows to get SS events as well as send e2ee requests.
241        // - the second one let the SS homeserver forward events triggered by the
242        //   sending of e2ee requests.
243        //
244        // Keep timeouts small for both, since we might be short on time.
245
246        let push_ctx = room.push_context().await?;
247        let sync_permit_guard = match &self.process_setup {
248            NotificationProcessSetup::MultipleProcesses => {
249                // We're running on our own process, dedicated for notifications. In that case,
250                // create a dummy sync permit; we're guaranteed there's at most one since we've
251                // acquired the `encryption_sync_mutex' lock here.
252                let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
253                sync_permit.lock_owned().await
254            }
255
256            NotificationProcessSetup::SingleProcess { sync_service } => {
257                if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
258                    permit_guard
259                } else {
260                    // There's already a sync service active, thus the encryption sync is already
261                    // running elsewhere. As a matter of fact, if the event was encrypted, that
262                    // means we were racing against the encryption sync. Wait a bit, attempt to
263                    // decrypt, and carry on.
264
265                    // We repeat the sleep 3 times at most, each iteration we
266                    // double the amount of time waited, so overall we may wait up to 7 times this
267                    // amount.
268                    let mut wait = 200;
269
270                    debug!("Encryption sync running in background");
271                    for _ in 0..3 {
272                        trace!("waiting for decryption…");
273
274                        sleep(Duration::from_millis(wait)).await;
275
276                        // Note: We specify the cast type in case the
277                        // `experimental-encrypted-state-events` feature is enabled, which provides
278                        // multiple cast implementations.
279                        let new_event = room
280                            .decrypt_event(
281                                raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
282                                push_ctx.as_ref(),
283                            )
284                            .await?;
285
286                        match new_event.kind {
287                            matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
288                                utd_info, ..} => {
289                                if utd_info.reason.is_missing_room_key() {
290                                    // Decryption error that could be caused by a missing room
291                                    // key; retry in a few.
292                                    wait *= 2;
293                                } else {
294                                    debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
295                                    return Ok(None);
296                                }
297                            }
298                            _ => {
299                                trace!("Waiting succeeded and event could be decrypted!");
300                                return Ok(Some(new_event));
301                            }
302                        }
303                    }
304
305                    // We couldn't decrypt the event after waiting a few times, abort.
306                    debug!("Timeout waiting for the encryption sync to decrypt notification.");
307                    return Ok(None);
308                }
309            }
310        };
311
312        let encryption_sync = EncryptionSyncService::new(
313            self.client.clone(),
314            Some((Duration::from_secs(3), Duration::from_secs(4))),
315        )
316        .await;
317
318        // Just log out errors, but don't have them abort the notification processing:
319        // an undecrypted notification is still better than no
320        // notifications.
321
322        match encryption_sync {
323            Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
324                // Note: We specify the cast type in case the
325                // `experimental-encrypted-state-events` feature is enabled, which provides
326                // multiple cast implementations.
327                Ok(()) => match room.decrypt_event(raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(), push_ctx.as_ref()).await {
328                    Ok(new_event) => match new_event.kind {
329                        matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
330                            utd_info, ..
331                        } => {
332                            trace!(
333                                "Encryption sync failed to decrypt the event: {:?}",
334                                utd_info.reason
335                            );
336                            Ok(None)
337                        }
338                        _ => {
339                            trace!("Encryption sync managed to decrypt the event.");
340                            Ok(Some(new_event))
341                        }
342                    },
343                    Err(err) => {
344                        trace!("Encryption sync failed to decrypt the event: {err}");
345                        Ok(None)
346                    }
347                },
348                Err(err) => {
349                    warn!("Encryption sync error: {err:#}");
350                    Ok(None)
351                }
352            },
353            Err(err) => {
354                warn!("Encryption sync build error: {err:#}",);
355                Ok(None)
356            }
357        }
358    }
359
360    /// Try to run a sliding sync (without encryption) to retrieve the events
361    /// from the notification.
362    ///
363    /// An event can either be:
364    /// - an invite event,
365    /// - or a non-invite event.
366    ///
367    /// In case it's a non-invite event, it's rather easy: we'll request
368    /// explicit state that'll be useful for building the
369    /// `NotificationItem`, and subscribe to the room which the notification
370    /// relates to.
371    ///
372    /// In case it's an invite-event, it's trickier because the stripped event
373    /// may not contain the event id, so we can't just match on it. Rather,
374    /// we look at stripped room member events that may be fitting (i.e.
375    /// match the current user and are invites), and if the SDK concludes the
376    /// room was in the invited state, and we didn't find the event by id,
377    /// *then* we'll use that stripped room member event.
378    #[instrument(skip_all)]
379    async fn try_sliding_sync(
380        &self,
381        requests: &[NotificationItemsRequest],
382    ) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
383        const MAX_SLIDING_SYNC_ATTEMPTS: u64 = 3;
384        // Serialize all the calls to this method by taking a lock at the beginning,
385        // that will be dropped later.
386        let _guard = self.notification_sync_mutex.lock().await;
387
388        // Set up a sliding sync that only subscribes to the room that had the
389        // notification, so we can figure out the full event and associated
390        // information.
391
392        let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
393        let handler_raw_notification = raw_notifications.clone();
394
395        let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
396        let handler_raw_invites = raw_invites.clone();
397
398        let user_id = self.client.user_id().unwrap().to_owned();
399        let room_ids = requests.iter().map(|req| req.room_id.clone()).collect::<Vec<_>>();
400
401        let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
402
403        let timeline_event_handler = self.client.add_event_handler({
404            let requests = requests.clone();
405            move |raw: Raw<AnySyncTimelineEvent>| async move {
406                match &raw.get_field::<OwnedEventId>("event_id") {
407                    Ok(Some(event_id)) => {
408                        let Some(request) =
409                            &requests.iter().find(|request| request.event_ids.contains(event_id))
410                        else {
411                            return;
412                        };
413
414                        let room_id = request.room_id.clone();
415
416                        // found it! There shouldn't be a previous event before, but if
417                        // there is, that should be ok to
418                        // just replace it.
419                        handler_raw_notification.lock().unwrap().insert(
420                            event_id.to_owned(),
421                            (room_id, Some(RawNotificationEvent::Timeline(raw))),
422                        );
423                    }
424                    Ok(None) => {
425                        warn!("a sync event had no event id");
426                    }
427                    Err(err) => {
428                        warn!("failed to deserialize sync event id: {err}");
429                    }
430                }
431            }
432        });
433
434        let handler_raw_notifications = raw_notifications.clone();
435        let stripped_member_handler = self.client.add_event_handler({
436            let requests = requests.clone();
437            let room_ids: Vec<_> = room_ids.clone();
438            move |raw: Raw<StrippedRoomMemberEvent>, room: Room| async move {
439                if !room_ids.contains(&room.room_id().to_owned()) {
440                    return;
441                }
442
443                let deserialized = match raw.deserialize() {
444                    Ok(d) => d,
445                    Err(err) => {
446                        warn!("failed to deserialize raw stripped room member event: {err}");
447                        return;
448                    }
449                };
450
451                trace!("received a stripped room member event");
452
453                // Try to match the event by event_id, as it's the most precise. In theory, we
454                // shouldn't receive it, so that's a first attempt.
455                match &raw.get_field::<OwnedEventId>("event_id") {
456                    Ok(Some(event_id)) => {
457                        let request =
458                            &requests.iter().find(|request| request.event_ids.contains(event_id));
459                        if request.is_none() {
460                            return;
461                        }
462                        let room_id = request.unwrap().room_id.clone();
463
464                        // found it! There shouldn't be a previous event before, but if
465                        // there is, that should be ok to
466                        // just replace it.
467                        handler_raw_notifications.lock().unwrap().insert(
468                            event_id.to_owned(),
469                            (room_id, Some(RawNotificationEvent::Invite(raw))),
470                        );
471                        return;
472                    }
473                    Ok(None) => {
474                        warn!("a room member event had no id");
475                    }
476                    Err(err) => {
477                        warn!("failed to deserialize room member event id: {err}");
478                    }
479                }
480
481                // Try to match the event by membership and state_key for the current user.
482                if deserialized.content.membership == MembershipState::Invite
483                    && deserialized.state_key == user_id
484                {
485                    trace!("found an invite event for the current user");
486                    // This could be it! There might be several of these following each other, so
487                    // assume it's the latest one (in sync ordering), and override a previous one if
488                    // present.
489                    handler_raw_invites
490                        .lock()
491                        .unwrap()
492                        .insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
493                } else {
494                    trace!("not an invite event, or not for the current user");
495                }
496            }
497        });
498
499        // Room power levels are necessary to build the push context.
500        let required_state = vec![
501            (StateEventType::RoomEncryption, "".to_owned()),
502            (StateEventType::RoomMember, "$LAZY".to_owned()),
503            (StateEventType::RoomMember, "$ME".to_owned()),
504            (StateEventType::RoomCanonicalAlias, "".to_owned()),
505            (StateEventType::RoomName, "".to_owned()),
506            (StateEventType::RoomAvatar, "".to_owned()),
507            (StateEventType::RoomPowerLevels, "".to_owned()),
508            (StateEventType::RoomJoinRules, "".to_owned()),
509            (StateEventType::CallMember, "*".to_owned()),
510            (StateEventType::RoomCreate, "".to_owned()),
511            (StateEventType::MemberHints, "".to_owned()),
512        ];
513
514        let invites = SlidingSyncList::builder("invites")
515            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
516            .timeline_limit(8)
517            .required_state(required_state.clone())
518            .filters(Some(assign!(http::request::ListFilters::default(), {
519                is_invite: Some(true),
520            })));
521
522        let sync = self
523            .client
524            .sliding_sync(Self::CONNECTION_ID)?
525            .poll_timeout(Duration::from_secs(1))
526            .network_timeout(Duration::from_secs(3))
527            .with_account_data_extension(
528                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
529            )
530            .add_list(invites)
531            .build()
532            .await?;
533
534        sync.subscribe_to_rooms(
535            &room_ids.iter().map(|id| id.deref()).collect::<Vec<&RoomId>>(),
536            Some(assign!(http::request::RoomSubscription::default(), {
537                required_state,
538                timeline_limit: uint!(16)
539            })),
540            true,
541        );
542
543        let mut remaining_attempts = MAX_SLIDING_SYNC_ATTEMPTS;
544
545        let stream = sync.sync();
546        pin_mut!(stream);
547
548        // Sum the expected event count for each room
549        let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
550
551        loop {
552            if stream.next().await.is_none() {
553                // Sliding sync aborted early.
554                break;
555            }
556
557            let event_count = raw_notifications.lock().unwrap().len();
558            let invite_count = raw_invites.lock().unwrap().len();
559
560            let current_attempt = 1 + MAX_SLIDING_SYNC_ATTEMPTS - remaining_attempts;
561            trace!(
562                "Attempt #{current_attempt}: \
563                Found {event_count} notification(s), \
564                {invite_count} invite event(s), \
565                expected {expected_event_count} total",
566            );
567
568            // We can stop looking once we've received the expected number of events from
569            // the sync. Since we can receive only events or invites for rooms but not both,
570            // and we're not taking into account invites from not subscribed rooms, this
571            // check should be accurate.
572            if event_count + invite_count == expected_event_count {
573                // We got the events.
574                break;
575            }
576
577            remaining_attempts -= 1;
578            warn!("There are some missing notifications, remaining attempts: {remaining_attempts}");
579            if remaining_attempts == 0 {
580                // We're out of luck.
581                break;
582            }
583        }
584
585        self.client.remove_event_handler(stripped_member_handler);
586        self.client.remove_event_handler(timeline_event_handler);
587
588        let mut notifications = raw_notifications.clone().lock().unwrap().clone();
589        let mut missing_event_ids = Vec::new();
590
591        // Create the list of missing event ids after the syncs.
592        for request in requests.iter() {
593            for event_id in &request.event_ids {
594                if !notifications.contains_key(event_id) {
595                    missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
596                }
597            }
598        }
599
600        // Try checking if the missing notifications could be invites.
601        for (room_id, missing_event_id) in missing_event_ids {
602            trace!("we didn't have a non-invite event, looking for invited room now");
603            if let Some(room) = self.client.get_room(&room_id) {
604                if room.state() == RoomState::Invited {
605                    if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
606                        notifications.insert(
607                            missing_event_id.to_owned(),
608                            (room_id.to_owned(), stripped_event),
609                        );
610                    }
611                } else {
612                    debug!("the room isn't in the invited state");
613                }
614            } else {
615                warn!(%room_id, "unknown room, can't check for invite events");
616            }
617        }
618
619        let found = if notifications.len() == expected_event_count { "" } else { "not " };
620        trace!("all notification events have{found} been found");
621
622        Ok(notifications)
623    }
624
625    pub async fn get_notification_with_sliding_sync(
626        &self,
627        room_id: &RoomId,
628        event_id: &EventId,
629    ) -> Result<NotificationStatus, Error> {
630        info!("fetching notification event with a sliding sync");
631
632        let request = NotificationItemsRequest {
633            room_id: room_id.to_owned(),
634            event_ids: vec![event_id.to_owned()],
635        };
636
637        let mut get_notifications_result =
638            self.get_notifications_with_sliding_sync(&[request]).await?;
639
640        get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
641    }
642
643    /// Given a (decrypted or not) event, figure out whether it should be
644    /// filtered out for other client-side reasons (such as the sender being
645    /// ignored, for instance), and returns the corresponding
646    /// [`NotificationStatus`].
647    async fn compute_status(
648        &self,
649        room: &Room,
650        push_actions: Option<&[Action]>,
651        raw_event: RawNotificationEvent,
652        state_events: Vec<Raw<AnyStateEvent>>,
653    ) -> Result<NotificationStatus, Error> {
654        if let Some(actions) = push_actions
655            && !actions.iter().any(|a| a.should_notify())
656        {
657            // The event shouldn't notify: return early.
658            return Ok(NotificationStatus::EventFilteredOut);
659        }
660
661        let notification_item =
662            NotificationItem::new(room, raw_event, push_actions, state_events).await?;
663
664        if self.client.is_user_ignored(notification_item.event.sender()).await {
665            Ok(NotificationStatus::EventFilteredOut)
666        } else {
667            Ok(NotificationStatus::Event(Box::new(notification_item)))
668        }
669    }
670
671    /// Get a list of full notifications, given a room id and event ids.
672    ///
673    /// This will run a small sliding sync to retrieve the content of the
674    /// events, along with extra data to form a rich notification context.
675    pub async fn get_notifications_with_sliding_sync(
676        &self,
677        requests: &[NotificationItemsRequest],
678    ) -> Result<BatchNotificationFetchingResult, Error> {
679        let raw_events = self.try_sliding_sync(requests).await?;
680
681        let mut batch_result = BatchNotificationFetchingResult::new();
682
683        for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
684            // At this point it should have been added by the sync, if it's not, give up.
685            let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
686
687            let Some(raw_event) = raw_event else {
688                // The event was not found, so we can't build a notification.
689                batch_result.insert(event_id, Ok(NotificationStatus::EventNotFound));
690                continue;
691            };
692
693            let (raw_event, push_actions) = match &raw_event {
694                RawNotificationEvent::Timeline(timeline_event) => {
695                    // Check if the event is redacted first
696                    let event_for_redaction_check: AnySyncTimelineEvent =
697                        match timeline_event.deserialize() {
698                            Ok(event) => event,
699                            Err(_) => {
700                                batch_result.insert(event_id, Err(Error::InvalidRumaEvent));
701                                continue;
702                            }
703                        };
704
705                    if is_event_redacted(&event_for_redaction_check) {
706                        batch_result.insert(event_id, Ok(NotificationStatus::EventRedacted));
707                        continue;
708                    }
709
710                    // Timeline events may be encrypted, so make sure they get decrypted first.
711                    match self.retry_decryption(&room, timeline_event).await {
712                        Ok(Some(timeline_event)) => {
713                            let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
714                            (
715                                RawNotificationEvent::Timeline(timeline_event.into_raw()),
716                                push_actions,
717                            )
718                        }
719
720                        Ok(None) => {
721                            // The event was either not encrypted in the first place, or we
722                            // couldn't decrypt it after retrying. Use the raw event as is.
723                            match room.event_push_actions(timeline_event).await {
724                                Ok(push_actions) => (raw_event.clone(), push_actions),
725                                Err(err) => {
726                                    // Could not get push actions.
727                                    batch_result.insert(event_id, Err(err.into()));
728                                    continue;
729                                }
730                            }
731                        }
732
733                        Err(err) => {
734                            batch_result.insert(event_id, Err(err));
735                            continue;
736                        }
737                    }
738                }
739
740                RawNotificationEvent::Invite(invite_event) => {
741                    // Invite events can't be encrypted, so they should be in clear text.
742                    match room.event_push_actions(invite_event).await {
743                        Ok(push_actions) => {
744                            (RawNotificationEvent::Invite(invite_event.clone()), push_actions)
745                        }
746                        Err(err) => {
747                            batch_result.insert(event_id, Err(err.into()));
748                            continue;
749                        }
750                    }
751                }
752            };
753
754            let notification_status_result =
755                self.compute_status(&room, push_actions.as_deref(), raw_event, Vec::new()).await;
756
757            batch_result.insert(event_id, notification_status_result);
758        }
759
760        Ok(batch_result)
761    }
762
763    /// Retrieve a notification using a `/context` query.
764    ///
765    /// This is for clients that are already running other sliding syncs in the
766    /// same process, so that most of the contextual information for the
767    /// notification should already be there. In particular, the room containing
768    /// the event MUST be known (via a sliding sync for invites, or another
769    /// sliding sync).
770    ///
771    /// An error result means that we couldn't resolve the notification; in that
772    /// case, a dummy notification may be displayed instead. A `None` result
773    /// means the notification has been filtered out by the user's push
774    /// rules.
775    pub async fn get_notification_with_context(
776        &self,
777        room_id: &RoomId,
778        event_id: &EventId,
779    ) -> Result<NotificationStatus, Error> {
780        info!("fetching notification event with a /context query");
781
782        // See above comment.
783        let Some(room) = self.parent_client.get_room(room_id) else {
784            return Err(Error::UnknownRoom);
785        };
786
787        let response = room.event_with_context(event_id, true, uint!(0), None).await?;
788
789        let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
790        let state_events = response.state;
791
792        // Check if the event is redacted
793        let event_for_redaction_check: AnySyncTimelineEvent =
794            timeline_event.raw().deserialize().map_err(|_| Error::InvalidRumaEvent)?;
795
796        if is_event_redacted(&event_for_redaction_check) {
797            return Ok(NotificationStatus::EventRedacted);
798        }
799
800        if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
801            timeline_event = decrypted_event;
802        }
803
804        let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
805
806        self.compute_status(
807            &room,
808            push_actions.as_deref(),
809            RawNotificationEvent::Timeline(timeline_event.into_raw()),
810            state_events,
811        )
812        .await
813    }
814}
815
816fn is_event_encrypted(event_type: TimelineEventType) -> bool {
817    let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
818
819    #[cfg(feature = "unstable-msc3956")]
820    let is_still_encrypted =
821        is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
822
823    is_still_encrypted
824}
825
826fn is_event_redacted(event: &AnySyncTimelineEvent) -> bool {
827    // Check if the event is a message-like event but has no original content (i.e.,
828    // redacted)
829    match event {
830        AnySyncTimelineEvent::MessageLike(msg) => msg.is_redacted(),
831        _ => false,
832    }
833}
834
835#[derive(Debug)]
836pub enum NotificationStatus {
837    /// The event has been found and was not filtered out.
838    Event(Box<NotificationItem>),
839    /// The event couldn't be found in the network queries used to find it.
840    EventNotFound,
841    /// The event has been filtered out, either because of the user's push
842    /// rules, or because the user which triggered it is ignored by the
843    /// current user.
844    EventFilteredOut,
845    /// The event has been redacted and has no meaningful content.
846    EventRedacted,
847}
848
849#[derive(Debug, Clone)]
850pub struct NotificationItemsRequest {
851    pub room_id: OwnedRoomId,
852    pub event_ids: Vec<OwnedEventId>,
853}
854
855type BatchNotificationFetchingResult = BTreeMap<OwnedEventId, Result<NotificationStatus, Error>>;
856
857/// The Notification event as it was fetched from remote for the
858/// given `event_id`, represented as Raw but decrypted, thus only
859/// whether it is an invite or regular Timeline event has been
860/// determined.
861#[derive(Debug, Clone)]
862pub enum RawNotificationEvent {
863    /// The raw event for a timeline event
864    Timeline(Raw<AnySyncTimelineEvent>),
865    /// The notification contains an invitation with the given
866    /// StrippedRoomMemberEvent (in raw here)
867    Invite(Raw<StrippedRoomMemberEvent>),
868}
869
870/// The deserialized Event as it was fetched from remote for the
871/// given `event_id` and after decryption (if possible).
872#[derive(Debug)]
873pub enum NotificationEvent {
874    /// The Notification was for a TimelineEvent
875    Timeline(Box<AnySyncTimelineEvent>),
876    /// The Notification is an invite with the given stripped room event data
877    Invite(Box<StrippedRoomMemberEvent>),
878}
879
880impl NotificationEvent {
881    pub fn sender(&self) -> &UserId {
882        match self {
883            NotificationEvent::Timeline(ev) => ev.sender(),
884            NotificationEvent::Invite(ev) => &ev.sender,
885        }
886    }
887
888    /// Returns the root event id of the thread the notification event is in, if
889    /// any.
890    fn thread_id(&self) -> Option<OwnedEventId> {
891        let NotificationEvent::Timeline(sync_timeline_event) = &self else {
892            return None;
893        };
894        let AnySyncTimelineEvent::MessageLike(event) = sync_timeline_event.as_ref() else {
895            return None;
896        };
897        let content = event.original_content()?;
898        match content {
899            AnyMessageLikeEventContent::RoomMessage(content) => match content.relates_to? {
900                Relation::Thread(thread) => Some(thread.event_id),
901                _ => None,
902            },
903            _ => None,
904        }
905    }
906}
907
908/// A notification with its full content.
909#[derive(Debug)]
910pub struct NotificationItem {
911    /// Underlying Ruma event.
912    pub event: NotificationEvent,
913
914    /// The raw of the underlying event.
915    pub raw_event: RawNotificationEvent,
916
917    /// Display name of the sender.
918    pub sender_display_name: Option<String>,
919    /// Avatar URL of the sender.
920    pub sender_avatar_url: Option<String>,
921    /// Is the sender's name ambiguous?
922    pub is_sender_name_ambiguous: bool,
923
924    /// Room computed display name.
925    pub room_computed_display_name: String,
926    /// Room avatar URL.
927    pub room_avatar_url: Option<String>,
928    /// Room canonical alias.
929    pub room_canonical_alias: Option<String>,
930    /// Room topic.
931    pub room_topic: Option<String>,
932    /// Room join rule.
933    ///
934    /// Set to `None` if the join rule for this room is not available.
935    pub room_join_rule: Option<JoinRule>,
936    /// Is this room encrypted?
937    pub is_room_encrypted: Option<bool>,
938    /// Is this room considered a direct message?
939    pub is_direct_message_room: bool,
940    /// Numbers of members who joined the room.
941    pub joined_members_count: u64,
942    /// Number of service members in the room.
943    pub service_members: Vec<String>,
944    pub active_service_members_count: u64,
945    /// Is the room a space?
946    pub is_space: bool,
947
948    /// Is it a noisy notification? (i.e. does any push action contain a sound
949    /// action)
950    ///
951    /// It is set if and only if the push actions could be determined.
952    pub is_noisy: Option<bool>,
953    pub has_mention: Option<bool>,
954    pub thread_id: Option<OwnedEventId>,
955
956    /// The push actions for this notification (notify, sound, highlight, etc.).
957    pub actions: Option<Vec<Action>>,
958
959    /// Whether the room this notification is from is a DM or not.
960    pub room_is_dm: bool,
961}
962
963impl NotificationItem {
964    async fn new(
965        room: &Room,
966        raw_event: RawNotificationEvent,
967        push_actions: Option<&[Action]>,
968        state_events: Vec<Raw<AnyStateEvent>>,
969    ) -> Result<Self, Error> {
970        let event = match &raw_event {
971            RawNotificationEvent::Timeline(raw_event) => {
972                let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
973                if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
974                    SyncRoomMessageEvent::Original(ev),
975                )) = &mut event
976                {
977                    ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
978                }
979                NotificationEvent::Timeline(Box::new(event))
980            }
981            RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(Box::new(
982                raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
983            )),
984        };
985
986        let sender = match room.state() {
987            RoomState::Invited => room.invite_details().await?.inviter,
988            _ => room.get_member_no_sync(event.sender()).await?,
989        };
990
991        let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
992            match &sender {
993                Some(sender) => (
994                    sender.display_name().map(|s| s.to_owned()),
995                    sender.avatar_url().map(|s| s.to_string()),
996                    sender.name_ambiguous(),
997                ),
998                None => (None, None, false),
999            };
1000
1001        if sender_display_name.is_none() || sender_avatar_url.is_none() {
1002            let sender_id = event.sender();
1003            for ev in state_events {
1004                let ev = match ev.deserialize() {
1005                    Ok(ev) => ev,
1006                    Err(err) => {
1007                        warn!("Failed to deserialize a state event: {err}");
1008                        continue;
1009                    }
1010                };
1011                if ev.sender() != sender_id {
1012                    continue;
1013                }
1014                if let AnyStateEventContentChange::RoomMember(StateEventContentChange::Original {
1015                    content,
1016                    ..
1017                }) = ev.content_change()
1018                {
1019                    if sender_display_name.is_none() {
1020                        sender_display_name = content.displayname;
1021                    }
1022                    if sender_avatar_url.is_none() {
1023                        sender_avatar_url = content.avatar_url.map(|url| url.to_string());
1024                    }
1025                }
1026            }
1027        }
1028
1029        let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
1030        let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
1031        let thread_id = event.thread_id().clone();
1032        let service_members = room
1033            .service_members()
1034            .unwrap_or_default()
1035            .iter()
1036            .map(ToString::to_string)
1037            .collect_vec();
1038
1039        let active_service_members_count =
1040            room.update_active_service_members().await?.unwrap_or_default().len() as u64;
1041
1042        let item = NotificationItem {
1043            event,
1044            raw_event,
1045            sender_display_name,
1046            sender_avatar_url,
1047            is_sender_name_ambiguous,
1048            room_computed_display_name: room.display_name().await?.to_string(),
1049            room_avatar_url: room.avatar_url().map(|s| s.to_string()),
1050            room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
1051            room_topic: room.topic(),
1052            room_join_rule: room.join_rule(),
1053            is_direct_message_room: room.is_direct().await?,
1054            is_room_encrypted: room
1055                .latest_encryption_state()
1056                .await
1057                .map(|state| state.is_encrypted())
1058                .ok(),
1059            joined_members_count: room.joined_members_count(),
1060            service_members,
1061            active_service_members_count,
1062            is_space: room.is_space(),
1063            is_noisy,
1064            has_mention,
1065            thread_id,
1066            actions: push_actions.map(|actions| actions.to_vec()),
1067            room_is_dm: room.compute_is_dm().await?,
1068        };
1069
1070        Ok(item)
1071    }
1072
1073    /// Returns whether this room is public or not, based on the join rule.
1074    ///
1075    /// Maybe return `None` if the join rule is not available.
1076    pub fn is_public(&self) -> Option<bool> {
1077        self.room_join_rule.as_ref().map(|rule| matches!(rule, JoinRule::Public))
1078    }
1079}
1080
1081/// An error for the [`NotificationClient`].
1082#[derive(Debug, Error)]
1083pub enum Error {
1084    #[error(transparent)]
1085    BuildingLocalClient(ClientBuildError),
1086
1087    /// The room associated to this event wasn't found.
1088    #[error("unknown room for a notification")]
1089    UnknownRoom,
1090
1091    /// The Ruma event contained within this notification couldn't be parsed.
1092    #[error("invalid ruma event")]
1093    InvalidRumaEvent,
1094
1095    /// When calling `get_notification_with_sliding_sync`, the room was missing
1096    /// in the response.
1097    #[error("the sliding sync response doesn't include the target room")]
1098    SlidingSyncEmptyRoom,
1099
1100    #[error("the event was missing in the `/context` query")]
1101    ContextMissingEvent,
1102
1103    /// An error forwarded from the client.
1104    #[error(transparent)]
1105    SdkError(#[from] matrix_sdk::Error),
1106
1107    /// An error forwarded from the underlying state store.
1108    #[error(transparent)]
1109    StoreError(#[from] StoreError),
1110}
1111
1112#[cfg(test)]
1113mod tests {
1114    use std::collections::BTreeMap;
1115
1116    use assert_matches2::assert_let;
1117    use matrix_sdk::test_utils::mocks::MatrixMockServer;
1118    use matrix_sdk_test::{ALICE, async_test, event_factory::EventFactory};
1119    use ruma::{
1120        api::client::sync::sync_events::v5,
1121        assign, event_id,
1122        events::room::{member::MembershipState, message::RedactedRoomMessageEventContent},
1123        owned_event_id, owned_room_id, room_id, user_id,
1124    };
1125
1126    use crate::notification_client::{
1127        NotificationClient, NotificationItem, NotificationItemsRequest, NotificationProcessSetup,
1128        NotificationStatus, RawNotificationEvent,
1129    };
1130
1131    #[async_test]
1132    async fn test_notification_item_returns_thread_id() {
1133        let server = MatrixMockServer::new().await;
1134        let client = server.client_builder().build().await;
1135
1136        let room_id = room_id!("!a:b.c");
1137        let thread_root_event_id = event_id!("$root:b.c");
1138        let message = EventFactory::new()
1139            .room(room_id)
1140            .sender(user_id!("@sender:b.c"))
1141            .text_msg("Threaded")
1142            .in_thread(thread_root_event_id, event_id!("$prev:b.c"))
1143            .into_raw_sync();
1144        let room = server.sync_joined_room(&client, room_id).await;
1145
1146        let raw_notification_event = RawNotificationEvent::Timeline(message);
1147        let notification_item =
1148            NotificationItem::new(&room, raw_notification_event, None, Vec::new())
1149                .await
1150                .expect("Could not create notification item");
1151
1152        assert_let!(Some(thread_id) = notification_item.thread_id);
1153        assert_eq!(thread_id, thread_root_event_id);
1154    }
1155
1156    #[async_test]
1157    async fn test_try_sliding_sync_ignores_invites_for_non_subscribed_rooms() {
1158        let server = MatrixMockServer::new().await;
1159        let client = server.client_builder().build().await;
1160
1161        let user_id = client.user_id().unwrap();
1162        let room_id = room_id!("!a:b.c");
1163        let invite = EventFactory::new()
1164            .room(room_id)
1165            .member(user_id)
1166            .membership(MembershipState::Invite)
1167            .no_event_id()
1168            .into_raw_sync_state();
1169        let mut room = v5::response::Room::new();
1170        room.invite_state = Some(vec![invite.cast_unchecked()]);
1171        let rooms = BTreeMap::from_iter([(room_id.to_owned(), room)]);
1172        server
1173            .mock_sliding_sync()
1174            .ok(assign!(v5::Response::new("1".to_owned()), {
1175                rooms: rooms,
1176            }))
1177            .mount()
1178            .await;
1179
1180        let notification_client =
1181            NotificationClient::new(client.clone(), NotificationProcessSetup::MultipleProcesses)
1182                .await
1183                .expect("Could not create a notification client");
1184
1185        // Check we don't receive the invite for a different room, even if it was
1186        // included in the sync response
1187        let event_id = owned_event_id!("$a:b.c");
1188        let result = notification_client
1189            .try_sliding_sync(&[NotificationItemsRequest {
1190                room_id: owned_room_id!("!other:b.c"),
1191                event_ids: vec![event_id.clone()],
1192            }])
1193            .await
1194            .expect("Could not run sliding sync");
1195
1196        assert!(result.is_empty());
1197
1198        // Now try fetching the invite for the previously ignored room
1199        let result = notification_client
1200            .try_sliding_sync(&[NotificationItemsRequest {
1201                room_id: room_id.to_owned(),
1202                event_ids: vec![event_id.clone()],
1203            }])
1204            .await
1205            .expect("Could not run sliding sync");
1206
1207        // Check we did receive an event
1208        assert!(!result.is_empty());
1209
1210        // Try to assert it's the same event (since we don't have an event id)
1211        // We can check its room, sender and membership state
1212        let (in_room_id, event) = &result[&event_id];
1213        assert_eq!(room_id, in_room_id);
1214        assert_let!(Some(RawNotificationEvent::Invite(raw_invite)) = event);
1215
1216        let invite = raw_invite.deserialize().expect("Could not deserialize invite event");
1217        assert_eq!(invite.state_key, user_id.to_string());
1218        assert_eq!(invite.content.membership, MembershipState::Invite);
1219    }
1220
1221    #[async_test]
1222    async fn test_redacted_event_returns_event_redacted_status() {
1223        let server = MatrixMockServer::new().await;
1224        let client = server.client_builder().build().await;
1225
1226        let room_id = room_id!("!a:b.c");
1227
1228        // Create a redacted message event (no content)
1229        let event_id = owned_event_id!("$redacted:b.c");
1230        let redacted_event = EventFactory::new()
1231            .room(room_id)
1232            .sender(user_id!("@sender:b.c"))
1233            .redacted(&ALICE, RedactedRoomMessageEventContent::new())
1234            .event_id(&event_id)
1235            .into_raw();
1236        let mut room = v5::response::Room::new();
1237        room.timeline = vec![redacted_event];
1238
1239        let mut rooms = BTreeMap::new();
1240        rooms.insert(room_id.to_owned(), room);
1241
1242        server
1243            .mock_sliding_sync()
1244            .ok(assign!(v5::Response::new("1".to_owned()), {
1245                rooms: rooms,
1246            }))
1247            .mount()
1248            .await;
1249
1250        let notification_client =
1251            NotificationClient::new(client.clone(), NotificationProcessSetup::MultipleProcesses)
1252                .await
1253                .expect("Could not create a notification client");
1254
1255        let result: NotificationStatus = notification_client
1256            .get_notification_with_sliding_sync(room_id, &event_id)
1257            .await
1258            .expect("Could not get notification");
1259
1260        match result {
1261            NotificationStatus::EventRedacted => {
1262                // Success - redacted event was properly detected
1263            }
1264            other => panic!("Expected EventRedacted, got {:?}", other),
1265        }
1266    }
1267}