matrix_sdk_ui/
notification_client.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::BTreeMap,
17    sync::{Arc, Mutex},
18    time::Duration,
19};
20
21use futures_util::{StreamExt as _, pin_mut};
22use matrix_sdk::{
23    Client, ClientBuildError, SlidingSyncList, SlidingSyncMode, room::Room, sleep::sleep,
24};
25use matrix_sdk_base::{RoomState, StoreError, deserialized_responses::TimelineEvent};
26use ruma::{
27    EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
28    api::client::sync::sync_events::v5 as http,
29    assign,
30    events::{
31        AnyFullStateEventContent, AnyMessageLikeEventContent, AnyStateEvent,
32        AnySyncMessageLikeEvent, AnySyncTimelineEvent, FullStateEventContent, StateEventType,
33        TimelineEventType,
34        room::{
35            encrypted::OriginalSyncRoomEncryptedEvent,
36            join_rules::JoinRule,
37            member::{MembershipState, StrippedRoomMemberEvent},
38            message::{Relation, SyncRoomMessageEvent},
39        },
40    },
41    html::RemoveReplyFallback,
42    push::Action,
43    serde::Raw,
44    uint,
45};
46use thiserror::Error;
47use tokio::sync::Mutex as AsyncMutex;
48use tracing::{debug, info, instrument, trace, warn};
49
50use crate::{
51    DEFAULT_SANITIZER_MODE,
52    encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService, WithLocking},
53    sync_service::SyncService,
54};
55
56/// What kind of process setup do we have for this notification client?
57#[derive(Clone)]
58pub enum NotificationProcessSetup {
59    /// The notification client may run on a separate process than the rest of
60    /// the app.
61    ///
62    /// For instance, this is the case on iOS, where notifications are handled
63    /// in a separate process (the Notification Service Extension, aka NSE).
64    ///
65    /// In that case, a cross-process lock will be used to coordinate writes
66    /// into the stores handled by the SDK.
67    MultipleProcesses,
68
69    /// The notification client runs in the same process as the rest of the
70    /// `Client` performing syncs.
71    ///
72    /// For instance, this is the case on Android, where a notification will
73    /// wake up the main app process.
74    ///
75    /// In that case, a smart reference to the [`SyncService`] must be provided.
76    SingleProcess { sync_service: Arc<SyncService> },
77}
78
79/// A client specialized for handling push notifications received over the
80/// network, for an app.
81///
82/// In particular, it takes care of running a full decryption sync, in case the
83/// event in the notification was impossible to decrypt beforehand.
84pub struct NotificationClient {
85    /// SDK client that uses an in-memory state store.
86    client: Client,
87
88    /// SDK client that uses the same state store as the caller's context.
89    parent_client: Client,
90
91    /// Is the notification client running on its own process or not?
92    process_setup: NotificationProcessSetup,
93
94    /// A mutex to serialize requests to the notifications sliding sync.
95    ///
96    /// If several notifications come in at the same time (e.g. network was
97    /// unreachable because of airplane mode or something similar), then we
98    /// need to make sure that repeated calls to `get_notification` won't
99    /// cause multiple requests with the same `conn_id` we're using for
100    /// notifications. This mutex solves this by sequentializing the requests.
101    notification_sync_mutex: AsyncMutex<()>,
102
103    /// A mutex to serialize requests to the encryption sliding sync that's used
104    /// in case we didn't have the keys to decipher an event.
105    ///
106    /// Same reasoning as [`Self::notification_sync_mutex`].
107    encryption_sync_mutex: AsyncMutex<()>,
108}
109
110impl NotificationClient {
111    const CONNECTION_ID: &'static str = "notifications";
112    const LOCK_ID: &'static str = "notifications";
113
114    /// Create a new notification client.
115    pub async fn new(
116        parent_client: Client,
117        process_setup: NotificationProcessSetup,
118    ) -> Result<Self, Error> {
119        let client = parent_client.notification_client(Self::LOCK_ID.to_owned()).await?;
120
121        Ok(NotificationClient {
122            client,
123            parent_client,
124            notification_sync_mutex: AsyncMutex::new(()),
125            encryption_sync_mutex: AsyncMutex::new(()),
126            process_setup,
127        })
128    }
129
130    /// Fetches a room by its ID using the in-memory state store backed client.
131    /// Useful to retrieve room information after running the limited
132    /// notification client sliding sync loop.
133    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
134        self.client.get_room(room_id)
135    }
136
137    /// Fetches the content of a notification.
138    ///
139    /// This will first try to get the notification using a short-lived sliding
140    /// sync, and if the sliding-sync can't find the event, then it'll use a
141    /// `/context` query to find the event with associated member information.
142    ///
143    /// An error result means that we couldn't resolve the notification; in that
144    /// case, a dummy notification may be displayed instead.
145    #[instrument(skip(self))]
146    pub async fn get_notification(
147        &self,
148        room_id: &RoomId,
149        event_id: &EventId,
150    ) -> Result<NotificationStatus, Error> {
151        let status = self.get_notification_with_sliding_sync(room_id, event_id).await?;
152        match status {
153            NotificationStatus::Event(..) | NotificationStatus::EventFilteredOut => Ok(status),
154            NotificationStatus::EventNotFound => {
155                self.get_notification_with_context(room_id, event_id).await
156            }
157        }
158    }
159
160    /// Fetches the content of several notifications.
161    ///
162    /// This will first try to get the notifications using a short-lived sliding
163    /// sync, and if the sliding-sync can't find the events, then it'll use a
164    /// `/context` query to find the events with associated member information.
165    ///
166    /// An error result at the top level means that something failed when trying
167    /// to set up the notification fetching.
168    ///
169    /// For each notification item you can also receive an error, which means
170    /// something failed when trying to fetch that particular notification
171    /// (decryption, fetching push actions, etc.); in that case, a dummy
172    /// notification may be displayed instead.
173    pub async fn get_notifications(
174        &self,
175        requests: &[NotificationItemsRequest],
176    ) -> Result<BatchNotificationFetchingResult, Error> {
177        let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
178
179        for request in requests {
180            for event_id in &request.event_ids {
181                match notifications.get_mut(event_id) {
182                    // If the notification for a given event wasn't found with sliding sync, try
183                    // with a /context for each event.
184                    Some(Ok(NotificationStatus::EventNotFound)) | None => {
185                        notifications.insert(
186                            event_id.to_owned(),
187                            self.get_notification_with_context(&request.room_id, event_id).await,
188                        );
189                    }
190
191                    _ => {}
192                }
193            }
194        }
195
196        Ok(notifications)
197    }
198
199    /// Run an encryption sync loop, in case an event is still encrypted.
200    ///
201    /// Will return `Ok(Some)` if and only if:
202    /// - the event was encrypted,
203    /// - we successfully ran an encryption sync or waited long enough for an
204    ///   existing encryption sync to decrypt the event.
205    ///
206    /// Otherwise, if the event was not encrypted, or couldn't be decrypted
207    /// (without causing a fatal error), will return `Ok(None)`.
208    #[instrument(skip_all)]
209    async fn retry_decryption(
210        &self,
211        room: &Room,
212        raw_event: &Raw<AnySyncTimelineEvent>,
213    ) -> Result<Option<TimelineEvent>, Error> {
214        let event: AnySyncTimelineEvent =
215            raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
216
217        if !is_event_encrypted(event.event_type()) {
218            return Ok(None);
219        }
220
221        // Serialize calls to this function.
222        let _guard = self.encryption_sync_mutex.lock().await;
223
224        // The message is still encrypted, and the client is configured to retry
225        // decryption.
226        //
227        // Spawn an `EncryptionSync` that runs two iterations of the sliding sync loop:
228        // - the first iteration allows to get SS events as well as send e2ee requests.
229        // - the second one let the SS homeserver forward events triggered by the
230        //   sending of e2ee requests.
231        //
232        // Keep timeouts small for both, since we might be short on time.
233
234        let with_locking = WithLocking::from(matches!(
235            self.process_setup,
236            NotificationProcessSetup::MultipleProcesses
237        ));
238
239        let push_ctx = room.push_context().await?;
240        let sync_permit_guard = match &self.process_setup {
241            NotificationProcessSetup::MultipleProcesses => {
242                // We're running on our own process, dedicated for notifications. In that case,
243                // create a dummy sync permit; we're guaranteed there's at most one since we've
244                // acquired the `encryption_sync_mutex' lock here.
245                let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
246                sync_permit.lock_owned().await
247            }
248
249            NotificationProcessSetup::SingleProcess { sync_service } => {
250                if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
251                    permit_guard
252                } else {
253                    // There's already a sync service active, thus the encryption sync is already
254                    // running elsewhere. As a matter of fact, if the event was encrypted, that
255                    // means we were racing against the encryption sync. Wait a bit, attempt to
256                    // decrypt, and carry on.
257
258                    // We repeat the sleep 3 times at most, each iteration we
259                    // double the amount of time waited, so overall we may wait up to 7 times this
260                    // amount.
261                    let mut wait = 200;
262
263                    debug!("Encryption sync running in background");
264                    for _ in 0..3 {
265                        trace!("waiting for decryption…");
266
267                        sleep(Duration::from_millis(wait)).await;
268
269                        // Note: We specify the cast type in case the
270                        // `experimental-encrypted-state-events` feature is enabled, which provides
271                        // multiple cast implementations.
272                        let new_event = room
273                            .decrypt_event(
274                                raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(),
275                                push_ctx.as_ref(),
276                            )
277                            .await?;
278
279                        match new_event.kind {
280                            matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
281                                utd_info, ..} => {
282                                if utd_info.reason.is_missing_room_key() {
283                                    // Decryption error that could be caused by a missing room
284                                    // key; retry in a few.
285                                    wait *= 2;
286                                } else {
287                                    debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
288                                    return Ok(None);
289                                }
290                            }
291                            _ => {
292                                trace!("Waiting succeeded and event could be decrypted!");
293                                return Ok(Some(new_event));
294                            }
295                        }
296                    }
297
298                    // We couldn't decrypt the event after waiting a few times, abort.
299                    debug!("Timeout waiting for the encryption sync to decrypt notification.");
300                    return Ok(None);
301                }
302            }
303        };
304
305        let encryption_sync = EncryptionSyncService::new(
306            self.client.clone(),
307            Some((Duration::from_secs(3), Duration::from_secs(4))),
308            with_locking,
309        )
310        .await;
311
312        // Just log out errors, but don't have them abort the notification processing:
313        // an undecrypted notification is still better than no
314        // notifications.
315
316        match encryption_sync {
317            Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
318                // Note: We specify the cast type in case the
319                // `experimental-encrypted-state-events` feature is enabled, which provides
320                // multiple cast implementations.
321                Ok(()) => match room.decrypt_event(raw_event.cast_ref_unchecked::<OriginalSyncRoomEncryptedEvent>(), push_ctx.as_ref()).await {
322                    Ok(new_event) => match new_event.kind {
323                        matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
324                            utd_info, ..
325                        } => {
326                            trace!(
327                                "Encryption sync failed to decrypt the event: {:?}",
328                                utd_info.reason
329                            );
330                            Ok(None)
331                        }
332                        _ => {
333                            trace!("Encryption sync managed to decrypt the event.");
334                            Ok(Some(new_event))
335                        }
336                    },
337                    Err(err) => {
338                        trace!("Encryption sync failed to decrypt the event: {err}");
339                        Ok(None)
340                    }
341                },
342                Err(err) => {
343                    warn!("Encryption sync error: {err:#}");
344                    Ok(None)
345                }
346            },
347            Err(err) => {
348                warn!("Encryption sync build error: {err:#}",);
349                Ok(None)
350            }
351        }
352    }
353
354    /// Try to run a sliding sync (without encryption) to retrieve the events
355    /// from the notification.
356    ///
357    /// An event can either be:
358    /// - an invite event,
359    /// - or a non-invite event.
360    ///
361    /// In case it's a non-invite event, it's rather easy: we'll request
362    /// explicit state that'll be useful for building the
363    /// `NotificationItem`, and subscribe to the room which the notification
364    /// relates to.
365    ///
366    /// In case it's an invite-event, it's trickier because the stripped event
367    /// may not contain the event id, so we can't just match on it. Rather,
368    /// we look at stripped room member events that may be fitting (i.e.
369    /// match the current user and are invites), and if the SDK concludes the
370    /// room was in the invited state, and we didn't find the event by id,
371    /// *then* we'll use that stripped room member event.
372    #[instrument(skip_all)]
373    async fn try_sliding_sync(
374        &self,
375        requests: &[NotificationItemsRequest],
376    ) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
377        // Serialize all the calls to this method by taking a lock at the beginning,
378        // that will be dropped later.
379        let _guard = self.notification_sync_mutex.lock().await;
380
381        // Set up a sliding sync that only subscribes to the room that had the
382        // notification, so we can figure out the full event and associated
383        // information.
384
385        let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
386
387        let handler_raw_notification = raw_notifications.clone();
388
389        let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
390
391        let timeline_event_handler = self.client.add_event_handler({
392            let requests = requests.clone();
393            move |raw: Raw<AnySyncTimelineEvent>| async move {
394                match &raw.get_field::<OwnedEventId>("event_id") {
395                    Ok(Some(event_id)) => {
396                        let Some(request) =
397                            &requests.iter().find(|request| request.event_ids.contains(event_id))
398                        else {
399                            return;
400                        };
401
402                        let room_id = request.room_id.clone();
403
404                        // found it! There shouldn't be a previous event before, but if
405                        // there is, that should be ok to
406                        // just replace it.
407                        handler_raw_notification.lock().unwrap().insert(
408                            event_id.to_owned(),
409                            (room_id, Some(RawNotificationEvent::Timeline(raw))),
410                        );
411                    }
412                    Ok(None) => {
413                        warn!("a sync event had no event id");
414                    }
415                    Err(err) => {
416                        warn!("failed to deserialize sync event id: {err}");
417                    }
418                }
419            }
420        });
421
422        // We'll only use this event if the room is in the invited state.
423        let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
424
425        let user_id = self.client.user_id().unwrap().to_owned();
426        let handler_raw_invites = raw_invites.clone();
427        let handler_raw_notifications = raw_notifications.clone();
428        let stripped_member_handler = self.client.add_event_handler({
429            let requests = requests.clone();
430            move |raw: Raw<StrippedRoomMemberEvent>| async move {
431                let deserialized = match raw.deserialize() {
432                    Ok(d) => d,
433                    Err(err) => {
434                        warn!("failed to deserialize raw stripped room member event: {err}");
435                        return;
436                    }
437                };
438
439                trace!("received a stripped room member event");
440
441                // Try to match the event by event_id, as it's the most precise. In theory, we
442                // shouldn't receive it, so that's a first attempt.
443                match &raw.get_field::<OwnedEventId>("event_id") {
444                    Ok(Some(event_id)) => {
445                        let request =
446                            &requests.iter().find(|request| request.event_ids.contains(event_id));
447                        if request.is_none() {
448                            return;
449                        }
450                        let room_id = request.unwrap().room_id.clone();
451
452                        // found it! There shouldn't be a previous event before, but if
453                        // there is, that should be ok to
454                        // just replace it.
455                        handler_raw_notifications.lock().unwrap().insert(
456                            event_id.to_owned(),
457                            (room_id, Some(RawNotificationEvent::Invite(raw))),
458                        );
459                        return;
460                    }
461                    Ok(None) => {
462                        warn!("a room member event had no id");
463                    }
464                    Err(err) => {
465                        warn!("failed to deserialize room member event id: {err}");
466                    }
467                }
468
469                // Try to match the event by membership and state_key for the current user.
470                if deserialized.content.membership == MembershipState::Invite
471                    && deserialized.state_key == user_id
472                {
473                    trace!("found an invite event for the current user");
474                    // This could be it! There might be several of these following each other, so
475                    // assume it's the latest one (in sync ordering), and override a previous one if
476                    // present.
477                    handler_raw_invites
478                        .lock()
479                        .unwrap()
480                        .insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
481                } else {
482                    trace!("not an invite event, or not for the current user");
483                }
484            }
485        });
486
487        // Room power levels are necessary to build the push context.
488        let required_state = vec![
489            (StateEventType::RoomEncryption, "".to_owned()),
490            (StateEventType::RoomMember, "$LAZY".to_owned()),
491            (StateEventType::RoomMember, "$ME".to_owned()),
492            (StateEventType::RoomCanonicalAlias, "".to_owned()),
493            (StateEventType::RoomName, "".to_owned()),
494            (StateEventType::RoomAvatar, "".to_owned()),
495            (StateEventType::RoomPowerLevels, "".to_owned()),
496            (StateEventType::RoomJoinRules, "".to_owned()),
497            (StateEventType::CallMember, "*".to_owned()),
498            (StateEventType::RoomCreate, "".to_owned()),
499        ];
500
501        let invites = SlidingSyncList::builder("invites")
502            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
503            .timeline_limit(8)
504            .required_state(required_state.clone())
505            .filters(Some(assign!(http::request::ListFilters::default(), {
506                is_invite: Some(true),
507            })));
508
509        let sync = self
510            .client
511            .sliding_sync(Self::CONNECTION_ID)?
512            .poll_timeout(Duration::from_secs(1))
513            .network_timeout(Duration::from_secs(3))
514            .with_account_data_extension(
515                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
516            )
517            .add_list(invites)
518            .build()
519            .await?;
520
521        let room_ids = requests.iter().map(|req| req.room_id.as_ref()).collect::<Vec<_>>();
522        sync.subscribe_to_rooms(
523            &room_ids,
524            Some(assign!(http::request::RoomSubscription::default(), {
525                required_state,
526                timeline_limit: uint!(16)
527            })),
528            true,
529        );
530
531        let mut remaining_attempts = 3;
532
533        let stream = sync.sync();
534        pin_mut!(stream);
535
536        // Sum the expected event count for each room
537        let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
538
539        loop {
540            if stream.next().await.is_none() {
541                // Sliding sync aborted early.
542                break;
543            }
544
545            if raw_notifications.lock().unwrap().len() + raw_invites.lock().unwrap().len()
546                == expected_event_count
547            {
548                // We got the events.
549                break;
550            }
551
552            remaining_attempts -= 1;
553            if remaining_attempts == 0 {
554                // We're out of luck.
555                break;
556            }
557        }
558
559        self.client.remove_event_handler(stripped_member_handler);
560        self.client.remove_event_handler(timeline_event_handler);
561
562        let mut notifications = raw_notifications.clone().lock().unwrap().clone();
563        let mut missing_event_ids = Vec::new();
564
565        // Create the list of missing event ids after the syncs.
566        for request in requests.iter() {
567            for event_id in &request.event_ids {
568                if !notifications.contains_key(event_id) {
569                    missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
570                }
571            }
572        }
573
574        // Try checking if the missing notifications could be invites.
575        for (room_id, missing_event_id) in missing_event_ids {
576            trace!("we didn't have a non-invite event, looking for invited room now");
577            if let Some(room) = self.client.get_room(&room_id) {
578                if room.state() == RoomState::Invited {
579                    if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
580                        notifications.insert(
581                            missing_event_id.to_owned(),
582                            (room_id.to_owned(), stripped_event),
583                        );
584                    }
585                } else {
586                    debug!("the room isn't in the invited state");
587                }
588            } else {
589                warn!(%room_id, "unknown room, can't check for invite events");
590            }
591        }
592
593        let found = if notifications.len() == expected_event_count { "" } else { "not " };
594        trace!("all notification events have{found} been found");
595
596        Ok(notifications)
597    }
598
599    pub async fn get_notification_with_sliding_sync(
600        &self,
601        room_id: &RoomId,
602        event_id: &EventId,
603    ) -> Result<NotificationStatus, Error> {
604        info!("fetching notification event with a sliding sync");
605
606        let request = NotificationItemsRequest {
607            room_id: room_id.to_owned(),
608            event_ids: vec![event_id.to_owned()],
609        };
610
611        let mut get_notifications_result =
612            self.get_notifications_with_sliding_sync(&[request]).await?;
613
614        get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
615    }
616
617    /// Given a (decrypted or not) event, figure out whether it should be
618    /// filtered out for other client-side reasons (such as the sender being
619    /// ignored, for instance), and returns the corresponding
620    /// [`NotificationStatus`].
621    async fn compute_status(
622        &self,
623        room: &Room,
624        push_actions: Option<&[Action]>,
625        raw_event: RawNotificationEvent,
626        state_events: Vec<Raw<AnyStateEvent>>,
627    ) -> Result<NotificationStatus, Error> {
628        if let Some(actions) = push_actions
629            && !actions.iter().any(|a| a.should_notify())
630        {
631            // The event shouldn't notify: return early.
632            return Ok(NotificationStatus::EventFilteredOut);
633        }
634
635        let notification_item =
636            NotificationItem::new(room, raw_event, push_actions, state_events).await?;
637
638        if self.client.is_user_ignored(notification_item.event.sender()).await {
639            Ok(NotificationStatus::EventFilteredOut)
640        } else {
641            Ok(NotificationStatus::Event(Box::new(notification_item)))
642        }
643    }
644
645    /// Get a list of full notifications, given a room id and event ids.
646    ///
647    /// This will run a small sliding sync to retrieve the content of the
648    /// events, along with extra data to form a rich notification context.
649    pub async fn get_notifications_with_sliding_sync(
650        &self,
651        requests: &[NotificationItemsRequest],
652    ) -> Result<BatchNotificationFetchingResult, Error> {
653        let raw_events = self.try_sliding_sync(requests).await?;
654
655        let mut batch_result = BatchNotificationFetchingResult::new();
656
657        for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
658            // At this point it should have been added by the sync, if it's not, give up.
659            let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
660
661            let Some(raw_event) = raw_event else {
662                // The event was not found, so we can't build a notification.
663                batch_result.insert(event_id, Ok(NotificationStatus::EventNotFound));
664                continue;
665            };
666
667            let (raw_event, push_actions) = match &raw_event {
668                RawNotificationEvent::Timeline(timeline_event) => {
669                    // Timeline events may be encrypted, so make sure they get decrypted first.
670                    match self.retry_decryption(&room, timeline_event).await {
671                        Ok(Some(timeline_event)) => {
672                            let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
673                            (
674                                RawNotificationEvent::Timeline(timeline_event.into_raw()),
675                                push_actions,
676                            )
677                        }
678
679                        Ok(None) => {
680                            // The event was either not encrypted in the first place, or we
681                            // couldn't decrypt it after retrying. Use the raw event as is.
682                            match room.event_push_actions(timeline_event).await {
683                                Ok(push_actions) => (raw_event.clone(), push_actions),
684                                Err(err) => {
685                                    // Could not get push actions.
686                                    batch_result.insert(event_id, Err(err.into()));
687                                    continue;
688                                }
689                            }
690                        }
691
692                        Err(err) => {
693                            batch_result.insert(event_id, Err(err));
694                            continue;
695                        }
696                    }
697                }
698
699                RawNotificationEvent::Invite(invite_event) => {
700                    // Invite events can't be encrypted, so they should be in clear text.
701                    match room.event_push_actions(invite_event).await {
702                        Ok(push_actions) => {
703                            (RawNotificationEvent::Invite(invite_event.clone()), push_actions)
704                        }
705                        Err(err) => {
706                            batch_result.insert(event_id, Err(err.into()));
707                            continue;
708                        }
709                    }
710                }
711            };
712
713            let notification_status_result =
714                self.compute_status(&room, push_actions.as_deref(), raw_event, Vec::new()).await;
715
716            batch_result.insert(event_id, notification_status_result);
717        }
718
719        Ok(batch_result)
720    }
721
722    /// Retrieve a notification using a `/context` query.
723    ///
724    /// This is for clients that are already running other sliding syncs in the
725    /// same process, so that most of the contextual information for the
726    /// notification should already be there. In particular, the room containing
727    /// the event MUST be known (via a sliding sync for invites, or another
728    /// sliding sync).
729    ///
730    /// An error result means that we couldn't resolve the notification; in that
731    /// case, a dummy notification may be displayed instead. A `None` result
732    /// means the notification has been filtered out by the user's push
733    /// rules.
734    pub async fn get_notification_with_context(
735        &self,
736        room_id: &RoomId,
737        event_id: &EventId,
738    ) -> Result<NotificationStatus, Error> {
739        info!("fetching notification event with a /context query");
740
741        // See above comment.
742        let Some(room) = self.parent_client.get_room(room_id) else {
743            return Err(Error::UnknownRoom);
744        };
745
746        let response = room.event_with_context(event_id, true, uint!(0), None).await?;
747
748        let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
749        let state_events = response.state;
750
751        if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
752            timeline_event = decrypted_event;
753        }
754
755        let push_actions = timeline_event.push_actions().map(ToOwned::to_owned);
756
757        self.compute_status(
758            &room,
759            push_actions.as_deref(),
760            RawNotificationEvent::Timeline(timeline_event.into_raw()),
761            state_events,
762        )
763        .await
764    }
765}
766
767fn is_event_encrypted(event_type: TimelineEventType) -> bool {
768    let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
769
770    #[cfg(feature = "unstable-msc3956")]
771    let is_still_encrypted =
772        is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
773
774    is_still_encrypted
775}
776
/// The outcome of resolving the notification for a given event.
#[derive(Debug)]
pub enum NotificationStatus {
    /// The event has been found and was not filtered out.
    ///
    /// The notification item is boxed.
    Event(Box<NotificationItem>),
    /// The event couldn't be found in the network queries used to find it.
    EventNotFound,
    /// The event has been filtered out, either because of the user's push
    /// rules, or because the user which triggered it is ignored by the
    /// current user.
    EventFilteredOut,
}
788
/// A request for the notification items of several events, grouped under a
/// single room id.
#[derive(Debug, Clone)]
pub struct NotificationItemsRequest {
    /// The id of the room.
    pub room_id: OwnedRoomId,
    /// The ids of the events for which notification items are requested.
    pub event_ids: Vec<OwnedEventId>,
}
794
/// Per-event results of fetching notifications in a batch: maps each event id
/// to either its [`NotificationStatus`] or the [`Error`] hit for that event.
type BatchNotificationFetchingResult = BTreeMap<OwnedEventId, Result<NotificationStatus, Error>>;
796
/// The notification event as it was fetched from remote for the given
/// `event_id`.
///
/// The event is kept in its raw (not deserialized) form, after decryption.
/// The only thing that has been determined at this point is whether it is an
/// invite or a regular timeline event.
#[derive(Debug, Clone)]
pub enum RawNotificationEvent {
    /// The raw event for a timeline event.
    Timeline(Raw<AnySyncTimelineEvent>),
    /// The notification contains an invitation with the given
    /// [`StrippedRoomMemberEvent`] (in raw form here).
    Invite(Raw<StrippedRoomMemberEvent>),
}
809
/// The deserialized event as it was fetched from remote for the
/// given `event_id` and after decryption (if possible).
///
/// Both variants box their payload.
#[derive(Debug)]
pub enum NotificationEvent {
    /// The notification was for a timeline event.
    Timeline(Box<AnySyncTimelineEvent>),
    /// The notification is an invite with the given stripped room member
    /// event data.
    Invite(Box<StrippedRoomMemberEvent>),
}
819
820impl NotificationEvent {
821    pub fn sender(&self) -> &UserId {
822        match self {
823            NotificationEvent::Timeline(ev) => ev.sender(),
824            NotificationEvent::Invite(ev) => &ev.sender,
825        }
826    }
827
828    /// Returns the root event id of the thread the notification event is in, if
829    /// any.
830    fn thread_id(&self) -> Option<OwnedEventId> {
831        let NotificationEvent::Timeline(sync_timeline_event) = &self else {
832            return None;
833        };
834        let AnySyncTimelineEvent::MessageLike(event) = sync_timeline_event.as_ref() else {
835            return None;
836        };
837        let content = event.original_content()?;
838        match content {
839            AnyMessageLikeEventContent::RoomMessage(content) => match content.relates_to? {
840                Relation::Thread(thread) => Some(thread.event_id),
841                _ => None,
842            },
843            _ => None,
844        }
845    }
846}
847
/// A notification with its full content.
#[derive(Debug)]
pub struct NotificationItem {
    /// Underlying Ruma event.
    pub event: NotificationEvent,

    /// The raw of the underlying event.
    pub raw_event: RawNotificationEvent,

    /// Display name of the sender.
    pub sender_display_name: Option<String>,
    /// Avatar URL of the sender.
    pub sender_avatar_url: Option<String>,
    /// Is the sender's name ambiguous?
    pub is_sender_name_ambiguous: bool,

    /// Room computed display name.
    pub room_computed_display_name: String,
    /// Room avatar URL.
    pub room_avatar_url: Option<String>,
    /// Room canonical alias.
    pub room_canonical_alias: Option<String>,
    /// Room topic.
    pub room_topic: Option<String>,
    /// Room join rule.
    ///
    /// Set to `None` if the join rule for this room is not available.
    pub room_join_rule: Option<JoinRule>,
    /// Is this room encrypted?
    ///
    /// Set to `None` if the encryption state of the room could not be
    /// retrieved.
    pub is_room_encrypted: Option<bool>,
    /// Is this room considered a direct message?
    pub is_direct_message_room: bool,
    /// Number of members who joined the room.
    pub joined_members_count: u64,
    /// Is the room a space?
    pub is_space: bool,

    /// Is it a noisy notification? (i.e. does any push action contain a sound
    /// action)
    ///
    /// It is set if and only if the push actions could be determined.
    pub is_noisy: Option<bool>,
    /// Does the notification have a mention? (i.e. is any push action a
    /// highlight action)
    ///
    /// It is set if and only if the push actions could be determined.
    pub has_mention: Option<bool>,
    /// The id of the root event of the thread the event is in, if any.
    pub thread_id: Option<OwnedEventId>,

    /// The push actions for this notification (notify, sound, highlight, etc.).
    pub actions: Option<Vec<Action>>,
}
896
897impl NotificationItem {
898    async fn new(
899        room: &Room,
900        raw_event: RawNotificationEvent,
901        push_actions: Option<&[Action]>,
902        state_events: Vec<Raw<AnyStateEvent>>,
903    ) -> Result<Self, Error> {
904        let event = match &raw_event {
905            RawNotificationEvent::Timeline(raw_event) => {
906                let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
907                if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
908                    SyncRoomMessageEvent::Original(ev),
909                )) = &mut event
910                {
911                    ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
912                }
913                NotificationEvent::Timeline(Box::new(event))
914            }
915            RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(Box::new(
916                raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
917            )),
918        };
919
920        let sender = match room.state() {
921            RoomState::Invited => room.invite_details().await?.inviter,
922            _ => room.get_member_no_sync(event.sender()).await?,
923        };
924
925        let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
926            match &sender {
927                Some(sender) => (
928                    sender.display_name().map(|s| s.to_owned()),
929                    sender.avatar_url().map(|s| s.to_string()),
930                    sender.name_ambiguous(),
931                ),
932                None => (None, None, false),
933            };
934
935        if sender_display_name.is_none() || sender_avatar_url.is_none() {
936            let sender_id = event.sender();
937            for ev in state_events {
938                let ev = match ev.deserialize() {
939                    Ok(ev) => ev,
940                    Err(err) => {
941                        warn!("Failed to deserialize a state event: {err}");
942                        continue;
943                    }
944                };
945                if ev.sender() != sender_id {
946                    continue;
947                }
948                if let AnyFullStateEventContent::RoomMember(FullStateEventContent::Original {
949                    content,
950                    ..
951                }) = ev.content()
952                {
953                    if sender_display_name.is_none() {
954                        sender_display_name = content.displayname;
955                    }
956                    if sender_avatar_url.is_none() {
957                        sender_avatar_url = content.avatar_url.map(|url| url.to_string());
958                    }
959                }
960            }
961        }
962
963        let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
964        let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
965        let thread_id = event.thread_id().clone();
966
967        let item = NotificationItem {
968            event,
969            raw_event,
970            sender_display_name,
971            sender_avatar_url,
972            is_sender_name_ambiguous,
973            room_computed_display_name: room.display_name().await?.to_string(),
974            room_avatar_url: room.avatar_url().map(|s| s.to_string()),
975            room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
976            room_topic: room.topic(),
977            room_join_rule: room.join_rule(),
978            is_direct_message_room: room.is_direct().await?,
979            is_room_encrypted: room
980                .latest_encryption_state()
981                .await
982                .map(|state| state.is_encrypted())
983                .ok(),
984            joined_members_count: room.joined_members_count(),
985            is_space: room.is_space(),
986            is_noisy,
987            has_mention,
988            thread_id,
989            actions: push_actions.map(|actions| actions.to_vec()),
990        };
991
992        Ok(item)
993    }
994
995    /// Returns whether this room is public or not, based on the join rule.
996    ///
997    /// Maybe return `None` if the join rule is not available.
998    pub fn is_public(&self) -> Option<bool> {
999        self.room_join_rule.as_ref().map(|rule| matches!(rule, JoinRule::Public))
1000    }
1001}
1002
/// An error for the [`NotificationClient`].
#[derive(Debug, Error)]
pub enum Error {
    /// An error that occurred while building a client, forwarded as a
    /// [`ClientBuildError`].
    #[error(transparent)]
    BuildingLocalClient(ClientBuildError),

    /// The room associated to this event wasn't found.
    #[error("unknown room for a notification")]
    UnknownRoom,

    /// The Ruma event contained within this notification couldn't be parsed.
    #[error("invalid ruma event")]
    InvalidRumaEvent,

    /// When calling `get_notification_with_sliding_sync`, the room was missing
    /// in the response.
    #[error("the sliding sync response doesn't include the target room")]
    SlidingSyncEmptyRoom,

    /// The response to the `/context` query didn't include the requested
    /// event.
    #[error("the event was missing in the `/context` query")]
    ContextMissingEvent,

    /// An error forwarded from the client.
    #[error(transparent)]
    SdkError(#[from] matrix_sdk::Error),

    /// An error forwarded from the underlying state store.
    #[error(transparent)]
    StoreError(#[from] StoreError),
}
1033
#[cfg(test)]
mod tests {
    use assert_matches2::assert_let;
    use matrix_sdk::test_utils::mocks::MatrixMockServer;
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{event_id, room_id, user_id};

    use crate::notification_client::{NotificationItem, RawNotificationEvent};

    /// A notification item built from a threaded message must expose the id
    /// of the thread root.
    #[async_test]
    async fn test_notification_item_returns_thread_id() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        // Make the room known to the client.
        let room_id = room_id!("!a:b.c");
        let room = server.sync_joined_room(&client, room_id).await;

        // Craft a message that lives in a thread rooted at `$root:b.c`.
        let expected_thread_root = event_id!("$root:b.c");
        let threaded_message = EventFactory::new()
            .room(room_id)
            .sender(user_id!("@sender:b.c"))
            .text_msg("Threaded")
            .in_thread(expected_thread_root, event_id!("$prev:b.c"))
            .into_raw_sync();

        let item = NotificationItem::new(
            &room,
            RawNotificationEvent::Timeline(threaded_message),
            None,
            Vec::new(),
        )
        .await
        .expect("Could not create notification item");

        assert_let!(Some(actual_thread_root) = item.thread_id);
        assert_eq!(actual_thread_root, expected_thread_root);
    }
}