matrix_sdk_ui/
notification_client.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for that specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{
17        btree_map::{IntoIter, Iter},
18        BTreeMap,
19    },
20    sync::{Arc, Mutex},
21    time::Duration,
22};
23
24use futures_util::{pin_mut, StreamExt as _};
25use matrix_sdk::{
26    room::Room, sleep::sleep, Client, ClientBuildError, SlidingSyncList, SlidingSyncMode,
27};
28use matrix_sdk_base::{deserialized_responses::TimelineEvent, RoomState, StoreError};
29use ruma::{
30    api::client::sync::sync_events::v5 as http,
31    assign,
32    directory::RoomTypeFilter,
33    events::{
34        room::{
35            join_rules::JoinRule,
36            member::{MembershipState, StrippedRoomMemberEvent},
37            message::{Relation, SyncRoomMessageEvent},
38        },
39        AnyFullStateEventContent, AnyMessageLikeEventContent, AnyStateEvent,
40        AnySyncMessageLikeEvent, AnySyncTimelineEvent, FullStateEventContent, StateEventType,
41        TimelineEventType,
42    },
43    html::RemoveReplyFallback,
44    push::Action,
45    serde::Raw,
46    uint, EventId, OwnedEventId, OwnedRoomId, RoomId, UserId,
47};
48use thiserror::Error;
49use tokio::sync::Mutex as AsyncMutex;
50use tracing::{debug, info, instrument, trace, warn};
51
52use crate::{
53    encryption_sync_service::{EncryptionSyncPermit, EncryptionSyncService, WithLocking},
54    sync_service::SyncService,
55    DEFAULT_SANITIZER_MODE,
56};
57
58/// What kind of process setup do we have for this notification client?
59#[derive(Clone)]
60pub enum NotificationProcessSetup {
61    /// The notification client may run on a separate process than the rest of
62    /// the app.
63    ///
64    /// For instance, this is the case on iOS, where notifications are handled
65    /// in a separate process (the Notification Service Extension, aka NSE).
66    ///
67    /// In that case, a cross-process lock will be used to coordinate writes
68    /// into the stores handled by the SDK.
69    MultipleProcesses,
70
71    /// The notification client runs in the same process as the rest of the
72    /// `Client` performing syncs.
73    ///
74    /// For instance, this is the case on Android, where a notification will
75    /// wake up the main app process.
76    ///
77    /// In that case, a smart reference to the [`SyncService`] must be provided.
78    SingleProcess { sync_service: Arc<SyncService> },
79}
80
81/// A client specialized for handling push notifications received over the
82/// network, for an app.
83///
84/// In particular, it takes care of running a full decryption sync, in case the
85/// event in the notification was impossible to decrypt beforehand.
86pub struct NotificationClient {
87    /// SDK client that uses an in-memory state store.
88    client: Client,
89
90    /// SDK client that uses the same state store as the caller's context.
91    parent_client: Client,
92
93    /// Is the notification client running on its own process or not?
94    process_setup: NotificationProcessSetup,
95
96    /// A mutex to serialize requests to the notifications sliding sync.
97    ///
98    /// If several notifications come in at the same time (e.g. network was
99    /// unreachable because of airplane mode or something similar), then we
100    /// need to make sure that repeated calls to `get_notification` won't
101    /// cause multiple requests with the same `conn_id` we're using for
102    /// notifications. This mutex solves this by sequentializing the requests.
103    notification_sync_mutex: AsyncMutex<()>,
104
105    /// A mutex to serialize requests to the encryption sliding sync that's used
106    /// in case we didn't have the keys to decipher an event.
107    ///
108    /// Same reasoning as [`Self::notification_sync_mutex`].
109    encryption_sync_mutex: AsyncMutex<()>,
110}
111
112impl NotificationClient {
113    const CONNECTION_ID: &'static str = "notifications";
114    const LOCK_ID: &'static str = "notifications";
115
116    /// Create a new notification client.
117    pub async fn new(
118        parent_client: Client,
119        process_setup: NotificationProcessSetup,
120    ) -> Result<Self, Error> {
121        let client = parent_client.notification_client(Self::LOCK_ID.to_owned()).await?;
122
123        Ok(NotificationClient {
124            client,
125            parent_client,
126            notification_sync_mutex: AsyncMutex::new(()),
127            encryption_sync_mutex: AsyncMutex::new(()),
128            process_setup,
129        })
130    }
131
132    /// Fetches a room by its ID using the in-memory state store backed client.
133    /// Useful to retrieve room information after running the limited
134    /// notification client sliding sync loop.
135    pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
136        self.client.get_room(room_id)
137    }
138
139    /// Fetches the content of a notification.
140    ///
141    /// This will first try to get the notification using a short-lived sliding
142    /// sync, and if the sliding-sync can't find the event, then it'll use a
143    /// `/context` query to find the event with associated member information.
144    ///
145    /// An error result means that we couldn't resolve the notification; in that
146    /// case, a dummy notification may be displayed instead. A `None` result
147    /// means the notification has been filtered out by the user's push
148    /// rules.
149    #[instrument(skip(self))]
150    pub async fn get_notification(
151        &self,
152        room_id: &RoomId,
153        event_id: &EventId,
154    ) -> Result<Option<NotificationItem>, Error> {
155        match self.get_notification_with_sliding_sync(room_id, event_id).await? {
156            NotificationStatus::Event(event) => Ok(Some(*event)),
157            NotificationStatus::EventFilteredOut => Ok(None),
158            NotificationStatus::EventNotFound => {
159                self.get_notification_with_context(room_id, event_id).await
160            }
161        }
162    }
163
164    /// Fetches the content of several notifications.
165    ///
166    /// This will first try to get the notifications using a short-lived sliding
167    /// sync, and if the sliding-sync can't find the events, then it'll use a
168    /// `/context` query to find the events with associated member information.
169    ///
170    /// An error result at the top level means that something failed when trying
171    /// to set up the notification fetching. For each notification item you can
172    /// also receive an error, which means something failed when trying to fetch
173    /// that particular notification (decryption, fetching push actions, etc.);
174    /// in that case, a dummy notification may be displayed instead. A
175    /// `None` result means the notification has been filtered out by the
176    /// user's push rules.
177    pub async fn get_notifications(
178        &self,
179        requests: &[NotificationItemsRequest],
180    ) -> Result<BatchNotificationFetchingResult<NotificationItem>, Error> {
181        let mut notifications = self.get_notifications_with_sliding_sync(requests).await?;
182        let mut notification_items = BatchNotificationFetchingResult::new();
183
184        for request in requests {
185            for event_id in &request.event_ids {
186                match notifications.remove(event_id) {
187                    Some(Ok(NotificationStatus::Event(item))) => {
188                        notification_items.add_notification(event_id.to_owned(), *item);
189                    }
190                    Some(Ok(NotificationStatus::EventNotFound)) | None => {
191                        match self.get_notification_with_context(&request.room_id, event_id).await {
192                            Ok(Some(item)) => {
193                                notification_items.add_notification(event_id.to_owned(), item)
194                            }
195                            // Event filtered out, do nothing
196                            Ok(None) => (),
197                            Err(error) => notification_items
198                                .mark_fetching_notification_failed(event_id.to_owned(), error),
199                        }
200                    }
201                    // Event filtered out, do nothing
202                    Some(Ok(NotificationStatus::EventFilteredOut)) => (),
203                    Some(Err(e)) => {
204                        notification_items
205                            .mark_fetching_notification_failed(event_id.to_owned(), e);
206                    }
207                }
208            }
209        }
210
211        Ok(notification_items)
212    }
213
214    /// Run an encryption sync loop, in case an event is still encrypted.
215    ///
216    /// Will return true if and only:
217    /// - the event was encrypted,
218    /// - we successfully ran an encryption sync or waited long enough for an
219    ///   existing encryption sync to
220    /// decrypt the event.
221    #[instrument(skip_all)]
222    async fn retry_decryption(
223        &self,
224        room: &Room,
225        raw_event: &Raw<AnySyncTimelineEvent>,
226    ) -> Result<Option<TimelineEvent>, Error> {
227        let event: AnySyncTimelineEvent =
228            raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
229
230        if !is_event_encrypted(event.event_type()) {
231            return Ok(None);
232        }
233
234        // Serialize calls to this function.
235        let _guard = self.encryption_sync_mutex.lock().await;
236
237        // The message is still encrypted, and the client is configured to retry
238        // decryption.
239        //
240        // Spawn an `EncryptionSync` that runs two iterations of the sliding sync loop:
241        // - the first iteration allows to get SS events as well as send e2ee requests.
242        // - the second one let the SS homeserver forward events triggered by the
243        //   sending of e2ee requests.
244        //
245        // Keep timeouts small for both, since we might be short on time.
246
247        let with_locking = WithLocking::from(matches!(
248            self.process_setup,
249            NotificationProcessSetup::MultipleProcesses
250        ));
251
252        let push_ctx = room.push_context().await?;
253        let sync_permit_guard = match &self.process_setup {
254            NotificationProcessSetup::MultipleProcesses => {
255                // We're running on our own process, dedicated for notifications. In that case,
256                // create a dummy sync permit; we're guaranteed there's at most one since we've
257                // acquired the `encryption_sync_mutex' lock here.
258                let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new()));
259                sync_permit.lock_owned().await
260            }
261
262            NotificationProcessSetup::SingleProcess { sync_service } => {
263                if let Some(permit_guard) = sync_service.try_get_encryption_sync_permit() {
264                    permit_guard
265                } else {
266                    // There's already a sync service active, thus the encryption sync is already
267                    // running elsewhere. As a matter of fact, if the event was encrypted, that
268                    // means we were racing against the encryption sync. Wait a bit, attempt to
269                    // decrypt, and carry on.
270
271                    // We repeat the sleep 3 times at most, each iteration we
272                    // double the amount of time waited, so overall we may wait up to 7 times this
273                    // amount.
274                    let mut wait = 200;
275
276                    debug!("Encryption sync running in background");
277                    for _ in 0..3 {
278                        trace!("waiting for decryption…");
279
280                        sleep(Duration::from_millis(wait)).await;
281
282                        let new_event =
283                            room.decrypt_event(raw_event.cast_ref(), push_ctx.as_ref()).await?;
284
285                        match new_event.kind {
286                            matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
287                                utd_info, ..} => {
288                                if utd_info.reason.is_missing_room_key() {
289                                    // Decryption error that could be caused by a missing room
290                                    // key; retry in a few.
291                                    wait *= 2;
292                                } else {
293                                    debug!("Event could not be decrypted, but waiting longer is unlikely to help: {:?}", utd_info.reason);
294                                    return Ok(None);
295                                }
296                            }
297                            _ => {
298                                trace!("Waiting succeeded and event could be decrypted!");
299                                return Ok(Some(new_event));
300                            }
301                        }
302                    }
303
304                    // We couldn't decrypt the event after waiting a few times, abort.
305                    debug!("Timeout waiting for the encryption sync to decrypt notification.");
306                    return Ok(None);
307                }
308            }
309        };
310
311        let encryption_sync = EncryptionSyncService::new(
312            self.client.clone(),
313            Some((Duration::from_secs(3), Duration::from_secs(4))),
314            with_locking,
315        )
316        .await;
317
318        // Just log out errors, but don't have them abort the notification processing:
319        // an undecrypted notification is still better than no
320        // notifications.
321
322        match encryption_sync {
323            Ok(sync) => match sync.run_fixed_iterations(2, sync_permit_guard).await {
324                Ok(()) => match room.decrypt_event(raw_event.cast_ref(), push_ctx.as_ref()).await {
325                    Ok(new_event) => match new_event.kind {
326                        matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt {
327                            utd_info, ..
328                        } => {
329                            trace!(
330                                "Encryption sync failed to decrypt the event: {:?}",
331                                utd_info.reason
332                            );
333                            Ok(None)
334                        }
335                        _ => {
336                            trace!("Encryption sync managed to decrypt the event.");
337                            Ok(Some(new_event))
338                        }
339                    },
340                    Err(err) => {
341                        trace!("Encryption sync failed to decrypt the event: {err}");
342                        Ok(None)
343                    }
344                },
345                Err(err) => {
346                    warn!("Encryption sync error: {err:#}");
347                    Ok(None)
348                }
349            },
350            Err(err) => {
351                warn!("Encryption sync build error: {err:#}",);
352                Ok(None)
353            }
354        }
355    }
356
357    /// Try to run a sliding sync (without encryption) to retrieve the events
358    /// from the notification.
359    ///
360    /// An event can either be:
361    /// - an invite event,
362    /// - or a non-invite event.
363    ///
364    /// In case it's a non-invite event, it's rather easy: we'll request
365    /// explicit state that'll be useful for building the
366    /// `NotificationItem`, and subscribe to the room which the notification
367    /// relates to.
368    ///
369    /// In case it's an invite-event, it's trickier because the stripped event
370    /// may not contain the event id, so we can't just match on it. Rather,
371    /// we look at stripped room member events that may be fitting (i.e.
372    /// match the current user and are invites), and if the SDK concludes the
373    /// room was in the invited state, and we didn't find the event by id,
374    /// *then* we'll use that stripped room member event.
375    #[instrument(skip_all)]
376    async fn try_sliding_sync(
377        &self,
378        requests: &[NotificationItemsRequest],
379    ) -> Result<BTreeMap<OwnedEventId, (OwnedRoomId, Option<RawNotificationEvent>)>, Error> {
380        // Serialize all the calls to this method by taking a lock at the beginning,
381        // that will be dropped later.
382        let _guard = self.notification_sync_mutex.lock().await;
383
384        // Set up a sliding sync that only subscribes to the room that had the
385        // notification, so we can figure out the full event and associated
386        // information.
387
388        let raw_notifications = Arc::new(Mutex::new(BTreeMap::new()));
389
390        let handler_raw_notification = raw_notifications.clone();
391
392        let requests = Arc::new(requests.iter().map(|req| (*req).clone()).collect::<Vec<_>>());
393
394        let timeline_event_handler = self.client.add_event_handler({
395            let requests = requests.clone();
396            move |raw: Raw<AnySyncTimelineEvent>| async move {
397                match &raw.get_field::<OwnedEventId>("event_id") {
398                    Ok(Some(event_id)) => {
399                        let request =
400                            &requests.iter().find(|request| request.event_ids.contains(event_id));
401                        if request.is_none() {
402                            return;
403                        }
404                        let room_id = request.unwrap().room_id.clone();
405                        for request in requests.iter() {
406                            if request.event_ids.contains(event_id) {
407                                // found it! There shouldn't be a previous event before, but if
408                                // there is, that should be ok to
409                                // just replace it.
410                                handler_raw_notification.lock().unwrap().insert(
411                                    event_id.to_owned(),
412                                    (room_id, Some(RawNotificationEvent::Timeline(raw))),
413                                );
414                                return;
415                            }
416                        }
417                    }
418                    Ok(None) => {
419                        warn!("a sync event had no event id");
420                    }
421                    Err(err) => {
422                        warn!("a sync event id couldn't be decoded: {err}");
423                    }
424                }
425            }
426        });
427
428        // We'll only use this event if the room is in the invited state.
429        let raw_invites = Arc::new(Mutex::new(BTreeMap::new()));
430
431        let user_id = self.client.user_id().unwrap().to_owned();
432        let handler_raw_invites = raw_invites.clone();
433        let handler_raw_notifications = raw_notifications.clone();
434        let stripped_member_handler = self.client.add_event_handler({
435            let requests = requests.clone();
436            move |raw: Raw<StrippedRoomMemberEvent>| async move {
437                let deserialized = match raw.deserialize() {
438                    Ok(d) => d,
439                    Err(err) => {
440                        warn!("failed to deserialize raw stripped room member event: {err}");
441                        return;
442                    }
443                };
444
445                trace!("received a stripped room member event");
446
447                // Try to match the event by event_id, as it's the most precise. In theory, we
448                // shouldn't receive it, so that's a first attempt.
449                match &raw.get_field::<OwnedEventId>("event_id") {
450                    Ok(Some(event_id)) => {
451                        let request =
452                            &requests.iter().find(|request| request.event_ids.contains(event_id));
453                        if request.is_none() {
454                            return;
455                        }
456                        let room_id = request.unwrap().room_id.clone();
457
458                        // found it! There shouldn't be a previous event before, but if
459                        // there is, that should be ok to
460                        // just replace it.
461                        handler_raw_notifications.lock().unwrap().insert(
462                            event_id.to_owned(),
463                            (room_id, Some(RawNotificationEvent::Invite(raw))),
464                        );
465                        return;
466                    }
467                    Ok(None) => {
468                        debug!("a room member event had no id");
469                    }
470                    Err(err) => {
471                        debug!("a room member event id couldn't be decoded: {err}");
472                    }
473                }
474
475                // Try to match the event by membership and state_key for the current user.
476                if deserialized.content.membership == MembershipState::Invite
477                    && deserialized.state_key == user_id
478                {
479                    debug!("found an invite event for the current user");
480                    // This could be it! There might be several of these following each other, so
481                    // assume it's the latest one (in sync ordering), and override a previous one if
482                    // present.
483                    handler_raw_invites
484                        .lock()
485                        .unwrap()
486                        .insert(deserialized.state_key, Some(RawNotificationEvent::Invite(raw)));
487                } else {
488                    debug!("not an invite event, or not for the current user");
489                }
490            }
491        });
492
493        // Room power levels are necessary to build the push context.
494        let required_state = vec![
495            (StateEventType::RoomEncryption, "".to_owned()),
496            (StateEventType::RoomMember, "$LAZY".to_owned()),
497            (StateEventType::RoomMember, "$ME".to_owned()),
498            (StateEventType::RoomCanonicalAlias, "".to_owned()),
499            (StateEventType::RoomName, "".to_owned()),
500            (StateEventType::RoomPowerLevels, "".to_owned()),
501            (StateEventType::CallMember, "*".to_owned()),
502        ];
503
504        let invites = SlidingSyncList::builder("invites")
505            .sync_mode(SlidingSyncMode::new_selective().add_range(0..=16))
506            .timeline_limit(8)
507            .required_state(required_state.clone())
508            .filters(Some(assign!(http::request::ListFilters::default(), {
509                is_invite: Some(true),
510                not_room_types: vec![RoomTypeFilter::Space],
511            })));
512
513        let sync = self
514            .client
515            .sliding_sync(Self::CONNECTION_ID)?
516            .poll_timeout(Duration::from_secs(1))
517            .network_timeout(Duration::from_secs(3))
518            .with_account_data_extension(
519                assign!(http::request::AccountData::default(), { enabled: Some(true) }),
520            )
521            .add_list(invites)
522            .build()
523            .await?;
524
525        let room_ids = requests.iter().map(|req| req.room_id.as_ref()).collect::<Vec<_>>();
526        sync.subscribe_to_rooms(
527            &room_ids,
528            Some(assign!(http::request::RoomSubscription::default(), {
529                required_state,
530                timeline_limit: uint!(16)
531            })),
532            true,
533        );
534
535        let mut remaining_attempts = 3;
536
537        let stream = sync.sync();
538        pin_mut!(stream);
539
540        // Sum the expected event count for each room
541        let expected_event_count = requests.iter().map(|req| req.event_ids.len()).sum::<usize>();
542
543        loop {
544            if stream.next().await.is_none() {
545                // Sliding sync aborted early.
546                break;
547            }
548
549            if raw_notifications.lock().unwrap().len() + raw_invites.lock().unwrap().len()
550                == expected_event_count
551            {
552                // We got the events.
553                break;
554            }
555
556            remaining_attempts -= 1;
557            if remaining_attempts == 0 {
558                // We're out of luck.
559                break;
560            }
561        }
562
563        self.client.remove_event_handler(stripped_member_handler);
564        self.client.remove_event_handler(timeline_event_handler);
565
566        let mut notifications = raw_notifications.clone().lock().unwrap().clone();
567        let mut missing_event_ids = Vec::new();
568
569        // Create the list of missing event ids after the syncs
570        for request in requests.iter() {
571            for event_id in &request.event_ids {
572                if !notifications.contains_key(event_id) {
573                    missing_event_ids.push((request.room_id.to_owned(), event_id.to_owned()));
574                }
575            }
576        }
577
578        // Try checking if the missing notifications could be invites
579        for (room_id, missing_event_id) in missing_event_ids {
580            trace!("we didn't have a non-invite event, looking for invited room now");
581            if let Some(room) = self.client.get_room(&room_id) {
582                if room.state() == RoomState::Invited {
583                    if let Some((_, stripped_event)) = raw_invites.lock().unwrap().pop_first() {
584                        notifications.insert(
585                            missing_event_id.to_owned(),
586                            (room_id.to_owned(), stripped_event),
587                        );
588                    }
589                } else {
590                    debug!("the room isn't in the invited state");
591                }
592            } else {
593                debug!("the room isn't an invite");
594            }
595        }
596
597        let found = if notifications.len() == expected_event_count { "" } else { "not " };
598        trace!("all notification events have{found} been found");
599
600        Ok(notifications)
601    }
602
603    pub async fn get_notification_with_sliding_sync(
604        &self,
605        room_id: &RoomId,
606        event_id: &EventId,
607    ) -> Result<NotificationStatus, Error> {
608        let event_ids = vec![event_id.to_owned()];
609        let request = NotificationItemsRequest { room_id: room_id.to_owned(), event_ids };
610        let mut get_notifications_result =
611            self.get_notifications_with_sliding_sync(&[request]).await?;
612        get_notifications_result.remove(event_id).unwrap_or(Ok(NotificationStatus::EventNotFound))
613    }
614
615    /// Get a list of full notifications, given a room id and event ids.
616    ///
617    /// This will run a small sliding sync to retrieve the content of the
618    /// events, along with extra data to form a rich notification context.
619    pub async fn get_notifications_with_sliding_sync(
620        &self,
621        requests: &[NotificationItemsRequest],
622    ) -> Result<BatchNotificationFetchingResult<NotificationStatus>, Error> {
623        let raw_events = self.try_sliding_sync(requests).await?;
624
625        let mut result = BatchNotificationFetchingResult::new();
626
627        for (event_id, (room_id, raw_event)) in raw_events.into_iter() {
628            // At this point it should have been added by the sync, if it's not, give up.
629            let Some(room) = self.client.get_room(&room_id) else { return Err(Error::UnknownRoom) };
630
631            if let Some(raw_event) = raw_event {
632                let (raw_event, push_actions) = match &raw_event {
633                    RawNotificationEvent::Timeline(timeline_event) => {
634                        // Timeline events may be encrypted, so make sure they get decrypted first.
635                        match self.retry_decryption(&room, timeline_event).await {
636                            Ok(Some(mut timeline_event)) => {
637                                let push_actions = timeline_event.push_actions.take();
638                                (
639                                    RawNotificationEvent::Timeline(timeline_event.into_raw()),
640                                    push_actions,
641                                )
642                            }
643                            Ok(None) => {
644                                match room.event_push_actions(timeline_event).await {
645                                    Ok(push_actions) => (raw_event.clone(), push_actions),
646                                    Err(error) => {
647                                        // Could not get push actions,
648                                        result.mark_fetching_notification_failed(
649                                            event_id,
650                                            error.into(),
651                                        );
652                                        continue;
653                                    }
654                                }
655                            }
656                            Err(error) => {
657                                result.mark_fetching_notification_failed(event_id, error);
658                                continue;
659                            }
660                        }
661                    }
662                    RawNotificationEvent::Invite(invite_event) => {
663                        // Invite events can't be encrypted, so they should be in clear text.
664                        match room.event_push_actions(invite_event).await {
665                            Ok(push_actions) => {
666                                (RawNotificationEvent::Invite(invite_event.clone()), push_actions)
667                            }
668                            Err(error) => {
669                                result.mark_fetching_notification_failed(event_id, error.into());
670                                continue;
671                            }
672                        }
673                    }
674                };
675
676                let should_notify = push_actions
677                    .as_ref()
678                    .map(|actions| actions.iter().any(|a| a.should_notify()))
679                    .unwrap_or(false);
680
681                if !should_notify {
682                    result.add_notification(event_id, NotificationStatus::EventFilteredOut);
683                } else {
684                    let notification_status = NotificationItem::new(
685                        &room,
686                        raw_event,
687                        push_actions.as_deref(),
688                        Vec::new(),
689                    )
690                    .await
691                    .map(|event| NotificationStatus::Event(Box::new(event)));
692
693                    match notification_status {
694                        Ok(notification_item) => {
695                            result.add_notification(event_id, notification_item);
696                        }
697                        Err(error) => {
698                            result.mark_fetching_notification_failed(event_id, error);
699                        }
700                    }
701                }
702            } else {
703                result.add_notification(event_id, NotificationStatus::EventNotFound);
704            }
705        }
706
707        Ok(result)
708    }
709
710    /// Retrieve a notification using a `/context` query.
711    ///
712    /// This is for clients that are already running other sliding syncs in the
713    /// same process, so that most of the contextual information for the
714    /// notification should already be there. In particular, the room containing
715    /// the event MUST be known (via a sliding sync for invites, or another
716    /// sliding sync).
717    ///
718    /// An error result means that we couldn't resolve the notification; in that
719    /// case, a dummy notification may be displayed instead. A `None` result
720    /// means the notification has been filtered out by the user's push
721    /// rules.
722    pub async fn get_notification_with_context(
723        &self,
724        room_id: &RoomId,
725        event_id: &EventId,
726    ) -> Result<Option<NotificationItem>, Error> {
727        info!("fetching notification event with a /context query");
728
729        // See above comment.
730        let Some(room) = self.parent_client.get_room(room_id) else {
731            return Err(Error::UnknownRoom);
732        };
733
734        let response = room.event_with_context(event_id, true, uint!(0), None).await?;
735
736        let mut timeline_event = response.event.ok_or(Error::ContextMissingEvent)?;
737        let state_events = response.state;
738
739        if let Some(decrypted_event) = self.retry_decryption(&room, timeline_event.raw()).await? {
740            timeline_event = decrypted_event;
741        }
742
743        if let Some(actions) = timeline_event.push_actions.as_ref() {
744            if !actions.iter().any(|a| a.should_notify()) {
745                return Ok(None);
746            }
747        }
748
749        let push_actions = timeline_event.push_actions.take();
750        Ok(Some(
751            NotificationItem::new(
752                &room,
753                RawNotificationEvent::Timeline(timeline_event.into_raw()),
754                push_actions.as_deref(),
755                state_events,
756            )
757            .await?,
758        ))
759    }
760}
761
762fn is_event_encrypted(event_type: TimelineEventType) -> bool {
763    let is_still_encrypted = matches!(event_type, TimelineEventType::RoomEncrypted);
764
765    #[cfg(feature = "unstable-msc3956")]
766    let is_still_encrypted =
767        is_still_encrypted || matches!(event_type, ruma::events::TimelineEventType::Encrypted);
768
769    is_still_encrypted
770}
771
772#[derive(Debug)]
773pub enum NotificationStatus {
774    Event(Box<NotificationItem>),
775    EventNotFound,
776    EventFilteredOut,
777}
778
779#[derive(Debug, Clone)]
780pub struct NotificationItemsRequest {
781    pub room_id: OwnedRoomId,
782    pub event_ids: Vec<OwnedEventId>,
783}
784
785#[derive(Default)]
786pub struct BatchNotificationFetchingResult<T> {
787    notifications: BTreeMap<OwnedEventId, Result<T, Error>>,
788}
789
790impl<T> BatchNotificationFetchingResult<T> {
791    pub fn new() -> Self {
792        Self { notifications: BTreeMap::new() }
793    }
794
795    fn add_notification(&mut self, event_id: OwnedEventId, notification: T) {
796        self.notifications.insert(event_id, Ok(notification));
797    }
798
799    fn mark_fetching_notification_failed(&mut self, event_id: OwnedEventId, error: Error) {
800        self.notifications.insert(event_id, Err(error));
801    }
802
803    pub fn remove(&mut self, id: &EventId) -> Option<Result<T, Error>> {
804        self.notifications.remove(id)
805    }
806
807    pub fn iter(&self) -> Iter<'_, OwnedEventId, Result<T, Error>> {
808        self.notifications.iter()
809    }
810}
811
812impl<T> IntoIterator for BatchNotificationFetchingResult<T> {
813    type Item = (OwnedEventId, Result<T, Error>);
814    type IntoIter = IntoIter<OwnedEventId, Result<T, Error>>;
815    fn into_iter(self) -> Self::IntoIter {
816        self.notifications.into_iter()
817    }
818}
819
820/// The Notification event as it was fetched from remote for the
821/// given `event_id`, represented as Raw but decrypted, thus only
822/// whether it is an invite or regular Timeline event has been
823/// determined.
824#[derive(Debug, Clone)]
825pub enum RawNotificationEvent {
826    /// The raw event for a timeline event
827    Timeline(Raw<AnySyncTimelineEvent>),
828    /// The notification contains an invitation with the given
829    /// StrippedRoomMemberEvent (in raw here)
830    Invite(Raw<StrippedRoomMemberEvent>),
831}
832
833/// The deserialized Event as it was fetched from remote for the
834/// given `event_id` and after decryption (if possible).
835#[derive(Debug)]
836pub enum NotificationEvent {
837    /// The Notification was for a TimelineEvent
838    Timeline(Box<AnySyncTimelineEvent>),
839    /// The Notification is an invite with the given stripped room event data
840    Invite(Box<StrippedRoomMemberEvent>),
841}
842
843impl NotificationEvent {
844    pub fn sender(&self) -> &UserId {
845        match self {
846            NotificationEvent::Timeline(ev) => ev.sender(),
847            NotificationEvent::Invite(ev) => &ev.sender,
848        }
849    }
850
851    /// Returns the root event id of the thread the notification event is in, if
852    /// any.
853    fn thread_id(&self) -> Option<OwnedEventId> {
854        let NotificationEvent::Timeline(sync_timeline_event) = &self else {
855            return None;
856        };
857        let AnySyncTimelineEvent::MessageLike(event) = sync_timeline_event.as_ref() else {
858            return None;
859        };
860        let content = event.original_content()?;
861        match content {
862            AnyMessageLikeEventContent::RoomMessage(content) => match content.relates_to? {
863                Relation::Thread(thread) => Some(thread.event_id),
864                _ => None,
865            },
866            _ => None,
867        }
868    }
869}
870
871/// A notification with its full content.
872#[derive(Debug)]
873pub struct NotificationItem {
874    /// Underlying Ruma event.
875    pub event: NotificationEvent,
876
877    /// The raw of the underlying event.
878    pub raw_event: RawNotificationEvent,
879
880    /// Display name of the sender.
881    pub sender_display_name: Option<String>,
882    /// Avatar URL of the sender.
883    pub sender_avatar_url: Option<String>,
884    /// Is the sender's name ambiguous?
885    pub is_sender_name_ambiguous: bool,
886
887    /// Room computed display name.
888    pub room_computed_display_name: String,
889    /// Room avatar URL.
890    pub room_avatar_url: Option<String>,
891    /// Room canonical alias.
892    pub room_canonical_alias: Option<String>,
893    /// Room join rule.
894    pub room_join_rule: JoinRule,
895    /// Is this room encrypted?
896    pub is_room_encrypted: Option<bool>,
897    /// Is this a public room?
898    pub is_room_public: bool,
899    /// Is this room considered a direct message?
900    pub is_direct_message_room: bool,
901    /// Numbers of members who joined the room.
902    pub joined_members_count: u64,
903
904    /// Is it a noisy notification? (i.e. does any push action contain a sound
905    /// action)
906    ///
907    /// It is set if and only if the push actions could be determined.
908    pub is_noisy: Option<bool>,
909    pub has_mention: Option<bool>,
910    pub thread_id: Option<OwnedEventId>,
911}
912
913impl NotificationItem {
914    async fn new(
915        room: &Room,
916        raw_event: RawNotificationEvent,
917        push_actions: Option<&[Action]>,
918        state_events: Vec<Raw<AnyStateEvent>>,
919    ) -> Result<Self, Error> {
920        let event = match &raw_event {
921            RawNotificationEvent::Timeline(raw_event) => {
922                let mut event = raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?;
923                if let AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
924                    SyncRoomMessageEvent::Original(ev),
925                )) = &mut event
926                {
927                    ev.content.sanitize(DEFAULT_SANITIZER_MODE, RemoveReplyFallback::Yes);
928                }
929                NotificationEvent::Timeline(Box::new(event))
930            }
931            RawNotificationEvent::Invite(raw_event) => NotificationEvent::Invite(Box::new(
932                raw_event.deserialize().map_err(|_| Error::InvalidRumaEvent)?,
933            )),
934        };
935
936        let sender = match room.state() {
937            RoomState::Invited => room.invite_details().await?.inviter,
938            _ => room.get_member_no_sync(event.sender()).await?,
939        };
940
941        let (mut sender_display_name, mut sender_avatar_url, is_sender_name_ambiguous) =
942            match &sender {
943                Some(sender) => (
944                    sender.display_name().map(|s| s.to_owned()),
945                    sender.avatar_url().map(|s| s.to_string()),
946                    sender.name_ambiguous(),
947                ),
948                None => (None, None, false),
949            };
950
951        if sender_display_name.is_none() || sender_avatar_url.is_none() {
952            let sender_id = event.sender();
953            for ev in state_events {
954                let Ok(ev) = ev.deserialize() else {
955                    continue;
956                };
957                if ev.sender() != sender_id {
958                    continue;
959                }
960                if let AnyFullStateEventContent::RoomMember(FullStateEventContent::Original {
961                    content,
962                    ..
963                }) = ev.content()
964                {
965                    if sender_display_name.is_none() {
966                        sender_display_name = content.displayname;
967                    }
968                    if sender_avatar_url.is_none() {
969                        sender_avatar_url = content.avatar_url.map(|url| url.to_string());
970                    }
971                }
972            }
973        }
974
975        let is_noisy = push_actions.map(|actions| actions.iter().any(|a| a.sound().is_some()));
976        let has_mention = push_actions.map(|actions| actions.iter().any(|a| a.is_highlight()));
977        let thread_id = event.thread_id().clone();
978
979        let item = NotificationItem {
980            event,
981            raw_event,
982            sender_display_name,
983            sender_avatar_url,
984            is_sender_name_ambiguous,
985            room_computed_display_name: room.display_name().await?.to_string(),
986            room_avatar_url: room.avatar_url().map(|s| s.to_string()),
987            room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
988            room_join_rule: room.join_rule(),
989            is_direct_message_room: room.is_direct().await?,
990            is_room_public: room.is_public(),
991            is_room_encrypted: room
992                .latest_encryption_state()
993                .await
994                .map(|state| state.is_encrypted())
995                .ok(),
996            joined_members_count: room.joined_members_count(),
997            is_noisy,
998            has_mention,
999            thread_id,
1000        };
1001
1002        Ok(item)
1003    }
1004}
1005
1006/// An error for the [`NotificationClient`].
1007#[derive(Debug, Error)]
1008pub enum Error {
1009    #[error(transparent)]
1010    BuildingLocalClient(ClientBuildError),
1011
1012    /// The room associated to this event wasn't found.
1013    #[error("unknown room for a notification")]
1014    UnknownRoom,
1015
1016    /// The Ruma event contained within this notification couldn't be parsed.
1017    #[error("invalid ruma event")]
1018    InvalidRumaEvent,
1019
1020    /// When calling `get_notification_with_sliding_sync`, the room was missing
1021    /// in the response.
1022    #[error("the sliding sync response doesn't include the target room")]
1023    SlidingSyncEmptyRoom,
1024
1025    #[error("the event was missing in the `/context` query")]
1026    ContextMissingEvent,
1027
1028    /// An error forwarded from the client.
1029    #[error(transparent)]
1030    SdkError(#[from] matrix_sdk::Error),
1031
1032    /// An error forwarded from the underlying state store.
1033    #[error(transparent)]
1034    StoreError(#[from] StoreError),
1035}
1036
#[cfg(test)]
mod tests {
    use assert_matches2::assert_let;
    use matrix_sdk::test_utils::mocks::MatrixMockServer;
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{event_id, room_id, user_id};

    use crate::notification_client::{NotificationItem, RawNotificationEvent};

    /// A notification item built from a threaded message must expose the id of
    /// the thread's root event.
    #[async_test]
    async fn test_notification_item_returns_thread_id() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        let room_id = room_id!("!a:b.c");
        let thread_root = event_id!("$root:b.c");

        // A text message sent in the thread rooted at `thread_root`.
        let threaded_message = EventFactory::new()
            .room(room_id)
            .sender(user_id!("@sender:b.c"))
            .text_msg("Threaded")
            .in_thread(thread_root, event_id!("$prev:b.c"))
            .into_raw_sync();
        let room = server.sync_joined_room(&client, room_id).await;

        // No push actions and no extra state events are needed here.
        let item = NotificationItem::new(
            &room,
            RawNotificationEvent::Timeline(threaded_message),
            None,
            Vec::new(),
        )
        .await
        .expect("Could not create notification item");

        assert_let!(Some(thread_id) = item.thread_id);
        assert_eq!(thread_id, thread_root);
    }
}