Skip to main content

matrix_sdk_ui/
unable_to_decrypt_hook.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module provides a generic interface to subscribe to unable-to-decrypt
16//! events, and notable updates to such events.
17//!
18//! This provides a general trait that a consumer may implement, as well as
19//! utilities to simplify usage of this trait.
20
21use std::{
22    collections::HashMap,
23    sync::{Arc, Mutex},
24};
25
26use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
27use matrix_sdk::{Client, sleep::sleep, task_monitor::BackgroundTaskHandle};
28use matrix_sdk_base::{
29    SendOutsideWasm, StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm,
30    crypto::types::events::UtdCause,
31};
32use ruma::{
33    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedServerName, UserId,
34    time::{Duration, Instant},
35};
36use tokio::sync::{Mutex as AsyncMutex, MutexGuard};
37use tracing::{error, trace};
38
39/// A generic interface which methods get called whenever we observe a
40/// unable-to-decrypt (UTD) event.
41pub trait UnableToDecryptHook: std::fmt::Debug + SendOutsideWasm + SyncOutsideWasm {
42    /// Called every time the hook observes an encrypted event that couldn't be
43    /// decrypted.
44    ///
45    /// If the hook manager was configured with a max delay, this could also
46    /// contain extra information for late-decrypted events. See details in
47    /// [`UnableToDecryptInfo::time_to_decrypt`].
48    fn on_utd(&self, info: UnableToDecryptInfo);
49}
50
51/// Information about an event we were unable to decrypt (UTD).
52#[derive(Clone, Debug, Hash, PartialEq, Eq)]
53pub struct UnableToDecryptInfo {
54    /// The identifier of the event that couldn't get decrypted.
55    pub event_id: OwnedEventId,
56
57    /// If the event could be decrypted late (that is, the event was encrypted
58    /// at first, but could be decrypted later on), then this indicates the
59    /// time it took to decrypt the event. If it is not set, this is
60    /// considered a definite UTD.
61    pub time_to_decrypt: Option<Duration>,
62
63    /// What we know about what caused this UTD. E.g. was this event sent when
64    /// we were not a member of this room?
65    pub cause: UtdCause,
66
67    /// The difference between the event creation time (`origin_server_ts`) and
68    /// the time our device was created. If negative, this event was sent
69    /// *before* our device was created.
70    pub event_local_age_millis: i64,
71
72    /// Whether the user had verified their own identity at the point they
73    /// received the UTD event.
74    pub user_trusts_own_identity: bool,
75
76    /// The homeserver of the user that sent the undecryptable event.
77    pub sender_homeserver: OwnedServerName,
78
79    /// Our local user's own homeserver, or `None` if the client is not logged
80    /// in.
81    pub own_homeserver: Option<OwnedServerName>,
82}
83
84/// Data about a UTD event which we are waiting to report to the parent hook.
85#[derive(Debug)]
86struct PendingUtdReport {
87    /// The time that we received the UTD report from the timeline code.
88    marked_utd_at: Instant,
89
90    /// The task that will report this UTD to the parent hook.
91    report_task: BackgroundTaskHandle,
92
93    /// The UnableToDecryptInfo structure for this UTD event.
94    utd_info: UnableToDecryptInfo,
95}
96
97/// A manager over an existing [`UnableToDecryptHook`] that deduplicates UTDs
98/// on similar events, and adds basic consistency checks.
99///
100/// It can also implement a grace period before reporting an event as a UTD, if
101/// configured with [`Self::with_max_delay`]. Instead of immediately reporting
102/// the UTD, the reporting will be delayed by the max delay at most; if the
103/// event could eventually get decrypted, it may be reported before the end of
104/// that delay.
105#[derive(Debug)]
106pub struct UtdHookManager {
107    /// A Client associated with the UTD hook. This is used to access the store
108    /// which we persist our data to.
109    client: Client,
110
111    /// The parent hook we'll call, when we have found a unique UTD.
112    parent: Arc<dyn UnableToDecryptHook>,
113
114    /// An optional delay before marking the event as UTD ("grace period").
115    max_delay: Option<Duration>,
116
117    /// A mapping of events we're going to report as UTDs, to the tasks to do
118    /// so.
119    ///
120    /// Note: this is empty if no [`Self::max_delay`] is set.
121    ///
122    /// Note: this is theoretically unbounded in size, although this set of
123    /// tasks will degrow over time, as tasks expire after the max delay.
124    pending_delayed: Arc<Mutex<HashMap<OwnedEventId, PendingUtdReport>>>,
125
126    /// Bloom filter containing the event IDs of events which have been reported
127    /// as UTDs
128    reported_utds: Arc<AsyncMutex<GrowableBloom>>,
129}
130
131impl UtdHookManager {
132    /// Create a new [`UtdHookManager`] for the given hook.
133    ///
134    /// A [`Client`] must also be provided; this provides a link to the
135    /// [`matrix_sdk_base::StateStore`] which is used to load and store the
136    /// persistent data.
137    pub fn new(parent: Arc<dyn UnableToDecryptHook>, client: Client) -> Self {
138        let bloom_filter =
139            // Some slightly arbitrarily-chosen parameters here. We specify that, after 1000
140            // UTDs, we want to have a false-positive rate of 1%.
141            //
142            // The GrowableBloomFilter is based on a series of (partitioned) Bloom filters;
143            // once the first starts getting full (the expected false-positive
144            // rate gets too high), it adds another Bloom filter. Each new entry
145            // is recorded in the most recent Bloom filter; when querying, if
146            // *any* of the component filters show a match, that shows
147            // an overall match.
148            //
149            // The first component filter is created based on the parameters we give. For
150            // reasons derived in the paper [1], a partitioned Bloom filter with
151            // target false-positive rate `P` after `n` insertions requires a
152            // number of slices `k` given by:
153            //
154            // k = log2(1/P) = -ln(P) / ln(2)
155            //
156            // ... where each slice has a number of bits `m` given by
157            //
158            // m = n / ln(2)
159            //
160            // We have to have a whole number of slices and bits, so the total number of
161            // bits M is:
162            //
163            // M = ceil(k) * ceil(m)
164            //   = ceil(-ln(P) / ln(2)) * ceil(n / ln(2))
165            //
166            // In other words, our FP rate of 1% after 1000 insertions requires:
167            //
168            // M = ceil(-ln(0.01) / ln(2)) * ceil(1000 / ln(2))
169            //   = 7 * 1443 = 10101 bits
170            //
171            // So our filter starts off with 1263 bytes of data (plus a little overhead).
172            // Once we hit 1000 UTDs, we add a second component filter with a capacity
173            // double that of the original and target error rate 85% of the
174            // original (another 2526 bytes), which then lasts us until a total
175            // of 3000 UTDs.
176            //
177            // [1]: https://gsd.di.uminho.pt/members/cbm/ps/dbloom.pdf
178            GrowableBloomBuilder::new().estimated_insertions(1000).desired_error_ratio(0.01).build();
179
180        Self {
181            client,
182            parent,
183            max_delay: None,
184            pending_delayed: Default::default(),
185            reported_utds: Arc::new(AsyncMutex::new(bloom_filter)),
186        }
187    }
188
189    /// Reports UTDs with the given max delay.
190    ///
191    /// Note: late decryptions are always reported, even if there was a grace
192    /// period set for the reporting of the UTD.
193    pub fn with_max_delay(mut self, delay: Duration) -> Self {
194        self.max_delay = Some(delay);
195        self
196    }
197
198    /// Load the persistent data for the UTD hook from the store.
199    ///
200    /// If the client previously used a UtdHookManager, and UTDs were
201    /// encountered, the data on the reported UTDs is loaded from the store.
202    /// Otherwise, there is no effect.
203    pub async fn reload_from_store(&mut self) -> Result<(), StoreError> {
204        let existing_data =
205            self.client.state_store().get_kv_data(StateStoreDataKey::UtdHookManagerData).await?;
206
207        if let Some(existing_data) = existing_data {
208            let bloom_filter = existing_data
209                .into_utd_hook_manager_data()
210                .expect("StateStore::get_kv_data should return data of the right type");
211            self.reported_utds = Arc::new(AsyncMutex::new(bloom_filter));
212        }
213        Ok(())
214    }
215
216    /// The function to call whenever a UTD is seen for the first time.
217    ///
218    /// Pipe in any information that needs to be included in the final report.
219    ///
220    /// # Arguments
221    ///  * `event_id` - The ID of the event that could not be decrypted.
222    ///  * `cause` - Our best guess at the reason why the event can't be
223    ///    decrypted.
224    ///  * `event_timestamp` - The event's `origin_server_ts` field (or creation
225    ///    time for local echo).
226    ///  * `sender_user_id` - The Matrix user ID of the user that sent the
227    ///    undecryptable message.
228    pub(crate) async fn on_utd(
229        &self,
230        event_id: &EventId,
231        cause: UtdCause,
232        event_timestamp: MilliSecondsSinceUnixEpoch,
233        sender_user_id: &UserId,
234    ) {
235        trace!(%event_id, "UtdHookManager: Observed UTD");
236        // Hold the lock on `reported_utds` throughout, to avoid races with other
237        // threads.
238        let mut reported_utds_lock = self.reported_utds.lock().await;
239
240        // Check if this, or a previous instance of UtdHookManager, has already reported
241        // this UTD, and bail out if not.
242        if reported_utds_lock.contains(event_id) {
243            return;
244        }
245
246        // Otherwise, check if we already have a task to handle this UTD.
247        if self.pending_delayed.lock().unwrap().contains_key(event_id) {
248            return;
249        }
250
251        let event_local_age_millis = i64::from(event_timestamp.get()).saturating_sub_unsigned(
252            self.client.encryption().device_creation_timestamp().await.get().into(),
253        );
254
255        let own_user_id = self.client.user_id();
256        let user_trusts_own_identity = if let Some(own_user_id) = own_user_id {
257            if let Ok(Some(own_id)) = self.client.encryption().get_user_identity(own_user_id).await
258            {
259                own_id.is_verified()
260            } else {
261                false
262            }
263        } else {
264            false
265        };
266
267        let own_homeserver = own_user_id.map(|id| id.server_name().to_owned());
268        let sender_homeserver = sender_user_id.server_name().to_owned();
269
270        let info = UnableToDecryptInfo {
271            event_id: event_id.to_owned(),
272            time_to_decrypt: None,
273            cause,
274            event_local_age_millis,
275            user_trusts_own_identity,
276            own_homeserver,
277            sender_homeserver,
278        };
279
280        let Some(max_delay) = self.max_delay else {
281            // No delay: immediately report the event to the parent hook.
282            Self::report_utd(info, &self.parent, &self.client, &mut reported_utds_lock).await;
283            return;
284        };
285
286        // Clone data shared with the task below.
287        let pending_delayed = self.pending_delayed.clone();
288        let reported_utds = self.reported_utds.clone();
289        let parent = self.parent.clone();
290        let client = self.client.clone();
291        let owned_event_id = event_id.to_owned();
292
293        // Spawn a task that will wait for the given delay, and maybe call the parent
294        // hook then.
295        let handle = self.client.task_monitor().spawn_background_task("utd_hook", async move {
296            // Wait for the given delay.
297            sleep(max_delay).await;
298
299            // Make sure we take out the lock on `reported_utds` before removing the entry
300            // from `pending_delayed`, to ensure we don't race against another call to
301            // `on_utd` (which could otherwise see that the entry has been
302            // removed from `pending_delayed` but not yet added to
303            // `reported_utds`).
304            let mut reported_utds_lock = reported_utds.lock().await;
305
306            // Remove the task from the outstanding set. But if it's already been removed,
307            // it's been decrypted since the task was added!
308            let pending_report = pending_delayed.lock().unwrap().remove(&owned_event_id);
309            if let Some(pending_report) = pending_report {
310                Self::report_utd(
311                    pending_report.utd_info,
312                    &parent,
313                    &client,
314                    &mut reported_utds_lock,
315                )
316                .await;
317            }
318        });
319
320        // Add the task to the set of pending tasks.
321        self.pending_delayed.lock().unwrap().insert(
322            event_id.to_owned(),
323            PendingUtdReport { marked_utd_at: Instant::now(), report_task: handle, utd_info: info },
324        );
325    }
326
327    /// The function to call whenever an event that was marked as a UTD has
328    /// eventually been decrypted.
329    ///
330    /// Note: if this is called for an event that was never marked as a UTD
331    /// before, it has no effect.
332    pub(crate) async fn on_late_decrypt(&self, event_id: &EventId) {
333        trace!(%event_id, "UtdHookManager: On late decrypt");
334        // Hold the lock on `reported_utds` throughout, to avoid races with other
335        // threads.
336        let mut reported_utds_lock = self.reported_utds.lock().await;
337
338        // Only let the parent hook know about the late decryption if the event is
339        // a pending UTD. If so, remove the event from the pending list —
340        // doing so will cause the reporting task to no-op if it runs.
341        let Some(pending_utd_report) = self.pending_delayed.lock().unwrap().remove(event_id) else {
342            trace!(%event_id, "UtdHookManager: received a late decrypt report for an unknown utd");
343            return;
344        };
345
346        // We can also cancel the reporting task.
347        pending_utd_report.report_task.abort();
348
349        // Update the UTD Info struct with new data, then report it
350        let mut info = pending_utd_report.utd_info;
351        info.time_to_decrypt = Some(pending_utd_report.marked_utd_at.elapsed());
352        Self::report_utd(info, &self.parent, &self.client, &mut reported_utds_lock).await;
353    }
354
355    /// Helper for [`UtdHookManager::on_utd`] and
356    /// [`UtdHookManager.on_late_decrypt`]: reports the UTD to the parent,
357    /// and records the event as reported.
358    ///
359    /// Must be called with the lock held on [`UtdHookManager::reported_utds`],
360    /// and takes a `MutexGuard` to enforce that.
361    async fn report_utd(
362        info: UnableToDecryptInfo,
363        parent_hook: &Arc<dyn UnableToDecryptHook>,
364        client: &Client,
365        reported_utds_lock: &mut MutexGuard<'_, GrowableBloom>,
366    ) {
367        let event_id = info.event_id.clone();
368        parent_hook.on_utd(info);
369        reported_utds_lock.insert(event_id);
370        if let Err(e) = client
371            .state_store()
372            .set_kv_data(
373                StateStoreDataKey::UtdHookManagerData,
374                StateStoreDataValue::UtdHookManagerData(reported_utds_lock.clone()),
375            )
376            .await
377        {
378            error!("Unable to persist UTD report data: {}", e);
379        }
380    }
381}
382
383impl Drop for UtdHookManager {
384    fn drop(&mut self) {
385        // Cancel all the outstanding delayed tasks to report UTDs.
386        //
387        // Here, we don't take the lock on `reported_utd`s (indeed, we can't, since
388        // `reported_utds` has an async mutex, and `drop` has to be sync), but
389        // that's ok. We can't race against `on_utd` or `on_late_decrypt`, since
390        // they both have `&self` references which mean `drop` can't be called.
391        // We *could* race against one of the actual tasks to report
392        // UTDs, but that's ok too: either the report task will bail out when it sees
393        // the entry has been removed from `pending_delayed` (which is fine), or the
394        // report task will successfully report the UTD (which is fine).
395        let mut pending_delayed = self.pending_delayed.lock().unwrap();
396        for (_, pending_utd_report) in pending_delayed.drain() {
397            pending_utd_report.report_task.abort();
398        }
399    }
400}
401
402#[cfg(test)]
403mod tests {
404    use matrix_sdk::test_utils::{logged_in_client, no_retry_test_client};
405    use matrix_sdk_test::async_test;
406    use ruma::{event_id, owned_server_name, server_name, user_id};
407
408    use super::*;
409
410    #[derive(Debug, Default)]
411    struct Dummy {
412        utds: Mutex<Vec<UnableToDecryptInfo>>,
413    }
414
415    impl UnableToDecryptHook for Dummy {
416        fn on_utd(&self, info: UnableToDecryptInfo) {
417            self.utds.lock().unwrap().push(info);
418        }
419    }
420
421    #[async_test]
422    async fn test_deduplicates_utds() {
423        // If I create a dummy hook,
424        let hook = Arc::new(Dummy::default());
425
426        // And I wrap with the UtdHookManager,
427        let wrapper = UtdHookManager::new(hook.clone(), logged_in_client(None).await);
428
429        // And I call the `on_utd` method multiple times, sometimes on the same event,
430        let event_timestamp = MilliSecondsSinceUnixEpoch::now();
431        let sender_user = user_id!("@example2:localhost");
432        let federated_user = user_id!("@example2:example.com");
433        wrapper.on_utd(event_id!("$1"), UtdCause::Unknown, event_timestamp, sender_user).await;
434        wrapper.on_utd(event_id!("$1"), UtdCause::Unknown, event_timestamp, sender_user).await;
435        wrapper.on_utd(event_id!("$2"), UtdCause::Unknown, event_timestamp, federated_user).await;
436        wrapper.on_utd(event_id!("$1"), UtdCause::Unknown, event_timestamp, sender_user).await;
437        wrapper.on_utd(event_id!("$2"), UtdCause::Unknown, event_timestamp, federated_user).await;
438        wrapper.on_utd(event_id!("$3"), UtdCause::Unknown, event_timestamp, sender_user).await;
439
440        // Then the event ids have been deduplicated,
441        {
442            let utds = hook.utds.lock().unwrap();
443            assert_eq!(utds.len(), 3);
444            assert_eq!(utds[0].event_id, event_id!("$1"));
445            assert_eq!(utds[1].event_id, event_id!("$2"));
446            assert_eq!(utds[2].event_id, event_id!("$3"));
447
448            // No event is a late-decryption event.
449            assert!(utds[0].time_to_decrypt.is_none());
450            assert!(utds[1].time_to_decrypt.is_none());
451            assert!(utds[2].time_to_decrypt.is_none());
452
453            // event_local_age_millis should be a small positive number, because the
454            // timestamp we used was after we created the device
455            let utd_local_age = utds[0].event_local_age_millis;
456            assert!(utd_local_age >= 0);
457            assert!(utd_local_age <= 1000);
458
459            assert_eq!(utds[0].sender_homeserver, server_name!("localhost"));
460            assert_eq!(utds[0].own_homeserver, Some(owned_server_name!("localhost")));
461
462            assert_eq!(utds[1].sender_homeserver, server_name!("example.com"));
463            assert_eq!(utds[1].own_homeserver, Some(owned_server_name!("localhost")));
464        }
465    }
466
467    #[async_test]
468    async fn test_deduplicates_utds_from_previous_session() {
469        // Use a single client for both hooks, so that both hooks are backed by the same
470        // memorystore.
471        let client = no_retry_test_client(None).await;
472
473        // Dummy hook 1, with the first UtdHookManager
474        {
475            let hook = Arc::new(Dummy::default());
476            let wrapper = UtdHookManager::new(hook.clone(), client.clone());
477
478            // I call it a couple of times with different events
479            wrapper
480                .on_utd(
481                    event_id!("$1"),
482                    UtdCause::Unknown,
483                    MilliSecondsSinceUnixEpoch::now(),
484                    user_id!("@a:b"),
485                )
486                .await;
487            wrapper
488                .on_utd(
489                    event_id!("$2"),
490                    UtdCause::Unknown,
491                    MilliSecondsSinceUnixEpoch::now(),
492                    user_id!("@a:b"),
493                )
494                .await;
495
496            // Sanity-check the reported event IDs
497            {
498                let utds = hook.utds.lock().unwrap();
499                assert_eq!(utds.len(), 2);
500                assert_eq!(utds[0].event_id, event_id!("$1"));
501                assert!(utds[0].time_to_decrypt.is_none());
502                assert_eq!(utds[1].event_id, event_id!("$2"));
503                assert!(utds[1].time_to_decrypt.is_none());
504            }
505        }
506
507        // Now, create a *new* hook, with a *new* UtdHookManager
508        {
509            let hook = Arc::new(Dummy::default());
510            let mut wrapper = UtdHookManager::new(hook.clone(), client.clone());
511            wrapper.reload_from_store().await.unwrap();
512
513            // Call it with more events, some of which match the previous instance
514            wrapper
515                .on_utd(
516                    event_id!("$1"),
517                    UtdCause::Unknown,
518                    MilliSecondsSinceUnixEpoch::now(),
519                    user_id!("@a:b"),
520                )
521                .await;
522            wrapper
523                .on_utd(
524                    event_id!("$3"),
525                    UtdCause::Unknown,
526                    MilliSecondsSinceUnixEpoch::now(),
527                    user_id!("@a:b"),
528                )
529                .await;
530
531            // Only the *new* ones should be reported
532            let utds = hook.utds.lock().unwrap();
533            assert_eq!(utds.len(), 1);
534            assert_eq!(utds[0].event_id, event_id!("$3"));
535        }
536    }
537
538    /// Test that UTD events which had not yet been reported in a previous
539    /// session, are reported in the next session.
540    #[async_test]
541    async fn test_does_not_deduplicate_late_utds_from_previous_session() {
542        // Use a single client for both hooks, so that both hooks are backed by the same
543        // memorystore.
544        let client = no_retry_test_client(None).await;
545
546        // Dummy hook 1, with the first UtdHookManager
547        {
548            let hook = Arc::new(Dummy::default());
549            let wrapper = UtdHookManager::new(hook.clone(), client.clone())
550                .with_max_delay(Duration::from_secs(2));
551
552            // a UTD event
553            wrapper
554                .on_utd(
555                    event_id!("$1"),
556                    UtdCause::Unknown,
557                    MilliSecondsSinceUnixEpoch::now(),
558                    user_id!("@a:b"),
559                )
560                .await;
561
562            // The event ID should not yet have been reported.
563            {
564                let utds = hook.utds.lock().unwrap();
565                assert_eq!(utds.len(), 0);
566            }
567        }
568
569        // Now, create a *new* hook, with a *new* UtdHookManager
570        {
571            let hook = Arc::new(Dummy::default());
572            let mut wrapper = UtdHookManager::new(hook.clone(), client.clone());
573            wrapper.reload_from_store().await.unwrap();
574
575            // Call the new hook with the same event
576            wrapper
577                .on_utd(
578                    event_id!("$1"),
579                    UtdCause::Unknown,
580                    MilliSecondsSinceUnixEpoch::now(),
581                    user_id!("@a:b"),
582                )
583                .await;
584
585            // And it should be reported.
586            sleep(Duration::from_millis(2500)).await;
587
588            let utds = hook.utds.lock().unwrap();
589            assert_eq!(utds.len(), 1);
590            assert_eq!(utds[0].event_id, event_id!("$1"));
591        }
592    }
593
594    #[async_test]
595    async fn test_on_late_decrypted_no_effect() {
596        // If I create a dummy hook,
597        let hook = Arc::new(Dummy::default());
598
599        // And I wrap with the UtdHookManager,
600        let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await);
601
602        // And I call the `on_late_decrypt` method before the event had been marked as
603        // utd,
604        wrapper.on_late_decrypt(event_id!("$1")).await;
605
606        // Then nothing is registered in the parent hook.
607        assert!(hook.utds.lock().unwrap().is_empty());
608    }
609
610    #[async_test]
611    async fn test_on_late_decrypted_after_utd_no_grace_period() {
612        // If I create a dummy hook,
613        let hook = Arc::new(Dummy::default());
614
615        // And I wrap with the UtdHookManager,
616        let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await);
617
618        // And I call the `on_utd` method for an event,
619        wrapper
620            .on_utd(
621                event_id!("$1"),
622                UtdCause::Unknown,
623                MilliSecondsSinceUnixEpoch::now(),
624                user_id!("@a:b"),
625            )
626            .await;
627
628        // Then the UTD has been notified, but not as late-decrypted event.
629        {
630            let utds = hook.utds.lock().unwrap();
631            assert_eq!(utds.len(), 1);
632            assert_eq!(utds[0].event_id, event_id!("$1"));
633            assert!(utds[0].time_to_decrypt.is_none());
634        }
635
636        // And when I call the `on_late_decrypt` method,
637        wrapper.on_late_decrypt(event_id!("$1")).await;
638
639        // Then the event is not reported again as a late-decryption.
640        {
641            let utds = hook.utds.lock().unwrap();
642            assert_eq!(utds.len(), 1);
643
644            // The previous report is still there. (There was no grace period.)
645            assert_eq!(utds[0].event_id, event_id!("$1"));
646            assert!(utds[0].time_to_decrypt.is_none());
647        }
648    }
649
650    #[cfg(not(target_family = "wasm"))] // wasm32 has no time for that
651    #[async_test]
652    async fn test_delayed_utd() {
653        // If I create a dummy hook,
654        let hook = Arc::new(Dummy::default());
655
656        // And I wrap with the UtdHookManager, configured to delay reporting after 2
657        // seconds.
658        let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await)
659            .with_max_delay(Duration::from_secs(2));
660
661        // And I call the `on_utd` method for an event,
662        wrapper
663            .on_utd(
664                event_id!("$1"),
665                UtdCause::Unknown,
666                MilliSecondsSinceUnixEpoch::now(),
667                user_id!("@a:b"),
668            )
669            .await;
670
671        // Then the UTD is not being reported immediately.
672        assert!(hook.utds.lock().unwrap().is_empty());
673        assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);
674
675        // If I wait for 1 second, then it's still not been notified yet.
676        sleep(Duration::from_secs(1)).await;
677
678        assert!(hook.utds.lock().unwrap().is_empty());
679        assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);
680
681        // But if I wait just a bit more, then it's getting notified as a definite UTD.
682        sleep(Duration::from_millis(1500)).await;
683
684        {
685            let utds = hook.utds.lock().unwrap();
686            assert_eq!(utds.len(), 1);
687            assert_eq!(utds[0].event_id, event_id!("$1"));
688            assert!(utds[0].time_to_decrypt.is_none());
689        }
690
691        assert!(wrapper.pending_delayed.lock().unwrap().is_empty());
692    }
693
694    #[cfg(not(target_family = "wasm"))] // wasm32 has no time for that
695    #[async_test]
696    async fn test_delayed_late_decryption() {
697        // If I create a dummy hook,
698        let hook = Arc::new(Dummy::default());
699
700        // And I wrap with the UtdHookManager, configured to delay reporting after 2
701        // seconds.
702        let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await)
703            .with_max_delay(Duration::from_secs(2));
704
705        // And I call the `on_utd` method for an event,
706        wrapper
707            .on_utd(
708                event_id!("$1"),
709                UtdCause::Unknown,
710                MilliSecondsSinceUnixEpoch::now(),
711                user_id!("@a:b"),
712            )
713            .await;
714
715        // Then the UTD has not been notified quite yet.
716        assert!(hook.utds.lock().unwrap().is_empty());
717        assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);
718
719        // If I wait for 1 second, and mark the event as late-decrypted,
720        sleep(Duration::from_secs(1)).await;
721
722        wrapper.on_late_decrypt(event_id!("$1")).await;
723
724        // Then it's being immediately reported as a late-decryption UTD.
725        {
726            let utds = hook.utds.lock().unwrap();
727            assert_eq!(utds.len(), 1);
728            assert_eq!(utds[0].event_id, event_id!("$1"));
729            assert!(utds[0].time_to_decrypt.is_some());
730        }
731
732        // And there aren't any pending delayed reports anymore.
733        assert!(wrapper.pending_delayed.lock().unwrap().is_empty());
734    }
735}