matrix_sdk_ui/
notification_client.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::BTreeMap,
17    sync::{Arc, Mutex},
18    time::Duration,
19};
20
21use futures_util::{StreamExt as _, pin_mut};
22use matrix_sdk::{
23    Client, ClientBuildError, SlidingSyncList, SlidingSyncMode, room::Room, sleep::sleep,
24};
25use matrix_sdk_base::{RoomState, StoreError, deserialized_responses::TimelineEvent};
26use ruma::{
27    EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
28    api::client::sync::sync_events::v5 as http,
29    assign,
30    events::{
31        AnyFullStateEventContent, AnyMessageLikeEventContent, AnyStateEvent,
32        AnySyncMessageLikeEvent, AnySyncTimelineEvent, FullStateEventContent, StateEventType,
33        TimelineEventType,
34        room::{
35            encrypted::OriginalSyncRoomEncryptedEvent,
36            join_rules::JoinRule,
37            member::{MembershipState, StrippedRoomMemberEvent},
38            message::{Relation, SyncRoomMessageEvent},
39        },
40    },
41    html::RemoveReplyFallback,
42    push::Action,
43    serde::Raw,
44    uint,
45};
46use thiserror::Error;
47use tokio::sync::Mutex as AsyncMutex;
48use tracing::{debug, info, instrument, trace, warn};
49
50use crate::{
51    DEFAULT_SANITIZER_MODE,
52    encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService, WithLocking},
53    sync_service::SyncService,
54};
55
56/// What kind of process setup do we have for this notification client?
57#[derive(Clone)]
58pub enum NotificationProcessSetup {
59    /// The notification client may run on a separate process than the rest of
60    /// the app.
61    ///
62    /// For instance, this is the case on iOS, where notifications are handled
63    /// in a separate process (the Notification Service Extension, aka NSE).
64    ///
65    /// In that case, a cross-process lock will be used to coordinate writes
66    /// into the stores handled by the SDK.
67    MultipleProcesses,
68
69    /// The notification client runs in the same process as the rest of the
70    /// `Client` performing syncs.
71    ///
72    /// For instance, this is the case on Android, where a notification will
73    /// wake up the main app process.
74    ///
75    /// In that case, a smart reference to the [`SyncService`] must be provided.
76    SingleProcess { sync_service: Arc<SyncService> },
77}
78
79/// A client specialized for handling push notifications received over the
80/// network, for an app.
81///
82/// In particular, it takes care of running a full decryption sync, in case the
83/// event in the notification was impossible to decrypt beforehand.
84pub struct NotificationClient {
85    /// SDK client that uses an in-memory state store.
86    client: Client,
87
88    /// SDK client that uses the same state store as the caller's context.
89    parent_client: Client,
90
91    /// Is the notification client running on its own process or not?
92    process_setup: NotificationProcessSetup,
93
94    /// A mutex to serialize requests to the notifications sliding sync.
95    ///
96    /// If several notifications come in at the same time (e.g. network was
97    /// unreachable because of airplane mode or something similar), then we
98    /// need to make sure that repeated calls to `get_notification` won't
99    /// cause multiple requests with the same `conn_id` we're using for
100    /// notifications. This mutex solves this by sequentializing the requests.
101    notification_sync_mutex: AsyncMutex<()>,
102
103    /// A mutex to serialize requests to the encryption sliding sync that's used
104    /// in case we didn't have the keys to decipher an event.
105    ///
106    /// Same reasoning as [`Self::notification_sync_mutex`].
107    encryption_sync_mutex: AsyncMutex<()>,
108}
109
110impl NotificationClient {
111    const CONNECTION_ID: &'static str = "notifications";
112    const LOCK_ID: &'static str = "notifications";
113
114    /// Create a new notification client.
115    pub async fn new(
116        parent_client: Client,
117        process_setup: NotificationProcessSetup,
118    ) -> Result<Self, Error> {
119        let client = parent_client.notification_client(Self::LOCK_ID.to_owned()).await?;
120
121        Ok(NotificationClient {
122            client,
123            parent_client,
124            notification_sync_mutex: AsyncMutex::new(()),
125            encryption_sync_mutex: AsyncMutex::new(()),
126            process_setup,
127        })
128    }
129
130    /// Fetches a room by its ID using the in-memory state store backed client.
131    /// Useful to retrieve room information after running the limited
132    /// notification client sliding sync loop.
133    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
134        self.client.get_room(room_id)
135    }
136
137    /// Fetches the content of a notification.
138    ///
139    /// This will first try to get the notification using a short-lived sliding
140    /// sync, and if the sliding-sync can't find the event, then it'll use a
141    /// `/context` query to find the event with associated member information.
142    ///
143    /// An error result means that we couldn't resolve the notification; in that
144    /// case, a dummy notification may be displayed instead.
145    #[instrument(skip(self))]
146    pub async fn get_notification(
147        &self,
148        room_id: &RoomId,
149        event_id: &EventId,
150    ) -> Result<NotificationStatus, Error> {
151        let status = self.get_notification_with_sliding_sync(room_id, event_id).await?;
152        match status {
153            NotificationStatus::Event(..) | NotificationStatus::EventFilteredOut => Ok(status),
154            NotificationStatus::EventNotFound => {
155                self.get_notification_with_context(room_id, event_id).await
156            }
157        }
158    }
159
160    /// Fetches the content of several notifications.
161    ///
162    /// This will first try to get the notifications using a short-lived sliding
163    /// sync, and if the sliding-sync can't find the events, then it'll use a
164    /// `/context` query to find the events with associated member information.
165    ///
166    /// An error result at the top level means that something failed when trying
167    /// to set up the notification fetching.
168    ///
169    /// For each notification item you can also receive an error, which means
170    /// something failed when trying to fetch that particular notification
171    /// (decryption, fetching push actions, etc.); in that case, a dummy
172    /// notification may be displayed instead.
173    pub async fn get_notifications(
174        &self,
175        requests: &[NotificationItemsRequest],
176    ) -> Result<BatchNotificationFetchingResult, Error> {
177        let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
178
179        for request in requests {
180            for event_id in &request.event_ids {
181                match notifications.get_mut(event_id) {
182                    // If the notification for a given event wasn't found with sliding sync, try
183                    // with a /context for each event.
184                    Some(Ok(NotificationStatus::EventNotFound)) | None => {
185                        notifications.insert(
186                            event_id.to_owned(),
187                            self.get_notification_with_context(&request.room_id, event_id).await,
188                        );
189                    }
190
191                    _ => {}
192                }
193            }
194        }
195
196        Ok(notifications)
197    }
198
199    /// Run an encryption sync loop, in case an event is still encrypted.
200    ///
201    /// Will return `Ok(Some)` if and only if:
202    /// - the event was encrypted,
203    /// - we successfully ran an encryption sync or waited long enough for an
204    ///   existing encryption sync to decrypt the event.
205    ///
206    /// Otherwise, if the event was not encrypted, or couldn't be decrypted
207    /// (without causing a fatal error), will return `Ok(None)`.
208    #[instrument(skip_all)]
209    async fn retry_decryption(
210        &self,
211        room: &Room,
212        raw_event: &Raw<AnySyncTimelineEvent>,
213    ) -> Result<Option<TimelineEvent>, Error> {
214        let event: AnySyncTimelineEvent =
215            raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
216
217        if !is_event_encrypted(event.event_type()) {
218            return Ok(None);
219        }
220
221        // Serialize calls to this function.
222        let _guard = self.encryption_sync_mutex.lock().await;
223
224        // The message is still encrypted, and the client is configured to retry
225        // decryption.
226        //
227        // Spawn an `EncryptionSync` that runs two iterations of the sliding sync loop:
228        // - the first iteration allows to get SS events as well as send e2ee requests.
229        // - the second one let the SS homeserver forward events triggered by the
230        //   sending of e2ee requests.
231        //
232        // Keep timeouts small for both, since we might be short on time.
233
234        let with_locking = WithLocking::from(matches!(
235            self.process_setup,
236            NotificationProcessSetup::MultipleProcesses
237        ));
238
239        let push_ctx = room.push_context().await?;
240        let sync_permit_guard = match &self.process_setup {
241            NotificationProcessSetup::MultipleProcesses => {
242                // We're running on our own process, dedicated for notifications. In that case,
243                // create a dummy sync permit; we're guaranteed there's at most one since we've
244                // acquired the `encryption_sync_mutex' lock here.
245                let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
246                sync_permit.lock_owned().await
247            }
248
249            NotificationProcessSetup::SingleProcess { sync_service } => {
250                if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
251                    permit_guard
252                } else {
253                    // There's already a sync service active, thus the encryption sync is already
254                    // running elsewhere. As a matter of fact, if the event was encrypted, that
255                    // means we were racing against the encryption sync. Wait a bit, attempt to
256                    // decrypt, and carry on.
257
258                    // We repeat the sleep 3 times at most, each iteration we
259                    // double the amount of time waited, so overall we may wait up to 7 times this
260                    // amount.
261                    let mut wait = 200;
262
263                    debug!("Encryption sync running in background");
264                    for _ in 0..3 {
265                        trace!("waiting for decryption…");
266
267                        sleep(Duration::from_millis(wait)).await;
268
269                        // Note: We specify the cast type in case the
270                        // `experimental-encrypted-state-events` feature is enabled, which provides
271                        // multiple cast implementations.
272                        let new_event = room
273                            .decrypt_event(
274                                raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
275                                push_ctx.as_ref(),
276                            )
277                            .await?;
278
279                        match new_event.kind {
280                            matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
281                                utd_info, ..} => {
282                                if utd_info.reason.is_missing_room_key() {
283                                    // Decryption error that could be caused by a missing room
284                                    // key; retry in a few.
285                                    wait *= 2;
286                                } else {
287                                    debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
288                                    return Ok(None);
289                                }
290                            }
291                            _ => {
292                                trace!("Waiting succeeded and event could be decrypted!");
293                                return Ok(Some(new_event));
294                            }
295                        }
296                    }
297
298                    // We couldn't decrypt the event after waiting a few times, abort.
299                    debug!("Timeout waiting for the encryption sync to decrypt notification.");
300                    return Ok(None);
301                }
302            }
303        };
304
305        let encryption_sync = EncryptionSyncService::new(
306            self.client.clone(),
307            Some((Duration::from_secs(3), Duration::from_secs(4))),
308            with_locking,
309        )
310        .await;
311
312        // Just log out errors, but don't have them abort the notification processing:
313        // an undecrypted notification is still better than no
314        // notifications.
315
316        match encryption_sync {
317            Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
318                // Note: We specify the cast type in case the
319                // `experimental-encrypted-state-events` feature is enabled, which provides
320                // multiple cast implementations.
321                Ok(()) => match room.decrypt_event(raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(), push_ctx.as_ref()).await {
322                    Ok(new_event) => match new_event.kind {
323                        matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
324                            utd_info, ..
325                        } => {
326                            trace!(
327                                "Encryption sync failed to decrypt the event: {:?}",
328                                utd_info.reason
329                            );
330                            Ok(None)
331                        }
332                        _ => {
333                            trace!("Encryption sync managed to decrypt the event.");
334                            Ok(Some(new_event))
335                        }
336                    },
337                    Err(err) => {
338                        trace!("Encryption sync failed to decrypt the event: {err}");
339                        Ok(None)
340                    }
341                },
342                Err(err) => {
343                    warn!("Encryption sync error: {err:#}");
344                    Ok(None)
345                }
346            },
347            Err(err) => {
348                warn!("Encryption sync build error: {err:#}",);
349                Ok(None)
350            }
351        }
352    }
353
354    /// Try to run a sliding sync (without encryption) to retrieve the events
355    /// from the notification.
356    ///
357    /// An event can either be:
358    /// - an invite event,
359    /// - or a non-invite event.
360    ///
361    /// In case it's a non-invite event, it's rather easy: we'll request
362    /// explicit state that'll be useful for building the
363    /// `NotificationItem`, and subscribe to the room which the notification
364    /// relates to.
365    ///
366    /// In case it's an invite-event, it's trickier because the stripped event
367    /// may not contain the event id, so we can't just match on it. Rather,
368    /// we look at stripped room member events that may be fitting (i.e.
369    /// match the current user and are invites), and if the SDK concludes the
370    /// room was in the invited state, and we didn't find the event by id,
371    /// *then* we'll use that stripped room member event.
372    #[instrument(skip_all)]
373    async fn try_sliding_sync(
374        &self,
375        requests: &[NotificationItemsRequest],
376    ) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
377        // Serialize all the calls to this method by taking a lock at the beginning,
378        // that will be dropped later.
379        let _guard = self.notification_sync_mutex.lock().await;
380
381        // Set up a sliding sync that only subscribes to the room that had the
382        // notification, so we can figure out the full event and associated
383        // information.
384
385        let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
386
387        let handler_raw_notification = raw_notifications.clone();
388
389        let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
390
391        let timeline_event_handler = self.client.add_event_handler({
392            let requests = requests.clone();
393            move |raw: Raw<AnySyncTimelineEvent>| async move {
394                match &raw.get_field::<OwnedEventId>("event_id") {
395                    Ok(Some(event_id)) => {
396                        let Some(request) =
397                            &requests.iter().find(|request| request.event_ids.contains(event_id))
398                        else {
399                            return;
400                        };
401
402                        let room_id = request.room_id.clone();
403
404                        // found it! There shouldn't be a previous event before, but if
405                        // there is, that should be ok to
406                        // just replace it.
407                        handler_raw_notification.lock().unwrap().insert(
408                            event_id.to_owned(),
409                            (room_id, Some(RawNotificationEvent::Timeline(raw))),
410                        );
411                    }
412                    Ok(None) => {
413                        warn!("a sync event had no event id");
414                    }
415                    Err(err) => {
416                        warn!("failed to deserialize sync event id: {err}");
417                    }
418                }
419            }
420        });
421
422        // We'll only use this event if the room is in the invited state.
423        let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
424
425        let user_id = self.client.user_id().unwrap().to_owned();
426        let handler_raw_invites = raw_invites.clone();
427        let handler_raw_notifications = raw_notifications.clone();
428        let stripped_member_handler = self.client.add_event_handler({
429            let requests = requests.clone();
430            move |raw: Raw<StrippedRoomMemberEvent>| async move {
431                let deserialized = match raw.deserialize() {
432                    Ok(d) => d,
433                    Err(err) => {
434                        warn!("failed to deserialize raw stripped room member event: {err}");
435                        return;
436                    }
437                };
438
439                trace!("received a stripped room member event");
440
441                // Try to match the event by event_id, as it's the most precise. In theory, we
442                // shouldn't receive it, so that's a first attempt.
443                match &raw.get_field::<OwnedEventId>("event_id") {
444                    Ok(Some(event_id)) => {
445                        let request =
446                            &requests.iter().find(|request| request.event_ids.contains(event_id));
447                        if request.is_none() {
448                            return;
449                        }
450                        let room_id = request.unwrap().room_id.clone();
451
452                        // found it! There shouldn't be a previous event before, but if
453                        // there is, that should be ok to
454                        // just replace it.
455                        handler_raw_notifications.lock().unwrap().insert(
456                            event_id.to_owned(),
457                            (room_id, Some(RawNotificationEvent::Invite(raw))),
458                        );
459                        return;
460                    }
461                    Ok(None) => {
462                        warn!("a room member event had no id");
463                    }
464                    Err(err) => {
465                        warn!("failed to deserialize room member event id: {err}");
466                    }
467                }
468
469                // Try to match the event by membership and state_key for the current user.
470                if deserialized.content.membership == MembershipState::Invite
471                    && deserialized.state_key == user_id
472                {
473                    trace!("found an invite event for the current user");
474                    // This could be it! There might be several of these following each other, so
475                    // assume it's the latest one (in sync ordering), and override a previous one if
476                    // present.
477                    handler_raw_invites
478                        .lock()
479                        .unwrap()
480                        .insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
481                } else {
482                    trace!("not an invite event, or not for the current user");
483                }
484            }
485        });
486
487        // Room power levels are necessary to build the push context.
488        let required_state = vec![
489            (StateEventType::RoomEncryption, "".to_owned()),
490            (StateEventType::RoomMember, "$LAZY".to_owned()),
491            (StateEventType::RoomMember, "$ME".to_owned()),
492            (StateEventType::RoomCanonicalAlias, "".to_owned()),
493            (StateEventType::RoomName, "".to_owned()),
494            (StateEventType::RoomAvatar, "".to_owned()),
495            (StateEventType::RoomPowerLevels, "".to_owned()),
496            (StateEventType::RoomJoinRules, "".to_owned()),
497            (StateEventType::CallMember, "*".to_owned()),
498            (StateEventType::RoomCreate, "".to_owned()),
499            (StateEventType::MemberHints, "".to_owned()),
500        ];
501
502        let invites = SlidingSyncList::builder("invites")
503            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
504            .timeline_limit(8)
505            .required_state(required_state.clone())
506            .filters(Some(assign!(http::request::ListFilters::default(), {
507                is_invite: Some(true),
508            })));
509
510        let sync = self
511            .client
512            .sliding_sync(Self::CONNECTION_ID)?
513            .poll_timeout(Duration::from_secs(1))
514            .network_timeout(Duration::from_secs(3))
515            .with_account_data_extension(
516                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
517            )
518            .add_list(invites)
519            .build()
520            .await?;
521
522        let room_ids = requests.iter().map(|req| req.room_id.as_ref()).collect::<Vec<_>>();
523        sync.subscribe_to_rooms(
524            &room_ids,
525            Some(assign!(http::request::RoomSubscription::default(), {
526                required_state,
527                timeline_limit: uint!(16)
528            })),
529            true,
530        );
531
532        let mut remaining_attempts = 3;
533
534        let stream = sync.sync();
535        pin_mut!(stream);
536
537        // Sum the expected event count for each room
538        let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
539
540        loop {
541            if stream.next().await.is_none() {
542                // Sliding sync aborted early.
543                break;
544            }
545
546            if raw_notifications.lock().unwrap().len() + raw_invites.lock().unwrap().len()
547                == expected_event_count
548            {
549                // We got the events.
550                break;
551            }
552
553            remaining_attempts -= 1;
554            if remaining_attempts == 0 {
555                // We're out of luck.
556                break;
557            }
558        }
559
560        self.client.remove_event_handler(stripped_member_handler);
561        self.client.remove_event_handler(timeline_event_handler);
562
563        let mut notifications = raw_notifications.clone().lock().unwrap().clone();
564        let mut missing_event_ids = Vec::new();
565
566        // Create the list of missing event ids after the syncs.
567        for request in requests.iter() {
568            for event_id in &request.event_ids {
569                if !notifications.contains_key(event_id) {
570                    missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
571                }
572            }
573        }
574
575        // Try checking if the missing notifications could be invites.
576        for (room_id, missing_event_id) in missing_event_ids {
577            trace!("we didn't have a non-invite event, looking for invited room now");
578            if let Some(room) = self.client.get_room(&room_id) {
579                if room.state() == RoomState::Invited {
580                    if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
581                        notifications.insert(
582                            missing_event_id.to_owned(),
583                            (room_id.to_owned(), stripped_event),
584                        );
585                    }
586                } else {
587                    debug!("the room isn't in the invited state");
588                }
589            } else {
590                warn!(%room_id, "unknown room, can't check for invite events");
591            }
592        }
593
594        let found = if notifications.len() == expected_event_count { "" } else { "not " };
595        trace!("all notification events have{found} been found");
596
597        Ok(notifications)
598    }
599
600    pub async fn get_notification_with_sliding_sync(
601        &self,
602        room_id: &RoomId,
603        event_id: &EventId,
604    ) -> Result<NotificationStatus, Error> {
605        info!("fetching notification event with a sliding sync");
606
607        let request = NotificationItemsRequest {
608            room_id: room_id.to_owned(),
609            event_ids: vec![event_id.to_owned()],
610        };
611
612        let mut get_notifications_result =
613            self.get_notifications_with_sliding_sync(&[request]).await?;
614
615        get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
616    }
617
618    /// Given a (decrypted or not) event, figure out whether it should be
619    /// filtered out for other client-side reasons (such as the sender being
620    /// ignored, for instance), and returns the corresponding
621    /// [`NotificationStatus`].
622    async fn compute_status(
623        &self,
624        room: &Room,
625        push_actions: Option<&[Action]>,
626        raw_event: RawNotificationEvent,
627        state_events: Vec<Raw<AnyStateEvent>>,
628    ) -> Result<NotificationStatus, Error> {
629        if let Some(actions) = push_actions
630            && !actions.iter().any(|a| a.should_notify())
631        {
632            // The event shouldn't notify: return early.
633            return Ok(NotificationStatus::EventFilteredOut);
634        }
635
636        let notification_item =
637            NotificationItem::new(room, raw_event, push_actions, state_events).await?;
638
639        if self.client.is_user_ignored(notification_item.event.sender()).await {
640            Ok(NotificationStatus::EventFilteredOut)
641        } else {
642            Ok(NotificationStatus::Event(Box::new(notification_item)))
643        }
644    }
645
646    /// Get a list of full notifications, given a room id and event ids.
647    ///
648    /// This will run a small sliding sync to retrieve the content of the
649    /// events, along with extra data to form a rich notification context.
650    pub async fn get_notifications_with_sliding_sync(
651        &self,
652        requests: &[NotificationItemsRequest],
653    ) -> Result<BatchNotificationFetchingResult, Error> {
654        let raw_events = self.try_sliding_sync(requests).await?;
655
656        let mut batch_result = BatchNotificationFetchingResult::new();
657
658        for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
659            // At this point it should have been added by the sync, if it's not, give up.
660            let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
661
662            let Some(raw_event) = raw_event else {
663                // The event was not found, so we can't build a notification.
664                batch_result.insert(event_id, Ok(NotificationStatus::EventNotFound));
665                continue;
666            };
667
668            let (raw_event, push_actions) = match &raw_event {
669                RawNotificationEvent::Timeline(timeline_event) => {
670                    // Timeline events may be encrypted, so make sure they get decrypted first.
671                    match self.retry_decryption(&room, timeline_event).await {
672                        Ok(Some(timeline_event)) => {
673                            let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
674                            (
675                                RawNotificationEvent::Timeline(timeline_event.into_raw()),
676                                push_actions,
677                            )
678                        }
679
680                        Ok(None) => {
681                            // The event was either not encrypted in the first place, or we
682                            // couldn't decrypt it after retrying. Use the raw event as is.
683                            match room.event_push_actions(timeline_event).await {
684                                Ok(push_actions) => (raw_event.clone(), push_actions),
685                                Err(err) => {
686                                    // Could not get push actions.
687                                    batch_result.insert(event_id, Err(err.into()));
688                                    continue;
689                                }
690                            }
691                        }
692
693                        Err(err) => {
694                            batch_result.insert(event_id, Err(err));
695                            continue;
696                        }
697                    }
698                }
699
700                RawNotificationEvent::Invite(invite_event) => {
701                    // Invite events can't be encrypted, so they should be in clear text.
702                    match room.event_push_actions(invite_event).await {
703                        Ok(push_actions) => {
704                            (RawNotificationEvent::Invite(invite_event.clone()), push_actions)
705                        }
706                        Err(err) => {
707                            batch_result.insert(event_id, Err(err.into()));
708                            continue;
709                        }
710                    }
711                }
712            };
713
714            let notification_status_result =
715                self.compute_status(&room, push_actions.as_deref(), raw_event, Vec::new()).await;
716
717            batch_result.insert(event_id, notification_status_result);
718        }
719
720        Ok(batch_result)
721    }
722
723    /// Retrieve a notification using a `/context` query.
724    ///
725    /// This is for clients that are already running other sliding syncs in the
726    /// same process, so that most of the contextual information for the
727    /// notification should already be there. In particular, the room containing
728    /// the event MUST be known (via a sliding sync for invites, or another
729    /// sliding sync).
730    ///
731    /// An error result means that we couldn't resolve the notification; in that
732    /// case, a dummy notification may be displayed instead. A `None` result
733    /// means the notification has been filtered out by the user's push
734    /// rules.
735    pub async fn get_notification_with_context(
736        &self,
737        room_id: &RoomId,
738        event_id: &EventId,
739    ) -> Result<NotificationStatus, Error> {
740        info!("fetching notification event with a /context query");
741
742        // See above comment.
743        let Some(room) = self.parent_client.get_room(room_id) else {
744            return Err(Error::UnknownRoom);
745        };
746
747        let response = room.event_with_context(event_id, true, uint!(0), None).await?;
748
749        let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
750        let state_events = response.state;
751
752        if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
753            timeline_event = decrypted_event;
754        }
755
756        let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
757
758        self.compute_status(
759            &room,
760            push_actions.as_deref(),
761            RawNotificationEvent::Timeline(timeline_event.into_raw()),
762            state_events,
763        )
764        .await
765    }
766}
767
768fn is_event_encrypted(event_type: TimelineEventType) -> bool {
769    let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
770
771    #[cfg(feature = "unstable-msc3956")]
772    let is_still_encrypted =
773        is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
774
775    is_still_encrypted
776}
777
778#[derive(Debug)]
779pub enum NotificationStatus {
780    /// The event has been found and was not filtered out.
781    Event(Box<NotificationItem>),
782    /// The event couldn't be found in the network queries used to find it.
783    EventNotFound,
784    /// The event has been filtered out, either because of the user's push
785    /// rules, or because the user which triggered it is ignored by the
786    /// current user.
787    EventFilteredOut,
788}
789
790#[derive(Debug, Clone)]
791pub struct NotificationItemsRequest {
792    pub room_id: OwnedRoomId,
793    pub event_ids: Vec<OwnedEventId>,
794}
795
796type BatchNotificationFetchingResult = BTreeMap<OwnedEventId, Result<NotificationStatus, Error>>;
797
798/// The Notification event as it was fetched from remote for the
799/// given `event_id`, represented as Raw but decrypted, thus only
800/// whether it is an invite or regular Timeline event has been
801/// determined.
802#[derive(Debug, Clone)]
803pub enum RawNotificationEvent {
804    /// The raw event for a timeline event
805    Timeline(Raw<AnySyncTimelineEvent>),
806    /// The notification contains an invitation with the given
807    /// StrippedRoomMemberEvent (in raw here)
808    Invite(Raw<StrippedRoomMemberEvent>),
809}
810
811/// The deserialized Event as it was fetched from remote for the
812/// given `event_id` and after decryption (if possible).
813#[derive(Debug)]
814pub enum NotificationEvent {
815    /// The Notification was for a TimelineEvent
816    Timeline(Box<AnySyncTimelineEvent>),
817    /// The Notification is an invite with the given stripped room event data
818    Invite(Box<StrippedRoomMemberEvent>),
819}
820
821impl NotificationEvent {
822    pub fn sender(&self) -> &UserId {
823        match self {
824            NotificationEvent::Timeline(ev) => ev.sender(),
825            NotificationEvent::Invite(ev) => &ev.sender,
826        }
827    }
828
829    /// Returns the root event id of the thread the notification event is in, if
830    /// any.
831    fn thread_id(&self) -> Option<OwnedEventId> {
832        let NotificationEvent::Timeline(sync_timeline_event) = &self else {
833            return None;
834        };
835        let AnySyncTimelineEvent::MessageLike(event) = sync_timeline_event.as_ref() else {
836            return None;
837        };
838        let content = event.original_content()?;
839        match content {
840            AnyMessageLikeEventContent::RoomMessage(content) => match content.relates_to? {
841                Relation::Thread(thread) => Some(thread.event_id),
842                _ => None,
843            },
844            _ => None,
845        }
846    }
847}
848
849/// A notification with its full content.
850#[derive(Debug)]
851pub struct NotificationItem {
852    /// Underlying Ruma event.
853    pub event: NotificationEvent,
854
855    /// The raw of the underlying event.
856    pub raw_event: RawNotificationEvent,
857
858    /// Display name of the sender.
859    pub sender_display_name: Option<String>,
860    /// Avatar URL of the sender.
861    pub sender_avatar_url: Option<String>,
862    /// Is the sender's name ambiguous?
863    pub is_sender_name_ambiguous: bool,
864
865    /// Room computed display name.
866    pub room_computed_display_name: String,
867    /// Room avatar URL.
868    pub room_avatar_url: Option<String>,
869    /// Room canonical alias.
870    pub room_canonical_alias: Option<String>,
871    /// Room topic.
872    pub room_topic: Option<String>,
873    /// Room join rule.
874    ///
875    /// Set to `None` if the join rule for this room is not available.
876    pub room_join_rule: Option<JoinRule>,
877    /// Is this room encrypted?
878    pub is_room_encrypted: Option<bool>,
879    /// Is this room considered a direct message?
880    pub is_direct_message_room: bool,
881    /// Numbers of members who joined the room.
882    pub joined_members_count: u64,
883    /// Is the room a space?
884    pub is_space: bool,
885
886    /// Is it a noisy notification? (i.e. does any push action contain a sound
887    /// action)
888    ///
889    /// It is set if and only if the push actions could be determined.
890    pub is_noisy: Option<bool>,
891    pub has_mention: Option<bool>,
892    pub thread_id: Option<OwnedEventId>,
893
894    /// The push actions for this notification (notify, sound, highlight, etc.).
895    pub actions: Option<Vec<Action>>,
896}
897
898impl NotificationItem {
899    async fn new(
900        room: &Room,
901        raw_event: RawNotificationEvent,
902        push_actions: Option<&[Action]>,
903        state_events: Vec<Raw<AnyStateEvent>>,
904    ) -> Result<Self, Error> {
905        let event = match &raw_event {
906            RawNotificationEvent::Timeline(raw_event) => {
907                let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
908                if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
909                    SyncRoomMessageEvent::Original(ev),
910                )) = &mut event
911                {
912                    ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
913                }
914                NotificationEvent::Timeline(Box::new(event))
915            }
916            RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(Box::new(
917                raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
918            )),
919        };
920
921        let sender = match room.state() {
922            RoomState::Invited => room.invite_details().await?.inviter,
923            _ => room.get_member_no_sync(event.sender()).await?,
924        };
925
926        let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
927            match &sender {
928                Some(sender) => (
929                    sender.display_name().map(|s| s.to_owned()),
930                    sender.avatar_url().map(|s| s.to_string()),
931                    sender.name_ambiguous(),
932                ),
933                None => (None, None, false),
934            };
935
936        if sender_display_name.is_none() || sender_avatar_url.is_none() {
937            let sender_id = event.sender();
938            for ev in state_events {
939                let ev = match ev.deserialize() {
940                    Ok(ev) => ev,
941                    Err(err) => {
942                        warn!("Failed to deserialize a state event: {err}");
943                        continue;
944                    }
945                };
946                if ev.sender() != sender_id {
947                    continue;
948                }
949                if let AnyFullStateEventContent::RoomMember(FullStateEventContent::Original {
950                    content,
951                    ..
952                }) = ev.content()
953                {
954                    if sender_display_name.is_none() {
955                        sender_display_name = content.displayname;
956                    }
957                    if sender_avatar_url.is_none() {
958                        sender_avatar_url = content.avatar_url.map(|url| url.to_string());
959                    }
960                }
961            }
962        }
963
964        let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
965        let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
966        let thread_id = event.thread_id().clone();
967
968        let item = NotificationItem {
969            event,
970            raw_event,
971            sender_display_name,
972            sender_avatar_url,
973            is_sender_name_ambiguous,
974            room_computed_display_name: room.display_name().await?.to_string(),
975            room_avatar_url: room.avatar_url().map(|s| s.to_string()),
976            room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
977            room_topic: room.topic(),
978            room_join_rule: room.join_rule(),
979            is_direct_message_room: room.is_direct().await?,
980            is_room_encrypted: room
981                .latest_encryption_state()
982                .await
983                .map(|state| state.is_encrypted())
984                .ok(),
985            joined_members_count: room.joined_members_count(),
986            is_space: room.is_space(),
987            is_noisy,
988            has_mention,
989            thread_id,
990            actions: push_actions.map(|actions| actions.to_vec()),
991        };
992
993        Ok(item)
994    }
995
996    /// Returns whether this room is public or not, based on the join rule.
997    ///
998    /// Maybe return `None` if the join rule is not available.
999    pub fn is_public(&self) -> Option<bool> {
1000        self.room_join_rule.as_ref().map(|rule| matches!(rule, JoinRule::Public))
1001    }
1002}
1003
1004/// An error for the [`NotificationClient`].
1005#[derive(Debug, Error)]
1006pub enum Error {
1007    #[error(transparent)]
1008    BuildingLocalClient(ClientBuildError),
1009
1010    /// The room associated to this event wasn't found.
1011    #[error("unknown room for a notification")]
1012    UnknownRoom,
1013
1014    /// The Ruma event contained within this notification couldn't be parsed.
1015    #[error("invalid ruma event")]
1016    InvalidRumaEvent,
1017
1018    /// When calling `get_notification_with_sliding_sync`, the room was missing
1019    /// in the response.
1020    #[error("the sliding sync response doesn't include the target room")]
1021    SlidingSyncEmptyRoom,
1022
1023    #[error("the event was missing in the `/context` query")]
1024    ContextMissingEvent,
1025
1026    /// An error forwarded from the client.
1027    #[error(transparent)]
1028    SdkError(#[from] matrix_sdk::Error),
1029
1030    /// An error forwarded from the underlying state store.
1031    #[error(transparent)]
1032    StoreError(#[from] StoreError),
1033}
1034
1035#[cfg(test)]
1036mod tests {
1037    use assert_matches2::assert_let;
1038    use matrix_sdk::test_utils::mocks::MatrixMockServer;
1039    use matrix_sdk_test::{async_test, event_factory::EventFactory};
1040    use ruma::{event_id, room_id, user_id};
1041
1042    use crate::notification_client::{NotificationItem, RawNotificationEvent};
1043
1044    #[async_test]
1045    async fn test_notification_item_returns_thread_id() {
1046        let server = MatrixMockServer::new().await;
1047        let client = server.client_builder().build().await;
1048
1049        let room_id = room_id!("!a:b.c");
1050        let thread_root_event_id = event_id!("$root:b.c");
1051        let message = EventFactory::new()
1052            .room(room_id)
1053            .sender(user_id!("@sender:b.c"))
1054            .text_msg("Threaded")
1055            .in_thread(thread_root_event_id, event_id!("$prev:b.c"))
1056            .into_raw_sync();
1057        let room = server.sync_joined_room(&client, room_id).await;
1058
1059        let raw_notification_event = RawNotificationEvent::Timeline(message);
1060        let notification_item =
1061            NotificationItem::new(&room, raw_notification_event, None, Vec::new())
1062                .await
1063                .expect("Could not create notification item");
1064
1065        assert_let!(Some(thread_id) = notification_item.thread_id);
1066        assert_eq!(thread_id, thread_root_event_id);
1067    }
1068}