matrix_sdk_ui/
notification_client.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::BTreeMap,
17    sync::{Arc, Mutex},
18    time::Duration,
19};
20
21use futures_util::{StreamExt as _, pin_mut};
22use matrix_sdk::{
23    Client, ClientBuildError, SlidingSyncList, SlidingSyncMode, room::Room, sleep::sleep,
24};
25use matrix_sdk_base::{RoomState, StoreError, deserialized_responses::TimelineEvent};
26use ruma::{
27    EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
28    api::client::sync::sync_events::v5 as http,
29    assign,
30    events::{
31        AnyFullStateEventContent, AnyMessageLikeEventContent, AnyStateEvent,
32        AnySyncMessageLikeEvent, AnySyncTimelineEvent, FullStateEventContent, StateEventType,
33        TimelineEventType,
34        room::{
35            encrypted::OriginalSyncRoomEncryptedEvent,
36            join_rules::JoinRule,
37            member::{MembershipState, StrippedRoomMemberEvent},
38            message::{Relation, SyncRoomMessageEvent},
39        },
40    },
41    html::RemoveReplyFallback,
42    push::Action,
43    serde::Raw,
44    uint,
45};
46use thiserror::Error;
47use tokio::sync::Mutex as AsyncMutex;
48use tracing::{debug, info, instrument, trace, warn};
49
50use crate::{
51    DEFAULT_SANITIZER_MODE,
52    encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService, WithLocking},
53    sync_service::SyncService,
54};
55
56/// What kind of process setup do we have for this notification client?
57#[derive(Clone)]
58pub enum NotificationProcessSetup {
59    /// The notification client may run on a separate process than the rest of
60    /// the app.
61    ///
62    /// For instance, this is the case on iOS, where notifications are handled
63    /// in a separate process (the Notification Service Extension, aka NSE).
64    ///
65    /// In that case, a cross-process lock will be used to coordinate writes
66    /// into the stores handled by the SDK.
67    MultipleProcesses,
68
69    /// The notification client runs in the same process as the rest of the
70    /// `Client` performing syncs.
71    ///
72    /// For instance, this is the case on Android, where a notification will
73    /// wake up the main app process.
74    ///
75    /// In that case, a smart reference to the [`SyncService`] must be provided.
76    SingleProcess { sync_service: Arc<SyncService> },
77}
78
79/// A client specialized for handling push notifications received over the
80/// network, for an app.
81///
82/// In particular, it takes care of running a full decryption sync, in case the
83/// event in the notification was impossible to decrypt beforehand.
84pub struct NotificationClient {
85    /// SDK client that uses an in-memory state store.
86    client: Client,
87
88    /// SDK client that uses the same state store as the caller's context.
89    parent_client: Client,
90
91    /// Is the notification client running on its own process or not?
92    process_setup: NotificationProcessSetup,
93
94    /// A mutex to serialize requests to the notifications sliding sync.
95    ///
96    /// If several notifications come in at the same time (e.g. network was
97    /// unreachable because of airplane mode or something similar), then we
98    /// need to make sure that repeated calls to `get_notification` won't
99    /// cause multiple requests with the same `conn_id` we're using for
100    /// notifications. This mutex solves this by sequentializing the requests.
101    notification_sync_mutex: AsyncMutex<()>,
102
103    /// A mutex to serialize requests to the encryption sliding sync that's used
104    /// in case we didn't have the keys to decipher an event.
105    ///
106    /// Same reasoning as [`Self::notification_sync_mutex`].
107    encryption_sync_mutex: AsyncMutex<()>,
108}
109
110impl NotificationClient {
111    const CONNECTION_ID: &'static str = "notifications";
112    const LOCK_ID: &'static str = "notifications";
113
114    /// Create a new notification client.
115    pub async fn new(
116        parent_client: Client,
117        process_setup: NotificationProcessSetup,
118    ) -> Result<Self, Error> {
119        let client = parent_client.notification_client(Self::LOCK_ID.to_owned()).await?;
120
121        Ok(NotificationClient {
122            client,
123            parent_client,
124            notification_sync_mutex: AsyncMutex::new(()),
125            encryption_sync_mutex: AsyncMutex::new(()),
126            process_setup,
127        })
128    }
129
130    /// Fetches a room by its ID using the in-memory state store backed client.
131    /// Useful to retrieve room information after running the limited
132    /// notification client sliding sync loop.
133    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
134        self.client.get_room(room_id)
135    }
136
137    /// Fetches the content of a notification.
138    ///
139    /// This will first try to get the notification using a short-lived sliding
140    /// sync, and if the sliding-sync can't find the event, then it'll use a
141    /// `/context` query to find the event with associated member information.
142    ///
143    /// An error result means that we couldn't resolve the notification; in that
144    /// case, a dummy notification may be displayed instead.
145    #[instrument(skip(self))]
146    pub async fn get_notification(
147        &self,
148        room_id: &RoomId,
149        event_id: &EventId,
150    ) -> Result<NotificationStatus, Error> {
151        let status = self.get_notification_with_sliding_sync(room_id, event_id).await?;
152        match status {
153            NotificationStatus::Event(..) | NotificationStatus::EventFilteredOut => Ok(status),
154            NotificationStatus::EventNotFound => {
155                self.get_notification_with_context(room_id, event_id).await
156            }
157        }
158    }
159
160    /// Fetches the content of several notifications.
161    ///
162    /// This will first try to get the notifications using a short-lived sliding
163    /// sync, and if the sliding-sync can't find the events, then it'll use a
164    /// `/context` query to find the events with associated member information.
165    ///
166    /// An error result at the top level means that something failed when trying
167    /// to set up the notification fetching.
168    ///
169    /// For each notification item you can also receive an error, which means
170    /// something failed when trying to fetch that particular notification
171    /// (decryption, fetching push actions, etc.); in that case, a dummy
172    /// notification may be displayed instead.
173    pub async fn get_notifications(
174        &self,
175        requests: &[NotificationItemsRequest],
176    ) -> Result<BatchNotificationFetchingResult, Error> {
177        let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
178
179        for request in requests {
180            for event_id in &request.event_ids {
181                match notifications.get_mut(event_id) {
182                    // If the notification for a given event wasn't found with sliding sync, try
183                    // with a /context for each event.
184                    Some(Ok(NotificationStatus::EventNotFound)) | None => {
185                        notifications.insert(
186                            event_id.to_owned(),
187                            self.get_notification_with_context(&request.room_id, event_id).await,
188                        );
189                    }
190
191                    _ => {}
192                }
193            }
194        }
195
196        Ok(notifications)
197    }
198
199    /// Run an encryption sync loop, in case an event is still encrypted.
200    ///
201    /// Will return `Ok(Some)` if and only if:
202    /// - the event was encrypted,
203    /// - we successfully ran an encryption sync or waited long enough for an
204    ///   existing encryption sync to decrypt the event.
205    ///
206    /// Otherwise, if the event was not encrypted, or couldn't be decrypted
207    /// (without causing a fatal error), will return `Ok(None)`.
208    #[instrument(skip_all)]
209    async fn retry_decryption(
210        &self,
211        room: &Room,
212        raw_event: &Raw<AnySyncTimelineEvent>,
213    ) -> Result<Option<TimelineEvent>, Error> {
214        let event: AnySyncTimelineEvent =
215            raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
216
217        if !is_event_encrypted(event.event_type()) {
218            return Ok(None);
219        }
220
221        // Serialize calls to this function.
222        let _guard = self.encryption_sync_mutex.lock().await;
223
224        // The message is still encrypted, and the client is configured to retry
225        // decryption.
226        //
227        // Spawn an `EncryptionSync` that runs two iterations of the sliding sync loop:
228        // - the first iteration allows to get SS events as well as send e2ee requests.
229        // - the second one let the SS homeserver forward events triggered by the
230        //   sending of e2ee requests.
231        //
232        // Keep timeouts small for both, since we might be short on time.
233
234        let with_locking = WithLocking::from(matches!(
235            self.process_setup,
236            NotificationProcessSetup::MultipleProcesses
237        ));
238
239        let push_ctx = room.push_context().await?;
240        let sync_permit_guard = match &self.process_setup {
241            NotificationProcessSetup::MultipleProcesses => {
242                // We're running on our own process, dedicated for notifications. In that case,
243                // create a dummy sync permit; we're guaranteed there's at most one since we've
244                // acquired the `encryption_sync_mutex' lock here.
245                let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
246                sync_permit.lock_owned().await
247            }
248
249            NotificationProcessSetup::SingleProcess { sync_service } => {
250                if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
251                    permit_guard
252                } else {
253                    // There's already a sync service active, thus the encryption sync is already
254                    // running elsewhere. As a matter of fact, if the event was encrypted, that
255                    // means we were racing against the encryption sync. Wait a bit, attempt to
256                    // decrypt, and carry on.
257
258                    // We repeat the sleep 3 times at most, each iteration we
259                    // double the amount of time waited, so overall we may wait up to 7 times this
260                    // amount.
261                    let mut wait = 200;
262
263                    debug!("Encryption sync running in background");
264                    for _ in 0..3 {
265                        trace!("waiting for decryption…");
266
267                        sleep(Duration::from_millis(wait)).await;
268
269                        // Note: We specify the cast type in case the
270                        // `experimental-encrypted-state-events` feature is enabled, which provides
271                        // multiple cast implementations.
272                        let new_event = room
273                            .decrypt_event(
274                                raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
275                                push_ctx.as_ref(),
276                            )
277                            .await?;
278
279                        match new_event.kind {
280                            matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
281                                utd_info, ..} => {
282                                if utd_info.reason.is_missing_room_key() {
283                                    // Decryption error that could be caused by a missing room
284                                    // key; retry in a few.
285                                    wait *= 2;
286                                } else {
287                                    debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
288                                    return Ok(None);
289                                }
290                            }
291                            _ => {
292                                trace!("Waiting succeeded and event could be decrypted!");
293                                return Ok(Some(new_event));
294                            }
295                        }
296                    }
297
298                    // We couldn't decrypt the event after waiting a few times, abort.
299                    debug!("Timeout waiting for the encryption sync to decrypt notification.");
300                    return Ok(None);
301                }
302            }
303        };
304
305        let encryption_sync = EncryptionSyncService::new(
306            self.client.clone(),
307            Some((Duration::from_secs(3), Duration::from_secs(4))),
308            with_locking,
309        )
310        .await;
311
312        // Just log out errors, but don't have them abort the notification processing:
313        // an undecrypted notification is still better than no
314        // notifications.
315
316        match encryption_sync {
317            Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
318                // Note: We specify the cast type in case the
319                // `experimental-encrypted-state-events` feature is enabled, which provides
320                // multiple cast implementations.
321                Ok(()) => match room.decrypt_event(raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(), push_ctx.as_ref()).await {
322                    Ok(new_event) => match new_event.kind {
323                        matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
324                            utd_info, ..
325                        } => {
326                            trace!(
327                                "Encryption sync failed to decrypt the event: {:?}",
328                                utd_info.reason
329                            );
330                            Ok(None)
331                        }
332                        _ => {
333                            trace!("Encryption sync managed to decrypt the event.");
334                            Ok(Some(new_event))
335                        }
336                    },
337                    Err(err) => {
338                        trace!("Encryption sync failed to decrypt the event: {err}");
339                        Ok(None)
340                    }
341                },
342                Err(err) => {
343                    warn!("Encryption sync error: {err:#}");
344                    Ok(None)
345                }
346            },
347            Err(err) => {
348                warn!("Encryption sync build error: {err:#}",);
349                Ok(None)
350            }
351        }
352    }
353
354    /// Try to run a sliding sync (without encryption) to retrieve the events
355    /// from the notification.
356    ///
357    /// An event can either be:
358    /// - an invite event,
359    /// - or a non-invite event.
360    ///
361    /// In case it's a non-invite event, it's rather easy: we'll request
362    /// explicit state that'll be useful for building the
363    /// `NotificationItem`, and subscribe to the room which the notification
364    /// relates to.
365    ///
366    /// In case it's an invite-event, it's trickier because the stripped event
367    /// may not contain the event id, so we can't just match on it. Rather,
368    /// we look at stripped room member events that may be fitting (i.e.
369    /// match the current user and are invites), and if the SDK concludes the
370    /// room was in the invited state, and we didn't find the event by id,
371    /// *then* we'll use that stripped room member event.
372    #[instrument(skip_all)]
373    async fn try_sliding_sync(
374        &self,
375        requests: &[NotificationItemsRequest],
376    ) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
377        // Serialize all the calls to this method by taking a lock at the beginning,
378        // that will be dropped later.
379        let _guard = self.notification_sync_mutex.lock().await;
380
381        // Set up a sliding sync that only subscribes to the room that had the
382        // notification, so we can figure out the full event and associated
383        // information.
384
385        let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
386
387        let handler_raw_notification = raw_notifications.clone();
388
389        let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
390
391        let timeline_event_handler = self.client.add_event_handler({
392            let requests = requests.clone();
393            move |raw: Raw<AnySyncTimelineEvent>| async move {
394                match &raw.get_field::<OwnedEventId>("event_id") {
395                    Ok(Some(event_id)) => {
396                        let request =
397                            &requests.iter().find(|request| request.event_ids.contains(event_id));
398                        if request.is_none() {
399                            return;
400                        }
401                        let room_id = request.unwrap().room_id.clone();
402                        for request in requests.iter() {
403                            if request.event_ids.contains(event_id) {
404                                // found it! There shouldn't be a previous event before, but if
405                                // there is, that should be ok to
406                                // just replace it.
407                                handler_raw_notification.lock().unwrap().insert(
408                                    event_id.to_owned(),
409                                    (room_id, Some(RawNotificationEvent::Timeline(raw))),
410                                );
411                                return;
412                            }
413                        }
414                    }
415                    Ok(None) => {
416                        warn!("a sync event had no event id");
417                    }
418                    Err(err) => {
419                        warn!("failed to deserialize sync event id: {err}");
420                    }
421                }
422            }
423        });
424
425        // We'll only use this event if the room is in the invited state.
426        let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
427
428        let user_id = self.client.user_id().unwrap().to_owned();
429        let handler_raw_invites = raw_invites.clone();
430        let handler_raw_notifications = raw_notifications.clone();
431        let stripped_member_handler = self.client.add_event_handler({
432            let requests = requests.clone();
433            move |raw: Raw<StrippedRoomMemberEvent>| async move {
434                let deserialized = match raw.deserialize() {
435                    Ok(d) => d,
436                    Err(err) => {
437                        warn!("failed to deserialize raw stripped room member event: {err}");
438                        return;
439                    }
440                };
441
442                trace!("received a stripped room member event");
443
444                // Try to match the event by event_id, as it's the most precise. In theory, we
445                // shouldn't receive it, so that's a first attempt.
446                match &raw.get_field::<OwnedEventId>("event_id") {
447                    Ok(Some(event_id)) => {
448                        let request =
449                            &requests.iter().find(|request| request.event_ids.contains(event_id));
450                        if request.is_none() {
451                            return;
452                        }
453                        let room_id = request.unwrap().room_id.clone();
454
455                        // found it! There shouldn't be a previous event before, but if
456                        // there is, that should be ok to
457                        // just replace it.
458                        handler_raw_notifications.lock().unwrap().insert(
459                            event_id.to_owned(),
460                            (room_id, Some(RawNotificationEvent::Invite(raw))),
461                        );
462                        return;
463                    }
464                    Ok(None) => {
465                        warn!("a room member event had no id");
466                    }
467                    Err(err) => {
468                        warn!("failed to deserialize room member event id: {err}");
469                    }
470                }
471
472                // Try to match the event by membership and state_key for the current user.
473                if deserialized.content.membership == MembershipState::Invite
474                    && deserialized.state_key == user_id
475                {
476                    trace!("found an invite event for the current user");
477                    // This could be it! There might be several of these following each other, so
478                    // assume it's the latest one (in sync ordering), and override a previous one if
479                    // present.
480                    handler_raw_invites
481                        .lock()
482                        .unwrap()
483                        .insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
484                } else {
485                    trace!("not an invite event, or not for the current user");
486                }
487            }
488        });
489
490        // Room power levels are necessary to build the push context.
491        let required_state = vec![
492            (StateEventType::RoomEncryption, "".to_owned()),
493            (StateEventType::RoomMember, "$LAZY".to_owned()),
494            (StateEventType::RoomMember, "$ME".to_owned()),
495            (StateEventType::RoomCanonicalAlias, "".to_owned()),
496            (StateEventType::RoomName, "".to_owned()),
497            (StateEventType::RoomPowerLevels, "".to_owned()),
498            (StateEventType::RoomJoinRules, "".to_owned()),
499            (StateEventType::CallMember, "*".to_owned()),
500            (StateEventType::RoomCreate, "".to_owned()),
501        ];
502
503        let invites = SlidingSyncList::builder("invites")
504            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
505            .timeline_limit(8)
506            .required_state(required_state.clone())
507            .filters(Some(assign!(http::request::ListFilters::default(), {
508                is_invite: Some(true),
509            })));
510
511        let sync = self
512            .client
513            .sliding_sync(Self::CONNECTION_ID)?
514            .poll_timeout(Duration::from_secs(1))
515            .network_timeout(Duration::from_secs(3))
516            .with_account_data_extension(
517                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
518            )
519            .add_list(invites)
520            .build()
521            .await?;
522
523        let room_ids = requests.iter().map(|req| req.room_id.as_ref()).collect::<Vec<_>>();
524        sync.subscribe_to_rooms(
525            &room_ids,
526            Some(assign!(http::request::RoomSubscription::default(), {
527                required_state,
528                timeline_limit: uint!(16)
529            })),
530            true,
531        );
532
533        let mut remaining_attempts = 3;
534
535        let stream = sync.sync();
536        pin_mut!(stream);
537
538        // Sum the expected event count for each room
539        let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
540
541        loop {
542            if stream.next().await.is_none() {
543                // Sliding sync aborted early.
544                break;
545            }
546
547            if raw_notifications.lock().unwrap().len() + raw_invites.lock().unwrap().len()
548                == expected_event_count
549            {
550                // We got the events.
551                break;
552            }
553
554            remaining_attempts -= 1;
555            if remaining_attempts == 0 {
556                // We're out of luck.
557                break;
558            }
559        }
560
561        self.client.remove_event_handler(stripped_member_handler);
562        self.client.remove_event_handler(timeline_event_handler);
563
564        let mut notifications = raw_notifications.clone().lock().unwrap().clone();
565        let mut missing_event_ids = Vec::new();
566
567        // Create the list of missing event ids after the syncs.
568        for request in requests.iter() {
569            for event_id in &request.event_ids {
570                if !notifications.contains_key(event_id) {
571                    missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
572                }
573            }
574        }
575
576        // Try checking if the missing notifications could be invites.
577        for (room_id, missing_event_id) in missing_event_ids {
578            trace!("we didn't have a non-invite event, looking for invited room now");
579            if let Some(room) = self.client.get_room(&room_id) {
580                if room.state() == RoomState::Invited {
581                    if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
582                        notifications.insert(
583                            missing_event_id.to_owned(),
584                            (room_id.to_owned(), stripped_event),
585                        );
586                    }
587                } else {
588                    debug!("the room isn't in the invited state");
589                }
590            } else {
591                warn!(%room_id, "unknown room, can't check for invite events");
592            }
593        }
594
595        let found = if notifications.len() == expected_event_count { "" } else { "not " };
596        trace!("all notification events have{found} been found");
597
598        Ok(notifications)
599    }
600
601    pub async fn get_notification_with_sliding_sync(
602        &self,
603        room_id: &RoomId,
604        event_id: &EventId,
605    ) -> Result<NotificationStatus, Error> {
606        info!("fetching notification event with a sliding sync");
607
608        let request = NotificationItemsRequest {
609            room_id: room_id.to_owned(),
610            event_ids: vec![event_id.to_owned()],
611        };
612
613        let mut get_notifications_result =
614            self.get_notifications_with_sliding_sync(&[request]).await?;
615
616        get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
617    }
618
619    /// Given a (decrypted or not) event, figure out whether it should be
620    /// filtered out for other client-side reasons (such as the sender being
621    /// ignored, for instance), and returns the corresponding
622    /// [`NotificationStatus`].
623    async fn compute_status(
624        &self,
625        room: &Room,
626        push_actions: Option<&[Action]>,
627        raw_event: RawNotificationEvent,
628        state_events: Vec<Raw<AnyStateEvent>>,
629    ) -> Result<NotificationStatus, Error> {
630        if let Some(actions) = push_actions
631            && !actions.iter().any(|a| a.should_notify())
632        {
633            // The event shouldn't notify: return early.
634            return Ok(NotificationStatus::EventFilteredOut);
635        }
636
637        let notification_item =
638            NotificationItem::new(room, raw_event, push_actions, state_events).await?;
639
640        if self.client.is_user_ignored(notification_item.event.sender()).await {
641            Ok(NotificationStatus::EventFilteredOut)
642        } else {
643            Ok(NotificationStatus::Event(Box::new(notification_item)))
644        }
645    }
646
647    /// Get a list of full notifications, given a room id and event ids.
648    ///
649    /// This will run a small sliding sync to retrieve the content of the
650    /// events, along with extra data to form a rich notification context.
651    pub async fn get_notifications_with_sliding_sync(
652        &self,
653        requests: &[NotificationItemsRequest],
654    ) -> Result<BatchNotificationFetchingResult, Error> {
655        let raw_events = self.try_sliding_sync(requests).await?;
656
657        let mut batch_result = BatchNotificationFetchingResult::new();
658
659        for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
660            // At this point it should have been added by the sync, if it's not, give up.
661            let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
662
663            let Some(raw_event) = raw_event else {
664                // The event was not found, so we can't build a notification.
665                batch_result.insert(event_id, Ok(NotificationStatus::EventNotFound));
666                continue;
667            };
668
669            let (raw_event, push_actions) = match &raw_event {
670                RawNotificationEvent::Timeline(timeline_event) => {
671                    // Timeline events may be encrypted, so make sure they get decrypted first.
672                    match self.retry_decryption(&room, timeline_event).await {
673                        Ok(Some(timeline_event)) => {
674                            let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
675                            (
676                                RawNotificationEvent::Timeline(timeline_event.into_raw()),
677                                push_actions,
678                            )
679                        }
680
681                        Ok(None) => {
682                            // The event was either not encrypted in the first place, or we
683                            // couldn't decrypt it after retrying. Use the raw event as is.
684                            match room.event_push_actions(timeline_event).await {
685                                Ok(push_actions) => (raw_event.clone(), push_actions),
686                                Err(err) => {
687                                    // Could not get push actions.
688                                    batch_result.insert(event_id, Err(err.into()));
689                                    continue;
690                                }
691                            }
692                        }
693
694                        Err(err) => {
695                            batch_result.insert(event_id, Err(err));
696                            continue;
697                        }
698                    }
699                }
700
701                RawNotificationEvent::Invite(invite_event) => {
702                    // Invite events can't be encrypted, so they should be in clear text.
703                    match room.event_push_actions(invite_event).await {
704                        Ok(push_actions) => {
705                            (RawNotificationEvent::Invite(invite_event.clone()), push_actions)
706                        }
707                        Err(err) => {
708                            batch_result.insert(event_id, Err(err.into()));
709                            continue;
710                        }
711                    }
712                }
713            };
714
715            let notification_status_result =
716                self.compute_status(&room, push_actions.as_deref(), raw_event, Vec::new()).await;
717
718            batch_result.insert(event_id, notification_status_result);
719        }
720
721        Ok(batch_result)
722    }
723
724    /// Retrieve a notification using a `/context` query.
725    ///
726    /// This is for clients that are already running other sliding syncs in the
727    /// same process, so that most of the contextual information for the
728    /// notification should already be there. In particular, the room containing
729    /// the event MUST be known (via a sliding sync for invites, or another
730    /// sliding sync).
731    ///
732    /// An error result means that we couldn't resolve the notification; in that
733    /// case, a dummy notification may be displayed instead. A `None` result
734    /// means the notification has been filtered out by the user's push
735    /// rules.
736    pub async fn get_notification_with_context(
737        &self,
738        room_id: &RoomId,
739        event_id: &EventId,
740    ) -> Result<NotificationStatus, Error> {
741        info!("fetching notification event with a /context query");
742
743        // See above comment.
744        let Some(room) = self.parent_client.get_room(room_id) else {
745            return Err(Error::UnknownRoom);
746        };
747
748        let response = room.event_with_context(event_id, true, uint!(0), None).await?;
749
750        let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
751        let state_events = response.state;
752
753        if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
754            timeline_event = decrypted_event;
755        }
756
757        let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
758
759        self.compute_status(
760            &room,
761            push_actions.as_deref(),
762            RawNotificationEvent::Timeline(timeline_event.into_raw()),
763            state_events,
764        )
765        .await
766    }
767}
768
769fn is_event_encrypted(event_type: TimelineEventType) -> bool {
770    let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
771
772    #[cfg(feature = "unstable-msc3956")]
773    let is_still_encrypted =
774        is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
775
776    is_still_encrypted
777}
778
779#[derive(Debug)]
780pub enum NotificationStatus {
781    /// The event has been found and was not filtered out.
782    Event(Box<NotificationItem>),
783    /// The event couldn't be found in the network queries used to find it.
784    EventNotFound,
785    /// The event has been filtered out, either because of the user's push
786    /// rules, or because the user which triggered it is ignored by the
787    /// current user.
788    EventFilteredOut,
789}
790
791#[derive(Debug, Clone)]
792pub struct NotificationItemsRequest {
793    pub room_id: OwnedRoomId,
794    pub event_ids: Vec<OwnedEventId>,
795}
796
797type BatchNotificationFetchingResult = BTreeMap<OwnedEventId, Result<NotificationStatus, Error>>;
798
799/// The Notification event as it was fetched from remote for the
800/// given `event_id`, represented as Raw but decrypted, thus only
801/// whether it is an invite or regular Timeline event has been
802/// determined.
803#[derive(Debug, Clone)]
804pub enum RawNotificationEvent {
805    /// The raw event for a timeline event
806    Timeline(Raw<AnySyncTimelineEvent>),
807    /// The notification contains an invitation with the given
808    /// StrippedRoomMemberEvent (in raw here)
809    Invite(Raw<StrippedRoomMemberEvent>),
810}
811
812/// The deserialized Event as it was fetched from remote for the
813/// given `event_id` and after decryption (if possible).
814#[derive(Debug)]
815pub enum NotificationEvent {
816    /// The Notification was for a TimelineEvent
817    Timeline(Box<AnySyncTimelineEvent>),
818    /// The Notification is an invite with the given stripped room event data
819    Invite(Box<StrippedRoomMemberEvent>),
820}
821
822impl NotificationEvent {
823    pub fn sender(&self) -> &UserId {
824        match self {
825            NotificationEvent::Timeline(ev) => ev.sender(),
826            NotificationEvent::Invite(ev) => &ev.sender,
827        }
828    }
829
830    /// Returns the root event id of the thread the notification event is in, if
831    /// any.
832    fn thread_id(&self) -> Option<OwnedEventId> {
833        let NotificationEvent::Timeline(sync_timeline_event) = &self else {
834            return None;
835        };
836        let AnySyncTimelineEvent::MessageLike(event) = sync_timeline_event.as_ref() else {
837            return None;
838        };
839        let content = event.original_content()?;
840        match content {
841            AnyMessageLikeEventContent::RoomMessage(content) => match content.relates_to? {
842                Relation::Thread(thread) => Some(thread.event_id),
843                _ => None,
844            },
845            _ => None,
846        }
847    }
848}
849
850/// A notification with its full content.
851#[derive(Debug)]
852pub struct NotificationItem {
853    /// Underlying Ruma event.
854    pub event: NotificationEvent,
855
856    /// The raw of the underlying event.
857    pub raw_event: RawNotificationEvent,
858
859    /// Display name of the sender.
860    pub sender_display_name: Option<String>,
861    /// Avatar URL of the sender.
862    pub sender_avatar_url: Option<String>,
863    /// Is the sender's name ambiguous?
864    pub is_sender_name_ambiguous: bool,
865
866    /// Room computed display name.
867    pub room_computed_display_name: String,
868    /// Room avatar URL.
869    pub room_avatar_url: Option<String>,
870    /// Room canonical alias.
871    pub room_canonical_alias: Option<String>,
872    /// Room topic.
873    pub room_topic: Option<String>,
874    /// Room join rule.
875    ///
876    /// Set to `None` if the join rule for this room is not available.
877    pub room_join_rule: Option<JoinRule>,
878    /// Is this room encrypted?
879    pub is_room_encrypted: Option<bool>,
880    /// Is this room considered a direct message?
881    pub is_direct_message_room: bool,
882    /// Numbers of members who joined the room.
883    pub joined_members_count: u64,
884
885    /// Is it a noisy notification? (i.e. does any push action contain a sound
886    /// action)
887    ///
888    /// It is set if and only if the push actions could be determined.
889    pub is_noisy: Option<bool>,
890    pub has_mention: Option<bool>,
891    pub thread_id: Option<OwnedEventId>,
892}
893
894impl NotificationItem {
895    async fn new(
896        room: &Room,
897        raw_event: RawNotificationEvent,
898        push_actions: Option<&[Action]>,
899        state_events: Vec<Raw<AnyStateEvent>>,
900    ) -> Result<Self, Error> {
901        let event = match &raw_event {
902            RawNotificationEvent::Timeline(raw_event) => {
903                let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
904                if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
905                    SyncRoomMessageEvent::Original(ev),
906                )) = &mut event
907                {
908                    ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
909                }
910                NotificationEvent::Timeline(Box::new(event))
911            }
912            RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(Box::new(
913                raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
914            )),
915        };
916
917        let sender = match room.state() {
918            RoomState::Invited => room.invite_details().await?.inviter,
919            _ => room.get_member_no_sync(event.sender()).await?,
920        };
921
922        let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
923            match &sender {
924                Some(sender) => (
925                    sender.display_name().map(|s| s.to_owned()),
926                    sender.avatar_url().map(|s| s.to_string()),
927                    sender.name_ambiguous(),
928                ),
929                None => (None, None, false),
930            };
931
932        if sender_display_name.is_none() || sender_avatar_url.is_none() {
933            let sender_id = event.sender();
934            for ev in state_events {
935                let ev = match ev.deserialize() {
936                    Ok(ev) => ev,
937                    Err(err) => {
938                        warn!("Failed to deserialize a state event: {err}");
939                        continue;
940                    }
941                };
942                if ev.sender() != sender_id {
943                    continue;
944                }
945                if let AnyFullStateEventContent::RoomMember(FullStateEventContent::Original {
946                    content,
947                    ..
948                }) = ev.content()
949                {
950                    if sender_display_name.is_none() {
951                        sender_display_name = content.displayname;
952                    }
953                    if sender_avatar_url.is_none() {
954                        sender_avatar_url = content.avatar_url.map(|url| url.to_string());
955                    }
956                }
957            }
958        }
959
960        let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
961        let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
962        let thread_id = event.thread_id().clone();
963
964        let item = NotificationItem {
965            event,
966            raw_event,
967            sender_display_name,
968            sender_avatar_url,
969            is_sender_name_ambiguous,
970            room_computed_display_name: room.display_name().await?.to_string(),
971            room_avatar_url: room.avatar_url().map(|s| s.to_string()),
972            room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
973            room_topic: room.topic(),
974            room_join_rule: room.join_rule(),
975            is_direct_message_room: room.is_direct().await?,
976            is_room_encrypted: room
977                .latest_encryption_state()
978                .await
979                .map(|state| state.is_encrypted())
980                .ok(),
981            joined_members_count: room.joined_members_count(),
982            is_noisy,
983            has_mention,
984            thread_id,
985        };
986
987        Ok(item)
988    }
989
990    /// Returns whether this room is public or not, based on the join rule.
991    ///
992    /// Maybe return `None` if the join rule is not available.
993    pub fn is_public(&self) -> Option<bool> {
994        self.room_join_rule.as_ref().map(|rule| matches!(rule, JoinRule::Public))
995    }
996}
997
998/// An error for the [`NotificationClient`].
999#[derive(Debug, Error)]
1000pub enum Error {
1001    #[error(transparent)]
1002    BuildingLocalClient(ClientBuildError),
1003
1004    /// The room associated to this event wasn't found.
1005    #[error("unknown room for a notification")]
1006    UnknownRoom,
1007
1008    /// The Ruma event contained within this notification couldn't be parsed.
1009    #[error("invalid ruma event")]
1010    InvalidRumaEvent,
1011
1012    /// When calling `get_notification_with_sliding_sync`, the room was missing
1013    /// in the response.
1014    #[error("the sliding sync response doesn't include the target room")]
1015    SlidingSyncEmptyRoom,
1016
1017    #[error("the event was missing in the `/context` query")]
1018    ContextMissingEvent,
1019
1020    /// An error forwarded from the client.
1021    #[error(transparent)]
1022    SdkError(#[from] matrix_sdk::Error),
1023
1024    /// An error forwarded from the underlying state store.
1025    #[error(transparent)]
1026    StoreError(#[from] StoreError),
1027}
1028
1029#[cfg(test)]
1030mod tests {
1031    use assert_matches2::assert_let;
1032    use matrix_sdk::test_utils::mocks::MatrixMockServer;
1033    use matrix_sdk_test::{async_test, event_factory::EventFactory};
1034    use ruma::{event_id, room_id, user_id};
1035
1036    use crate::notification_client::{NotificationItem, RawNotificationEvent};
1037
1038    #[async_test]
1039    async fn test_notification_item_returns_thread_id() {
1040        let server = MatrixMockServer::new().await;
1041        let client = server.client_builder().build().await;
1042
1043        let room_id = room_id!("!a:b.c");
1044        let thread_root_event_id = event_id!("$root:b.c");
1045        let message = EventFactory::new()
1046            .room(room_id)
1047            .sender(user_id!("@sender:b.c"))
1048            .text_msg("Threaded")
1049            .in_thread(thread_root_event_id, event_id!("$prev:b.c"))
1050            .into_raw_sync();
1051        let room = server.sync_joined_room(&client, room_id).await;
1052
1053        let raw_notification_event = RawNotificationEvent::Timeline(message);
1054        let notification_item =
1055            NotificationItem::new(&room, raw_notification_event, None, Vec::new())
1056                .await
1057                .expect("Could not create notification item");
1058
1059        assert_let!(Some(thread_id) = notification_item.thread_id);
1060        assert_eq!(thread_id, thread_root_event_id);
1061    }
1062}