matrix_sdk_ui/
unable_to_decrypt_hook.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module provides a generic interface to subscribe to unable-to-decrypt
16//! events, and notable updates to such events.
17//!
18//! This provides a general trait that a consumer may implement, as well as
19//! utilities to simplify usage of this trait.
20
21use std::{
22    collections::HashMap,
23    sync::{Arc, Mutex},
24};
25
26use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
27use matrix_sdk::{
28    Client,
29    crypto::types::events::UtdCause,
30    executor::{JoinHandle, spawn},
31    sleep::sleep,
32};
33use matrix_sdk_base::{
34    SendOutsideWasm, StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm,
35};
36use ruma::{
37    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedServerName, UserId,
38    time::{Duration, Instant},
39};
40use tokio::sync::{Mutex as AsyncMutex, MutexGuard};
41use tracing::{error, trace};
42
43/// A generic interface which methods get called whenever we observe a
44/// unable-to-decrypt (UTD) event.
45pub trait UnableToDecryptHook: std::fmt::Debug + SendOutsideWasm + SyncOutsideWasm {
46    /// Called every time the hook observes an encrypted event that couldn't be
47    /// decrypted.
48    ///
49    /// If the hook manager was configured with a max delay, this could also
50    /// contain extra information for late-decrypted events. See details in
51    /// [`UnableToDecryptInfo::time_to_decrypt`].
52    fn on_utd(&self, info: UnableToDecryptInfo);
53}
54
55/// Information about an event we were unable to decrypt (UTD).
56#[derive(Clone, Debug, Hash, PartialEq, Eq)]
57pub struct UnableToDecryptInfo {
58    /// The identifier of the event that couldn't get decrypted.
59    pub event_id: OwnedEventId,
60
61    /// If the event could be decrypted late (that is, the event was encrypted
62    /// at first, but could be decrypted later on), then this indicates the
63    /// time it took to decrypt the event. If it is not set, this is
64    /// considered a definite UTD.
65    pub time_to_decrypt: Option<Duration>,
66
67    /// What we know about what caused this UTD. E.g. was this event sent when
68    /// we were not a member of this room?
69    pub cause: UtdCause,
70
71    /// The difference between the event creation time (`origin_server_ts`) and
72    /// the time our device was created. If negative, this event was sent
73    /// *before* our device was created.
74    pub event_local_age_millis: i64,
75
76    /// Whether the user had verified their own identity at the point they
77    /// received the UTD event.
78    pub user_trusts_own_identity: bool,
79
80    /// The homeserver of the user that sent the undecryptable event.
81    pub sender_homeserver: OwnedServerName,
82
83    /// Our local user's own homeserver, or `None` if the client is not logged
84    /// in.
85    pub own_homeserver: Option<OwnedServerName>,
86}
87
88/// Data about a UTD event which we are waiting to report to the parent hook.
89#[derive(Debug)]
90struct PendingUtdReport {
91    /// The time that we received the UTD report from the timeline code.
92    marked_utd_at: Instant,
93
94    /// The task that will report this UTD to the parent hook.
95    report_task: JoinHandle<()>,
96
97    /// The UnableToDecryptInfo structure for this UTD event.
98    utd_info: UnableToDecryptInfo,
99}
100
101/// A manager over an existing [`UnableToDecryptHook`] that deduplicates UTDs
102/// on similar events, and adds basic consistency checks.
103///
104/// It can also implement a grace period before reporting an event as a UTD, if
105/// configured with [`Self::with_max_delay`]. Instead of immediately reporting
106/// the UTD, the reporting will be delayed by the max delay at most; if the
107/// event could eventually get decrypted, it may be reported before the end of
108/// that delay.
109#[derive(Debug)]
110pub struct UtdHookManager {
111    /// A Client associated with the UTD hook. This is used to access the store
112    /// which we persist our data to.
113    client: Client,
114
115    /// The parent hook we'll call, when we have found a unique UTD.
116    parent: Arc<dyn UnableToDecryptHook>,
117
118    /// An optional delay before marking the event as UTD ("grace period").
119    max_delay: Option<Duration>,
120
121    /// A mapping of events we're going to report as UTDs, to the tasks to do
122    /// so.
123    ///
124    /// Note: this is empty if no [`Self::max_delay`] is set.
125    ///
126    /// Note: this is theoretically unbounded in size, although this set of
127    /// tasks will degrow over time, as tasks expire after the max delay.
128    pending_delayed: Arc<Mutex<HashMap<OwnedEventId, PendingUtdReport>>>,
129
130    /// Bloom filter containing the event IDs of events which have been reported
131    /// as UTDs
132    reported_utds: Arc<AsyncMutex<GrowableBloom>>,
133}
134
135impl UtdHookManager {
136    /// Create a new [`UtdHookManager`] for the given hook.
137    ///
138    /// A [`Client`] must also be provided; this provides a link to the
139    /// [`matrix_sdk_base::StateStore`] which is used to load and store the
140    /// persistent data.
141    pub fn new(parent: Arc<dyn UnableToDecryptHook>, client: Client) -> Self {
142        let bloom_filter =
143            // Some slightly arbitrarily-chosen parameters here. We specify that, after 1000
144            // UTDs, we want to have a false-positive rate of 1%.
145            //
146            // The GrowableBloomFilter is based on a series of (partitioned) Bloom filters;
147            // once the first starts getting full (the expected false-positive
148            // rate gets too high), it adds another Bloom filter. Each new entry
149            // is recorded in the most recent Bloom filter; when querying, if
150            // *any* of the component filters show a match, that shows
151            // an overall match.
152            //
153            // The first component filter is created based on the parameters we give. For
154            // reasons derived in the paper [1], a partitioned Bloom filter with
155            // target false-positive rate `P` after `n` insertions requires a
156            // number of slices `k` given by:
157            //
158            // k = log2(1/P) = -ln(P) / ln(2)
159            //
160            // ... where each slice has a number of bits `m` given by
161            //
162            // m = n / ln(2)
163            //
164            // We have to have a whole number of slices and bits, so the total number of
165            // bits M is:
166            //
167            // M = ceil(k) * ceil(m)
168            //   = ceil(-ln(P) / ln(2)) * ceil(n / ln(2))
169            //
170            // In other words, our FP rate of 1% after 1000 insertions requires:
171            //
172            // M = ceil(-ln(0.01) / ln(2)) * ceil(1000 / ln(2))
173            //   = 7 * 1443 = 10101 bits
174            //
175            // So our filter starts off with 1263 bytes of data (plus a little overhead).
176            // Once we hit 1000 UTDs, we add a second component filter with a capacity
177            // double that of the original and target error rate 85% of the
178            // original (another 2526 bytes), which then lasts us until a total
179            // of 3000 UTDs.
180            //
181            // [1]: https://gsd.di.uminho.pt/members/cbm/ps/dbloom.pdf
182            GrowableBloomBuilder::new().estimated_insertions(1000).desired_error_ratio(0.01).build();
183
184        Self {
185            client,
186            parent,
187            max_delay: None,
188            pending_delayed: Default::default(),
189            reported_utds: Arc::new(AsyncMutex::new(bloom_filter)),
190        }
191    }
192
193    /// Reports UTDs with the given max delay.
194    ///
195    /// Note: late decryptions are always reported, even if there was a grace
196    /// period set for the reporting of the UTD.
197    pub fn with_max_delay(mut self, delay: Duration) -> Self {
198        self.max_delay = Some(delay);
199        self
200    }
201
202    /// Load the persistent data for the UTD hook from the store.
203    ///
204    /// If the client previously used a UtdHookManager, and UTDs were
205    /// encountered, the data on the reported UTDs is loaded from the store.
206    /// Otherwise, there is no effect.
207    pub async fn reload_from_store(&mut self) -> Result<(), StoreError> {
208        let existing_data =
209            self.client.state_store().get_kv_data(StateStoreDataKey::UtdHookManagerData).await?;
210
211        if let Some(existing_data) = existing_data {
212            let bloom_filter = existing_data
213                .into_utd_hook_manager_data()
214                .expect("StateStore::get_kv_data should return data of the right type");
215            self.reported_utds = Arc::new(AsyncMutex::new(bloom_filter));
216        }
217        Ok(())
218    }
219
220    /// The function to call whenever a UTD is seen for the first time.
221    ///
222    /// Pipe in any information that needs to be included in the final report.
223    ///
224    /// # Arguments
225    ///  * `event_id` - The ID of the event that could not be decrypted.
226    ///  * `cause` - Our best guess at the reason why the event can't be
227    ///    decrypted.
228    ///  * `event_timestamp` - The event's `origin_server_ts` field (or creation
229    ///    time for local echo).
230    ///  * `sender_user_id` - The Matrix user ID of the user that sent the
231    ///    undecryptable message.
232    pub(crate) async fn on_utd(
233        &self,
234        event_id: &EventId,
235        cause: UtdCause,
236        event_timestamp: MilliSecondsSinceUnixEpoch,
237        sender_user_id: &UserId,
238    ) {
239        trace!(%event_id, "UtdHookManager: Observed UTD");
240        // Hold the lock on `reported_utds` throughout, to avoid races with other
241        // threads.
242        let mut reported_utds_lock = self.reported_utds.lock().await;
243
244        // Check if this, or a previous instance of UtdHookManager, has already reported
245        // this UTD, and bail out if not.
246        if reported_utds_lock.contains(event_id) {
247            return;
248        }
249
250        // Otherwise, check if we already have a task to handle this UTD.
251        if self.pending_delayed.lock().unwrap().contains_key(event_id) {
252            return;
253        }
254
255        let event_local_age_millis = i64::from(event_timestamp.get()).saturating_sub_unsigned(
256            self.client.encryption().device_creation_timestamp().await.get().into(),
257        );
258
259        let own_user_id = self.client.user_id();
260        let user_trusts_own_identity = if let Some(own_user_id) = own_user_id {
261            if let Ok(Some(own_id)) = self.client.encryption().get_user_identity(own_user_id).await
262            {
263                own_id.is_verified()
264            } else {
265                false
266            }
267        } else {
268            false
269        };
270
271        let own_homeserver = own_user_id.map(|id| id.server_name().to_owned());
272        let sender_homeserver = sender_user_id.server_name().to_owned();
273
274        let info = UnableToDecryptInfo {
275            event_id: event_id.to_owned(),
276            time_to_decrypt: None,
277            cause,
278            event_local_age_millis,
279            user_trusts_own_identity,
280            own_homeserver,
281            sender_homeserver,
282        };
283
284        let Some(max_delay) = self.max_delay else {
285            // No delay: immediately report the event to the parent hook.
286            Self::report_utd(info, &self.parent, &self.client, &mut reported_utds_lock).await;
287            return;
288        };
289
290        // Clone data shared with the task below.
291        let pending_delayed = self.pending_delayed.clone();
292        let reported_utds = self.reported_utds.clone();
293        let parent = self.parent.clone();
294        let client = self.client.clone();
295        let owned_event_id = event_id.to_owned();
296
297        // Spawn a task that will wait for the given delay, and maybe call the parent
298        // hook then.
299        let handle = spawn(async move {
300            // Wait for the given delay.
301            sleep(max_delay).await;
302
303            // Make sure we take out the lock on `reported_utds` before removing the entry
304            // from `pending_delayed`, to ensure we don't race against another call to
305            // `on_utd` (which could otherwise see that the entry has been
306            // removed from `pending_delayed` but not yet added to
307            // `reported_utds`).
308            let mut reported_utds_lock = reported_utds.lock().await;
309
310            // Remove the task from the outstanding set. But if it's already been removed,
311            // it's been decrypted since the task was added!
312            let pending_report = pending_delayed.lock().unwrap().remove(&owned_event_id);
313            if let Some(pending_report) = pending_report {
314                Self::report_utd(
315                    pending_report.utd_info,
316                    &parent,
317                    &client,
318                    &mut reported_utds_lock,
319                )
320                .await;
321            }
322        });
323
324        // Add the task to the set of pending tasks.
325        self.pending_delayed.lock().unwrap().insert(
326            event_id.to_owned(),
327            PendingUtdReport { marked_utd_at: Instant::now(), report_task: handle, utd_info: info },
328        );
329    }
330
331    /// The function to call whenever an event that was marked as a UTD has
332    /// eventually been decrypted.
333    ///
334    /// Note: if this is called for an event that was never marked as a UTD
335    /// before, it has no effect.
336    pub(crate) async fn on_late_decrypt(&self, event_id: &EventId) {
337        trace!(%event_id, "UtdHookManager: On late decrypt");
338        // Hold the lock on `reported_utds` throughout, to avoid races with other
339        // threads.
340        let mut reported_utds_lock = self.reported_utds.lock().await;
341
342        // Only let the parent hook know about the late decryption if the event is
343        // a pending UTD. If so, remove the event from the pending list —
344        // doing so will cause the reporting task to no-op if it runs.
345        let Some(pending_utd_report) = self.pending_delayed.lock().unwrap().remove(event_id) else {
346            trace!(%event_id, "UtdHookManager: received a late decrypt report for an unknown utd");
347            return;
348        };
349
350        // We can also cancel the reporting task.
351        pending_utd_report.report_task.abort();
352
353        // Update the UTD Info struct with new data, then report it
354        let mut info = pending_utd_report.utd_info;
355        info.time_to_decrypt = Some(pending_utd_report.marked_utd_at.elapsed());
356        Self::report_utd(info, &self.parent, &self.client, &mut reported_utds_lock).await;
357    }
358
359    /// Helper for [`UtdHookManager::on_utd`] and
360    /// [`UtdHookManager.on_late_decrypt`]: reports the UTD to the parent,
361    /// and records the event as reported.
362    ///
363    /// Must be called with the lock held on [`UtdHookManager::reported_utds`],
364    /// and takes a `MutexGuard` to enforce that.
365    async fn report_utd(
366        info: UnableToDecryptInfo,
367        parent_hook: &Arc<dyn UnableToDecryptHook>,
368        client: &Client,
369        reported_utds_lock: &mut MutexGuard<'_, GrowableBloom>,
370    ) {
371        let event_id = info.event_id.clone();
372        parent_hook.on_utd(info);
373        reported_utds_lock.insert(event_id);
374        if let Err(e) = client
375            .state_store()
376            .set_kv_data(
377                StateStoreDataKey::UtdHookManagerData,
378                StateStoreDataValue::UtdHookManagerData(reported_utds_lock.clone()),
379            )
380            .await
381        {
382            error!("Unable to persist UTD report data: {}", e);
383        }
384    }
385}
386
387impl Drop for UtdHookManager {
388    fn drop(&mut self) {
389        // Cancel all the outstanding delayed tasks to report UTDs.
390        //
391        // Here, we don't take the lock on `reported_utd`s (indeed, we can't, since
392        // `reported_utds` has an async mutex, and `drop` has to be sync), but
393        // that's ok. We can't race against `on_utd` or `on_late_decrypt`, since
394        // they both have `&self` references which mean `drop` can't be called.
395        // We *could* race against one of the actual tasks to report
396        // UTDs, but that's ok too: either the report task will bail out when it sees
397        // the entry has been removed from `pending_delayed` (which is fine), or the
398        // report task will successfully report the UTD (which is fine).
399        let mut pending_delayed = self.pending_delayed.lock().unwrap();
400        for (_, pending_utd_report) in pending_delayed.drain() {
401            pending_utd_report.report_task.abort();
402        }
403    }
404}
405
#[cfg(test)]
mod tests {
    use matrix_sdk::test_utils::{logged_in_client, no_retry_test_client};
    use matrix_sdk_test::async_test;
    use ruma::{event_id, server_name, user_id};

    use super::*;

    /// Hook implementation which merely records every report it receives.
    #[derive(Debug, Default)]
    struct Dummy {
        utds: Mutex<Vec<UnableToDecryptInfo>>,
    }

    impl Dummy {
        /// Snapshot of the reports received so far, in order of arrival.
        fn reported(&self) -> Vec<UnableToDecryptInfo> {
            self.utds.lock().unwrap().clone()
        }
    }

    impl UnableToDecryptHook for Dummy {
        fn on_utd(&self, info: UnableToDecryptInfo) {
            self.utds.lock().unwrap().push(info);
        }
    }

    /// Notify `manager` of a UTD on `event_id`, of unknown cause, sent right
    /// now by `@a:b`.
    async fn observe_utd(manager: &UtdHookManager, event_id: &EventId) {
        manager
            .on_utd(
                event_id,
                UtdCause::Unknown,
                MilliSecondsSinceUnixEpoch::now(),
                user_id!("@a:b"),
            )
            .await;
    }

    #[async_test]
    async fn test_deduplicates_utds() {
        // Given a dummy hook wrapped into a UtdHookManager,
        let hook = Arc::new(Dummy::default());
        let wrapper = UtdHookManager::new(hook.clone(), logged_in_client(None).await);

        // When `on_utd` is called several times, sometimes for the same event,
        let event_timestamp = MilliSecondsSinceUnixEpoch::now();
        let sender_user = user_id!("@example2:localhost");
        let federated_user = user_id!("@example2:example.com");
        let observations = [
            (event_id!("$1"), sender_user),
            (event_id!("$1"), sender_user),
            (event_id!("$2"), federated_user),
            (event_id!("$1"), sender_user),
            (event_id!("$2"), federated_user),
            (event_id!("$3"), sender_user),
        ];
        for (event_id, sender) in observations {
            wrapper.on_utd(event_id, UtdCause::Unknown, event_timestamp, sender).await;
        }

        // Then each event has been reported exactly once, in order.
        let utds = hook.reported();
        assert_eq!(utds.len(), 3);
        assert_eq!(utds[0].event_id, event_id!("$1"));
        assert_eq!(utds[1].event_id, event_id!("$2"));
        assert_eq!(utds[2].event_id, event_id!("$3"));

        // None of the reports is a late decryption.
        assert!(utds[0].time_to_decrypt.is_none());
        assert!(utds[1].time_to_decrypt.is_none());
        assert!(utds[2].time_to_decrypt.is_none());

        // The timestamp was taken after the device was created, so the local
        // age of the event must be a small, non-negative number.
        let utd_local_age = utds[0].event_local_age_millis;
        assert!(utd_local_age >= 0);
        assert!(utd_local_age <= 1000);

        let own_homeserver = Some(server_name!("localhost").to_owned());

        assert_eq!(utds[0].sender_homeserver, server_name!("localhost"));
        assert_eq!(utds[0].own_homeserver, own_homeserver);

        assert_eq!(utds[1].sender_homeserver, server_name!("example.com"));
        assert_eq!(utds[1].own_homeserver, own_homeserver);
    }

    #[async_test]
    async fn test_deduplicates_utds_from_previous_session() {
        // Both managers share one client, and hence one memory store.
        let client = no_retry_test_client(None).await;

        // First session: a first hook, with a first UtdHookManager.
        {
            let hook = Arc::new(Dummy::default());
            let wrapper = UtdHookManager::new(hook.clone(), client.clone());

            // Two distinct events are seen,
            observe_utd(&wrapper, event_id!("$1")).await;
            observe_utd(&wrapper, event_id!("$2")).await;

            // and, as a sanity check, both are reported.
            let utds = hook.reported();
            assert_eq!(utds.len(), 2);
            assert_eq!(utds[0].event_id, event_id!("$1"));
            assert!(utds[0].time_to_decrypt.is_none());
            assert_eq!(utds[1].event_id, event_id!("$2"));
            assert!(utds[1].time_to_decrypt.is_none());
        }

        // Second session: a *new* hook, with a *new* UtdHookManager.
        {
            let hook = Arc::new(Dummy::default());
            let mut wrapper = UtdHookManager::new(hook.clone(), client.clone());
            wrapper.reload_from_store().await.unwrap();

            // One event known from the first session, and a fresh one.
            observe_utd(&wrapper, event_id!("$1")).await;
            observe_utd(&wrapper, event_id!("$3")).await;

            // Only the fresh one gets reported.
            let utds = hook.reported();
            assert_eq!(utds.len(), 1);
            assert_eq!(utds[0].event_id, event_id!("$3"));
        }
    }

    /// Test that UTD events which had not yet been reported in a previous
    /// session, are reported in the next session.
    #[async_test]
    async fn test_does_not_deduplicate_late_utds_from_previous_session() {
        // Both managers share one client, and hence one memory store.
        let client = no_retry_test_client(None).await;

        // First session: a manager with a grace period of 2 seconds.
        {
            let hook = Arc::new(Dummy::default());
            let wrapper = UtdHookManager::new(hook.clone(), client.clone())
                .with_max_delay(Duration::from_secs(2));

            // A UTD is seen,
            observe_utd(&wrapper, event_id!("$1")).await;

            // but the session ends before it has been reported.
            assert_eq!(hook.reported().len(), 0);
        }

        // Second session: a *new* hook, with a *new* UtdHookManager.
        {
            let hook = Arc::new(Dummy::default());
            let mut wrapper = UtdHookManager::new(hook.clone(), client.clone());
            wrapper.reload_from_store().await.unwrap();

            // The same event is seen again,
            observe_utd(&wrapper, event_id!("$1")).await;

            // and this time it does get reported.
            sleep(Duration::from_millis(2500)).await;

            let utds = hook.reported();
            assert_eq!(utds.len(), 1);
            assert_eq!(utds[0].event_id, event_id!("$1"));
        }
    }

    #[async_test]
    async fn test_on_late_decrypted_no_effect() {
        // Given a dummy hook wrapped into a UtdHookManager,
        let hook = Arc::new(Dummy::default());
        let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await);

        // When `on_late_decrypt` is called for an event which was never
        // marked as a UTD,
        wrapper.on_late_decrypt(event_id!("$1")).await;

        // Then the parent hook does not hear about it.
        assert!(hook.reported().is_empty());
    }

    #[async_test]
    async fn test_on_late_decrypted_after_utd_no_grace_period() {
        // Given a dummy hook wrapped into a UtdHookManager,
        let hook = Arc::new(Dummy::default());
        let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await);

        // When `on_utd` is called for an event,
        observe_utd(&wrapper, event_id!("$1")).await;

        // Then the UTD is reported, but not as a late decryption.
        {
            let utds = hook.reported();
            assert_eq!(utds.len(), 1);
            assert_eq!(utds[0].event_id, event_id!("$1"));
            assert!(utds[0].time_to_decrypt.is_none());
        }

        // And when `on_late_decrypt` is called afterwards,
        wrapper.on_late_decrypt(event_id!("$1")).await;

        // Then no second report is made for the late decryption,
        let utds = hook.reported();
        assert_eq!(utds.len(), 1);

        // and the first report is unchanged. (There was no grace period.)
        assert_eq!(utds[0].event_id, event_id!("$1"));
        assert!(utds[0].time_to_decrypt.is_none());
    }

    #[cfg(not(target_family = "wasm"))] // wasm32 has no time for that
    #[async_test]
    async fn test_delayed_utd() {
        // Given a dummy hook wrapped into a UtdHookManager which holds
        // reports back for 2 seconds,
        let hook = Arc::new(Dummy::default());
        let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await)
            .with_max_delay(Duration::from_secs(2));

        // When `on_utd` is called for an event,
        observe_utd(&wrapper, event_id!("$1")).await;

        // Then the UTD is pending, not reported.
        assert!(hook.reported().is_empty());
        assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);

        // One second later, that is still the case.
        sleep(Duration::from_secs(1)).await;

        assert!(hook.reported().is_empty());
        assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);

        // A bit later again, it has been reported as a definite UTD,
        sleep(Duration::from_millis(1500)).await;

        let utds = hook.reported();
        assert_eq!(utds.len(), 1);
        assert_eq!(utds[0].event_id, event_id!("$1"));
        assert!(utds[0].time_to_decrypt.is_none());

        // and nothing is pending anymore.
        assert!(wrapper.pending_delayed.lock().unwrap().is_empty());
    }

    #[cfg(not(target_family = "wasm"))] // wasm32 has no time for that
    #[async_test]
    async fn test_delayed_late_decryption() {
        // Given a dummy hook wrapped into a UtdHookManager which holds
        // reports back for 2 seconds,
        let hook = Arc::new(Dummy::default());
        let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await)
            .with_max_delay(Duration::from_secs(2));

        // When `on_utd` is called for an event,
        observe_utd(&wrapper, event_id!("$1")).await;

        // Then the UTD is pending, not reported.
        assert!(hook.reported().is_empty());
        assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);

        // When the event is marked as late-decrypted one second later,
        sleep(Duration::from_secs(1)).await;

        wrapper.on_late_decrypt(event_id!("$1")).await;

        // Then it is reported at once, as a late decryption,
        let utds = hook.reported();
        assert_eq!(utds.len(), 1);
        assert_eq!(utds[0].event_id, event_id!("$1"));
        assert!(utds[0].time_to_decrypt.is_some());

        // and no delayed report is pending anymore.
        assert!(wrapper.pending_delayed.lock().unwrap().is_empty());
    }
}