matrix_sdk_ui/
unable_to_decrypt_hook.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module provides a generic interface to subscribe to unable-to-decrypt
16//! events, and notable updates to such events.
17//!
18//! This provides a general trait that a consumer may implement, as well as
19//! utilities to simplify usage of this trait.
20
21use std::{
22    collections::HashMap,
23    sync::{Arc, Mutex},
24};
25
26use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
27use matrix_sdk::{
28    crypto::types::events::UtdCause,
29    executor::{spawn, JoinHandle},
30    sleep::sleep,
31    Client,
32};
33use matrix_sdk_base::{StateStoreDataKey, StateStoreDataValue, StoreError};
34use ruma::{
35    time::{Duration, Instant},
36    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedServerName, UserId,
37};
38use tokio::sync::{Mutex as AsyncMutex, MutexGuard};
39use tracing::error;
40
41/// A generic interface which methods get called whenever we observe a
42/// unable-to-decrypt (UTD) event.
43pub trait UnableToDecryptHook: std::fmt::Debug + Send + Sync {
44    /// Called every time the hook observes an encrypted event that couldn't be
45    /// decrypted.
46    ///
47    /// If the hook manager was configured with a max delay, this could also
48    /// contain extra information for late-decrypted events. See details in
49    /// [`UnableToDecryptInfo::time_to_decrypt`].
50    fn on_utd(&self, info: UnableToDecryptInfo);
51}
52
53/// Information about an event we were unable to decrypt (UTD).
54#[derive(Clone, Debug, Hash, PartialEq, Eq)]
55pub struct UnableToDecryptInfo {
56    /// The identifier of the event that couldn't get decrypted.
57    pub event_id: OwnedEventId,
58
59    /// If the event could be decrypted late (that is, the event was encrypted
60    /// at first, but could be decrypted later on), then this indicates the
61    /// time it took to decrypt the event. If it is not set, this is
62    /// considered a definite UTD.
63    pub time_to_decrypt: Option<Duration>,
64
65    /// What we know about what caused this UTD. E.g. was this event sent when
66    /// we were not a member of this room?
67    pub cause: UtdCause,
68
69    /// The difference between the event creation time (`origin_server_ts`) and
70    /// the time our device was created. If negative, this event was sent
71    /// *before* our device was created.
72    pub event_local_age_millis: i64,
73
74    /// Whether the user had verified their own identity at the point they
75    /// received the UTD event.
76    pub user_trusts_own_identity: bool,
77
78    /// The homeserver of the user that sent the undecryptable event.
79    pub sender_homeserver: OwnedServerName,
80
81    /// Our local user's own homeserver, or `None` if the client is not logged
82    /// in.
83    pub own_homeserver: Option<OwnedServerName>,
84}
85
86/// Data about a UTD event which we are waiting to report to the parent hook.
87#[derive(Debug)]
88struct PendingUtdReport {
89    /// The time that we received the UTD report from the timeline code.
90    marked_utd_at: Instant,
91
92    /// The task that will report this UTD to the parent hook.
93    report_task: JoinHandle<()>,
94
95    /// The UnableToDecryptInfo structure for this UTD event.
96    utd_info: UnableToDecryptInfo,
97}
98
99/// A manager over an existing [`UnableToDecryptHook`] that deduplicates UTDs
100/// on similar events, and adds basic consistency checks.
101///
102/// It can also implement a grace period before reporting an event as a UTD, if
103/// configured with [`Self::with_max_delay`]. Instead of immediately reporting
104/// the UTD, the reporting will be delayed by the max delay at most; if the
105/// event could eventually get decrypted, it may be reported before the end of
106/// that delay.
107#[derive(Debug)]
108pub struct UtdHookManager {
109    /// A Client associated with the UTD hook. This is used to access the store
110    /// which we persist our data to.
111    client: Client,
112
113    /// The parent hook we'll call, when we have found a unique UTD.
114    parent: Arc<dyn UnableToDecryptHook>,
115
116    /// An optional delay before marking the event as UTD ("grace period").
117    max_delay: Option<Duration>,
118
119    /// A mapping of events we're going to report as UTDs, to the tasks to do
120    /// so.
121    ///
122    /// Note: this is empty if no [`Self::max_delay`] is set.
123    ///
124    /// Note: this is theoretically unbounded in size, although this set of
125    /// tasks will degrow over time, as tasks expire after the max delay.
126    pending_delayed: Arc<Mutex<HashMap<OwnedEventId, PendingUtdReport>>>,
127
128    /// Bloom filter containing the event IDs of events which have been reported
129    /// as UTDs
130    reported_utds: Arc<AsyncMutex<GrowableBloom>>,
131}
132
133impl UtdHookManager {
134    /// Create a new [`UtdHookManager`] for the given hook.
135    ///
136    /// A [`Client`] must also be provided; this provides a link to the
137    /// [`matrix_sdk_base::StateStore`] which is used to load and store the
138    /// persistent data.
139    pub fn new(parent: Arc<dyn UnableToDecryptHook>, client: Client) -> Self {
140        let bloom_filter =
141            // Some slightly arbitrarily-chosen parameters here. We specify that, after 1000
142            // UTDs, we want to have a false-positive rate of 1%.
143            //
144            // The GrowableBloomFilter is based on a series of (partitioned) Bloom filters;
145            // once the first starts getting full (the expected false-positive
146            // rate gets too high), it adds another Bloom filter. Each new entry
147            // is recorded in the most recent Bloom filter; when querying, if
148            // *any* of the component filters show a match, that shows
149            // an overall match.
150            //
151            // The first component filter is created based on the parameters we give. For
152            // reasons derived in the paper [1], a partitioned Bloom filter with
153            // target false-positive rate `P` after `n` insertions requires a
154            // number of slices `k` given by:
155            //
156            // k = log2(1/P) = -ln(P) / ln(2)
157            //
158            // ... where each slice has a number of bits `m` given by
159            //
160            // m = n / ln(2)
161            //
162            // We have to have a whole number of slices and bits, so the total number of
163            // bits M is:
164            //
165            // M = ceil(k) * ceil(m)
166            //   = ceil(-ln(P) / ln(2)) * ceil(n / ln(2))
167            //
168            // In other words, our FP rate of 1% after 1000 insertions requires:
169            //
170            // M = ceil(-ln(0.01) / ln(2)) * ceil(1000 / ln(2))
171            //   = 7 * 1443 = 10101 bits
172            //
173            // So our filter starts off with 1263 bytes of data (plus a little overhead).
174            // Once we hit 1000 UTDs, we add a second component filter with a capacity
175            // double that of the original and target error rate 85% of the
176            // original (another 2526 bytes), which then lasts us until a total
177            // of 3000 UTDs.
178            //
179            // [1]: https://gsd.di.uminho.pt/members/cbm/ps/dbloom.pdf
180            GrowableBloomBuilder::new().estimated_insertions(1000).desired_error_ratio(0.01).build();
181
182        Self {
183            client,
184            parent,
185            max_delay: None,
186            pending_delayed: Default::default(),
187            reported_utds: Arc::new(AsyncMutex::new(bloom_filter)),
188        }
189    }
190
191    /// Reports UTDs with the given max delay.
192    ///
193    /// Note: late decryptions are always reported, even if there was a grace
194    /// period set for the reporting of the UTD.
195    pub fn with_max_delay(mut self, delay: Duration) -> Self {
196        self.max_delay = Some(delay);
197        self
198    }
199
200    /// Load the persistent data for the UTD hook from the store.
201    ///
202    /// If the client previously used a UtdHookManager, and UTDs were
203    /// encountered, the data on the reported UTDs is loaded from the store.
204    /// Otherwise, there is no effect.
205    pub async fn reload_from_store(&mut self) -> Result<(), StoreError> {
206        let existing_data =
207            self.client.store().get_kv_data(StateStoreDataKey::UtdHookManagerData).await?;
208
209        if let Some(existing_data) = existing_data {
210            let bloom_filter = existing_data
211                .into_utd_hook_manager_data()
212                .expect("StateStore::get_kv_data should return data of the right type");
213            self.reported_utds = Arc::new(AsyncMutex::new(bloom_filter));
214        }
215        Ok(())
216    }
217
218    /// The function to call whenever a UTD is seen for the first time.
219    ///
220    /// Pipe in any information that needs to be included in the final report.
221    ///
222    /// # Arguments
223    ///  * `event_id` - The ID of the event that could not be decrypted.
224    ///  * `cause` - Our best guess at the reason why the event can't be
225    ///    decrypted.
226    ///  * `event_timestamp` - The event's `origin_server_ts` field (or creation
227    ///    time for local echo).
228    ///  * `sender_user_id` - The Matrix user ID of the user that sent the
229    ///    undecryptable message.
230    pub(crate) async fn on_utd(
231        &self,
232        event_id: &EventId,
233        cause: UtdCause,
234        event_timestamp: MilliSecondsSinceUnixEpoch,
235        sender_user_id: &UserId,
236    ) {
237        // Hold the lock on `reported_utds` throughout, to avoid races with other
238        // threads.
239        let mut reported_utds_lock = self.reported_utds.lock().await;
240
241        // Check if this, or a previous instance of UtdHookManager, has already reported
242        // this UTD, and bail out if not.
243        if reported_utds_lock.contains(event_id) {
244            return;
245        }
246
247        // Otherwise, check if we already have a task to handle this UTD.
248        if self.pending_delayed.lock().unwrap().contains_key(event_id) {
249            return;
250        }
251
252        let event_local_age_millis = i64::from(event_timestamp.get()).saturating_sub_unsigned(
253            self.client.encryption().device_creation_timestamp().await.get().into(),
254        );
255
256        let own_user_id = self.client.user_id();
257        let user_trusts_own_identity = if let Some(own_user_id) = own_user_id {
258            if let Ok(Some(own_id)) = self.client.encryption().get_user_identity(own_user_id).await
259            {
260                own_id.is_verified()
261            } else {
262                false
263            }
264        } else {
265            false
266        };
267
268        let own_homeserver = own_user_id.map(|id| id.server_name().to_owned());
269        let sender_homeserver = sender_user_id.server_name().to_owned();
270
271        let info = UnableToDecryptInfo {
272            event_id: event_id.to_owned(),
273            time_to_decrypt: None,
274            cause,
275            event_local_age_millis,
276            user_trusts_own_identity,
277            own_homeserver,
278            sender_homeserver,
279        };
280
281        let Some(max_delay) = self.max_delay else {
282            // No delay: immediately report the event to the parent hook.
283            Self::report_utd(info, &self.parent, &self.client, &mut reported_utds_lock).await;
284            return;
285        };
286
287        // Clone data shared with the task below.
288        let pending_delayed = self.pending_delayed.clone();
289        let reported_utds = self.reported_utds.clone();
290        let parent = self.parent.clone();
291        let client = self.client.clone();
292        let owned_event_id = event_id.to_owned();
293
294        // Spawn a task that will wait for the given delay, and maybe call the parent
295        // hook then.
296        let handle = spawn(async move {
297            // Wait for the given delay.
298            sleep(max_delay).await;
299
300            // Make sure we take out the lock on `reported_utds` before removing the entry
301            // from `pending_delayed`, to ensure we don't race against another call to
302            // `on_utd` (which could otherwise see that the entry has been
303            // removed from `pending_delayed` but not yet added to
304            // `reported_utds`).
305            let mut reported_utds_lock = reported_utds.lock().await;
306
307            // Remove the task from the outstanding set. But if it's already been removed,
308            // it's been decrypted since the task was added!
309            let pending_report = pending_delayed.lock().unwrap().remove(&owned_event_id);
310            if let Some(pending_report) = pending_report {
311                Self::report_utd(
312                    pending_report.utd_info,
313                    &parent,
314                    &client,
315                    &mut reported_utds_lock,
316                )
317                .await;
318            }
319        });
320
321        // Add the task to the set of pending tasks.
322        self.pending_delayed.lock().unwrap().insert(
323            event_id.to_owned(),
324            PendingUtdReport { marked_utd_at: Instant::now(), report_task: handle, utd_info: info },
325        );
326    }
327
328    /// The function to call whenever an event that was marked as a UTD has
329    /// eventually been decrypted.
330    ///
331    /// Note: if this is called for an event that was never marked as a UTD
332    /// before, it has no effect.
333    pub(crate) async fn on_late_decrypt(&self, event_id: &EventId) {
334        // Hold the lock on `reported_utds` throughout, to avoid races with other
335        // threads.
336        let mut reported_utds_lock = self.reported_utds.lock().await;
337
338        // Only let the parent hook know about the late decryption if the event is
339        // a pending UTD. If so, remove the event from the pending list —
340        // doing so will cause the reporting task to no-op if it runs.
341        let Some(pending_utd_report) = self.pending_delayed.lock().unwrap().remove(event_id) else {
342            return;
343        };
344
345        // We can also cancel the reporting task.
346        pending_utd_report.report_task.abort();
347
348        // Update the UTD Info struct with new data, then report it
349        let mut info = pending_utd_report.utd_info;
350        info.time_to_decrypt = Some(pending_utd_report.marked_utd_at.elapsed());
351        Self::report_utd(info, &self.parent, &self.client, &mut reported_utds_lock).await;
352    }
353
354    /// Helper for [`UtdHookManager::on_utd`] and
355    /// [`UtdHookManager.on_late_decrypt`]: reports the UTD to the parent,
356    /// and records the event as reported.
357    ///
358    /// Must be called with the lock held on [`UtdHookManager::reported_utds`],
359    /// and takes a `MutexGuard` to enforce that.
360    async fn report_utd(
361        info: UnableToDecryptInfo,
362        parent_hook: &Arc<dyn UnableToDecryptHook>,
363        client: &Client,
364        reported_utds_lock: &mut MutexGuard<'_, GrowableBloom>,
365    ) {
366        let event_id = info.event_id.clone();
367        parent_hook.on_utd(info);
368        reported_utds_lock.insert(event_id);
369        if let Err(e) = client
370            .store()
371            .set_kv_data(
372                StateStoreDataKey::UtdHookManagerData,
373                StateStoreDataValue::UtdHookManagerData(reported_utds_lock.clone()),
374            )
375            .await
376        {
377            error!("Unable to persist UTD report data: {}", e);
378        }
379    }
380}
381
382impl Drop for UtdHookManager {
383    fn drop(&mut self) {
384        // Cancel all the outstanding delayed tasks to report UTDs.
385        //
386        // Here, we don't take the lock on `reported_utd`s (indeed, we can't, since
387        // `reported_utds` has an async mutex, and `drop` has to be sync), but
388        // that's ok. We can't race against `on_utd` or `on_late_decrypt`, since
389        // they both have `&self` references which mean `drop` can't be called.
390        // We *could* race against one of the actual tasks to report
391        // UTDs, but that's ok too: either the report task will bail out when it sees
392        // the entry has been removed from `pending_delayed` (which is fine), or the
393        // report task will successfully report the UTD (which is fine).
394        let mut pending_delayed = self.pending_delayed.lock().unwrap();
395        for (_, pending_utd_report) in pending_delayed.drain() {
396            pending_utd_report.report_task.abort();
397        }
398    }
399}
400
401#[cfg(test)]
402mod tests {
403    use matrix_sdk::test_utils::{logged_in_client, no_retry_test_client};
404    use matrix_sdk_test::async_test;
405    use ruma::{event_id, server_name, user_id};
406
407    use super::*;
408
409    #[derive(Debug, Default)]
410    struct Dummy {
411        utds: Mutex<Vec<UnableToDecryptInfo>>,
412    }
413
414    impl UnableToDecryptHook for Dummy {
415        fn on_utd(&self, info: UnableToDecryptInfo) {
416            self.utds.lock().unwrap().push(info);
417        }
418    }
419
420    #[async_test]
421    async fn test_deduplicates_utds() {
422        // If I create a dummy hook,
423        let hook = Arc::new(Dummy::default());
424
425        // And I wrap with the UtdHookManager,
426        let wrapper = UtdHookManager::new(hook.clone(), logged_in_client(None).await);
427
428        // And I call the `on_utd` method multiple times, sometimes on the same event,
429        let event_timestamp = MilliSecondsSinceUnixEpoch::now();
430        let sender_user = user_id!("@example2:localhost");
431        let federated_user = user_id!("@example2:example.com");
432        wrapper.on_utd(event_id!("$1"), UtdCause::Unknown, event_timestamp, sender_user).await;
433        wrapper.on_utd(event_id!("$1"), UtdCause::Unknown, event_timestamp, sender_user).await;
434        wrapper.on_utd(event_id!("$2"), UtdCause::Unknown, event_timestamp, federated_user).await;
435        wrapper.on_utd(event_id!("$1"), UtdCause::Unknown, event_timestamp, sender_user).await;
436        wrapper.on_utd(event_id!("$2"), UtdCause::Unknown, event_timestamp, federated_user).await;
437        wrapper.on_utd(event_id!("$3"), UtdCause::Unknown, event_timestamp, sender_user).await;
438
439        // Then the event ids have been deduplicated,
440        {
441            let utds = hook.utds.lock().unwrap();
442            assert_eq!(utds.len(), 3);
443            assert_eq!(utds[0].event_id, event_id!("$1"));
444            assert_eq!(utds[1].event_id, event_id!("$2"));
445            assert_eq!(utds[2].event_id, event_id!("$3"));
446
447            // No event is a late-decryption event.
448            assert!(utds[0].time_to_decrypt.is_none());
449            assert!(utds[1].time_to_decrypt.is_none());
450            assert!(utds[2].time_to_decrypt.is_none());
451
452            // event_local_age_millis should be a small positive number, because the
453            // timestamp we used was after we created the device
454            let utd_local_age = utds[0].event_local_age_millis;
455            assert!(utd_local_age >= 0);
456            assert!(utd_local_age <= 1000);
457
458            assert_eq!(utds[0].sender_homeserver, server_name!("localhost"));
459            assert_eq!(utds[0].own_homeserver, Some(server_name!("localhost").to_owned()));
460
461            assert_eq!(utds[1].sender_homeserver, server_name!("example.com"));
462            assert_eq!(utds[1].own_homeserver, Some(server_name!("localhost").to_owned()));
463        }
464    }
465
466    #[async_test]
467    async fn test_deduplicates_utds_from_previous_session() {
468        // Use a single client for both hooks, so that both hooks are backed by the same
469        // memorystore.
470        let client = no_retry_test_client(None).await;
471
472        // Dummy hook 1, with the first UtdHookManager
473        {
474            let hook = Arc::new(Dummy::default());
475            let wrapper = UtdHookManager::new(hook.clone(), client.clone());
476
477            // I call it a couple of times with different events
478            wrapper
479                .on_utd(
480                    event_id!("$1"),
481                    UtdCause::Unknown,
482                    MilliSecondsSinceUnixEpoch::now(),
483                    user_id!("@a:b"),
484                )
485                .await;
486            wrapper
487                .on_utd(
488                    event_id!("$2"),
489                    UtdCause::Unknown,
490                    MilliSecondsSinceUnixEpoch::now(),
491                    user_id!("@a:b"),
492                )
493                .await;
494
495            // Sanity-check the reported event IDs
496            {
497                let utds = hook.utds.lock().unwrap();
498                assert_eq!(utds.len(), 2);
499                assert_eq!(utds[0].event_id, event_id!("$1"));
500                assert!(utds[0].time_to_decrypt.is_none());
501                assert_eq!(utds[1].event_id, event_id!("$2"));
502                assert!(utds[1].time_to_decrypt.is_none());
503            }
504        }
505
506        // Now, create a *new* hook, with a *new* UtdHookManager
507        {
508            let hook = Arc::new(Dummy::default());
509            let mut wrapper = UtdHookManager::new(hook.clone(), client.clone());
510            wrapper.reload_from_store().await.unwrap();
511
512            // Call it with more events, some of which match the previous instance
513            wrapper
514                .on_utd(
515                    event_id!("$1"),
516                    UtdCause::Unknown,
517                    MilliSecondsSinceUnixEpoch::now(),
518                    user_id!("@a:b"),
519                )
520                .await;
521            wrapper
522                .on_utd(
523                    event_id!("$3"),
524                    UtdCause::Unknown,
525                    MilliSecondsSinceUnixEpoch::now(),
526                    user_id!("@a:b"),
527                )
528                .await;
529
530            // Only the *new* ones should be reported
531            let utds = hook.utds.lock().unwrap();
532            assert_eq!(utds.len(), 1);
533            assert_eq!(utds[0].event_id, event_id!("$3"));
534        }
535    }
536
537    /// Test that UTD events which had not yet been reported in a previous
538    /// session, are reported in the next session.
539    #[async_test]
540    async fn test_does_not_deduplicate_late_utds_from_previous_session() {
541        // Use a single client for both hooks, so that both hooks are backed by the same
542        // memorystore.
543        let client = no_retry_test_client(None).await;
544
545        // Dummy hook 1, with the first UtdHookManager
546        {
547            let hook = Arc::new(Dummy::default());
548            let wrapper = UtdHookManager::new(hook.clone(), client.clone())
549                .with_max_delay(Duration::from_secs(2));
550
551            // a UTD event
552            wrapper
553                .on_utd(
554                    event_id!("$1"),
555                    UtdCause::Unknown,
556                    MilliSecondsSinceUnixEpoch::now(),
557                    user_id!("@a:b"),
558                )
559                .await;
560
561            // The event ID should not yet have been reported.
562            {
563                let utds = hook.utds.lock().unwrap();
564                assert_eq!(utds.len(), 0);
565            }
566        }
567
568        // Now, create a *new* hook, with a *new* UtdHookManager
569        {
570            let hook = Arc::new(Dummy::default());
571            let mut wrapper = UtdHookManager::new(hook.clone(), client.clone());
572            wrapper.reload_from_store().await.unwrap();
573
574            // Call the new hook with the same event
575            wrapper
576                .on_utd(
577                    event_id!("$1"),
578                    UtdCause::Unknown,
579                    MilliSecondsSinceUnixEpoch::now(),
580                    user_id!("@a:b"),
581                )
582                .await;
583
584            // And it should be reported.
585            sleep(Duration::from_millis(2500)).await;
586
587            let utds = hook.utds.lock().unwrap();
588            assert_eq!(utds.len(), 1);
589            assert_eq!(utds[0].event_id, event_id!("$1"));
590        }
591    }
592
593    #[async_test]
594    async fn test_on_late_decrypted_no_effect() {
595        // If I create a dummy hook,
596        let hook = Arc::new(Dummy::default());
597
598        // And I wrap with the UtdHookManager,
599        let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await);
600
601        // And I call the `on_late_decrypt` method before the event had been marked as
602        // utd,
603        wrapper.on_late_decrypt(event_id!("$1")).await;
604
605        // Then nothing is registered in the parent hook.
606        assert!(hook.utds.lock().unwrap().is_empty());
607    }
608
609    #[async_test]
610    async fn test_on_late_decrypted_after_utd_no_grace_period() {
611        // If I create a dummy hook,
612        let hook = Arc::new(Dummy::default());
613
614        // And I wrap with the UtdHookManager,
615        let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await);
616
617        // And I call the `on_utd` method for an event,
618        wrapper
619            .on_utd(
620                event_id!("$1"),
621                UtdCause::Unknown,
622                MilliSecondsSinceUnixEpoch::now(),
623                user_id!("@a:b"),
624            )
625            .await;
626
627        // Then the UTD has been notified, but not as late-decrypted event.
628        {
629            let utds = hook.utds.lock().unwrap();
630            assert_eq!(utds.len(), 1);
631            assert_eq!(utds[0].event_id, event_id!("$1"));
632            assert!(utds[0].time_to_decrypt.is_none());
633        }
634
635        // And when I call the `on_late_decrypt` method,
636        wrapper.on_late_decrypt(event_id!("$1")).await;
637
638        // Then the event is not reported again as a late-decryption.
639        {
640            let utds = hook.utds.lock().unwrap();
641            assert_eq!(utds.len(), 1);
642
643            // The previous report is still there. (There was no grace period.)
644            assert_eq!(utds[0].event_id, event_id!("$1"));
645            assert!(utds[0].time_to_decrypt.is_none());
646        }
647    }
648
649    #[cfg(not(target_arch = "wasm32"))] // wasm32 has no time for that
650    #[async_test]
651    async fn test_delayed_utd() {
652        // If I create a dummy hook,
653        let hook = Arc::new(Dummy::default());
654
655        // And I wrap with the UtdHookManager, configured to delay reporting after 2
656        // seconds.
657        let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await)
658            .with_max_delay(Duration::from_secs(2));
659
660        // And I call the `on_utd` method for an event,
661        wrapper
662            .on_utd(
663                event_id!("$1"),
664                UtdCause::Unknown,
665                MilliSecondsSinceUnixEpoch::now(),
666                user_id!("@a:b"),
667            )
668            .await;
669
670        // Then the UTD is not being reported immediately.
671        assert!(hook.utds.lock().unwrap().is_empty());
672        assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);
673
674        // If I wait for 1 second, then it's still not been notified yet.
675        sleep(Duration::from_secs(1)).await;
676
677        assert!(hook.utds.lock().unwrap().is_empty());
678        assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);
679
680        // But if I wait just a bit more, then it's getting notified as a definite UTD.
681        sleep(Duration::from_millis(1500)).await;
682
683        {
684            let utds = hook.utds.lock().unwrap();
685            assert_eq!(utds.len(), 1);
686            assert_eq!(utds[0].event_id, event_id!("$1"));
687            assert!(utds[0].time_to_decrypt.is_none());
688        }
689
690        assert!(wrapper.pending_delayed.lock().unwrap().is_empty());
691    }
692
693    #[cfg(not(target_arch = "wasm32"))] // wasm32 has no time for that
694    #[async_test]
695    async fn test_delayed_late_decryption() {
696        // If I create a dummy hook,
697        let hook = Arc::new(Dummy::default());
698
699        // And I wrap with the UtdHookManager, configured to delay reporting after 2
700        // seconds.
701        let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await)
702            .with_max_delay(Duration::from_secs(2));
703
704        // And I call the `on_utd` method for an event,
705        wrapper
706            .on_utd(
707                event_id!("$1"),
708                UtdCause::Unknown,
709                MilliSecondsSinceUnixEpoch::now(),
710                user_id!("@a:b"),
711            )
712            .await;
713
714        // Then the UTD has not been notified quite yet.
715        assert!(hook.utds.lock().unwrap().is_empty());
716        assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);
717
718        // If I wait for 1 second, and mark the event as late-decrypted,
719        sleep(Duration::from_secs(1)).await;
720
721        wrapper.on_late_decrypt(event_id!("$1")).await;
722
723        // Then it's being immediately reported as a late-decryption UTD.
724        {
725            let utds = hook.utds.lock().unwrap();
726            assert_eq!(utds.len(), 1);
727            assert_eq!(utds[0].event_id, event_id!("$1"));
728            assert!(utds[0].time_to_decrypt.is_some());
729        }
730
731        // And there aren't any pending delayed reports anymore.
732        assert!(wrapper.pending_delayed.lock().unwrap().is_empty());
733    }
734}