matrix_sdk_ui/unable_to_decrypt_hook.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module provides a generic interface to subscribe to unable-to-decrypt
16//! events, and notable updates to such events.
17//!
18//! This provides a general trait that a consumer may implement, as well as
19//! utilities to simplify usage of this trait.
20
21use std::{
22 collections::HashMap,
23 sync::{Arc, Mutex},
24};
25
26use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
27use matrix_sdk::{Client, sleep::sleep, task_monitor::BackgroundTaskHandle};
28use matrix_sdk_base::{
29 SendOutsideWasm, StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm,
30 crypto::types::events::UtdCause,
31};
32use ruma::{
33 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedServerName, UserId,
34 time::{Duration, Instant},
35};
36use tokio::sync::{Mutex as AsyncMutex, MutexGuard};
37use tracing::{error, trace};
38
39/// A generic interface which methods get called whenever we observe a
40/// unable-to-decrypt (UTD) event.
41pub trait UnableToDecryptHook: std::fmt::Debug + SendOutsideWasm + SyncOutsideWasm {
42 /// Called every time the hook observes an encrypted event that couldn't be
43 /// decrypted.
44 ///
45 /// If the hook manager was configured with a max delay, this could also
46 /// contain extra information for late-decrypted events. See details in
47 /// [`UnableToDecryptInfo::time_to_decrypt`].
48 fn on_utd(&self, info: UnableToDecryptInfo);
49}
50
51/// Information about an event we were unable to decrypt (UTD).
52#[derive(Clone, Debug, Hash, PartialEq, Eq)]
53pub struct UnableToDecryptInfo {
54 /// The identifier of the event that couldn't get decrypted.
55 pub event_id: OwnedEventId,
56
57 /// If the event could be decrypted late (that is, the event was encrypted
58 /// at first, but could be decrypted later on), then this indicates the
59 /// time it took to decrypt the event. If it is not set, this is
60 /// considered a definite UTD.
61 pub time_to_decrypt: Option<Duration>,
62
63 /// What we know about what caused this UTD. E.g. was this event sent when
64 /// we were not a member of this room?
65 pub cause: UtdCause,
66
67 /// The difference between the event creation time (`origin_server_ts`) and
68 /// the time our device was created. If negative, this event was sent
69 /// *before* our device was created.
70 pub event_local_age_millis: i64,
71
72 /// Whether the user had verified their own identity at the point they
73 /// received the UTD event.
74 pub user_trusts_own_identity: bool,
75
76 /// The homeserver of the user that sent the undecryptable event.
77 pub sender_homeserver: OwnedServerName,
78
79 /// Our local user's own homeserver, or `None` if the client is not logged
80 /// in.
81 pub own_homeserver: Option<OwnedServerName>,
82}
83
84/// Data about a UTD event which we are waiting to report to the parent hook.
85#[derive(Debug)]
86struct PendingUtdReport {
87 /// The time that we received the UTD report from the timeline code.
88 marked_utd_at: Instant,
89
90 /// The task that will report this UTD to the parent hook.
91 report_task: BackgroundTaskHandle,
92
93 /// The UnableToDecryptInfo structure for this UTD event.
94 utd_info: UnableToDecryptInfo,
95}
96
97/// A manager over an existing [`UnableToDecryptHook`] that deduplicates UTDs
98/// on similar events, and adds basic consistency checks.
99///
100/// It can also implement a grace period before reporting an event as a UTD, if
101/// configured with [`Self::with_max_delay`]. Instead of immediately reporting
102/// the UTD, the reporting will be delayed by the max delay at most; if the
103/// event could eventually get decrypted, it may be reported before the end of
104/// that delay.
105#[derive(Debug)]
106pub struct UtdHookManager {
107 /// A Client associated with the UTD hook. This is used to access the store
108 /// which we persist our data to.
109 client: Client,
110
111 /// The parent hook we'll call, when we have found a unique UTD.
112 parent: Arc<dyn UnableToDecryptHook>,
113
114 /// An optional delay before marking the event as UTD ("grace period").
115 max_delay: Option<Duration>,
116
117 /// A mapping of events we're going to report as UTDs, to the tasks to do
118 /// so.
119 ///
120 /// Note: this is empty if no [`Self::max_delay`] is set.
121 ///
122 /// Note: this is theoretically unbounded in size, although this set of
123 /// tasks will degrow over time, as tasks expire after the max delay.
124 pending_delayed: Arc<Mutex<HashMap<OwnedEventId, PendingUtdReport>>>,
125
126 /// Bloom filter containing the event IDs of events which have been reported
127 /// as UTDs
128 reported_utds: Arc<AsyncMutex<GrowableBloom>>,
129}
130
131impl UtdHookManager {
132 /// Create a new [`UtdHookManager`] for the given hook.
133 ///
134 /// A [`Client`] must also be provided; this provides a link to the
135 /// [`matrix_sdk_base::StateStore`] which is used to load and store the
136 /// persistent data.
137 pub fn new(parent: Arc<dyn UnableToDecryptHook>, client: Client) -> Self {
138 let bloom_filter =
139 // Some slightly arbitrarily-chosen parameters here. We specify that, after 1000
140 // UTDs, we want to have a false-positive rate of 1%.
141 //
142 // The GrowableBloomFilter is based on a series of (partitioned) Bloom filters;
143 // once the first starts getting full (the expected false-positive
144 // rate gets too high), it adds another Bloom filter. Each new entry
145 // is recorded in the most recent Bloom filter; when querying, if
146 // *any* of the component filters show a match, that shows
147 // an overall match.
148 //
149 // The first component filter is created based on the parameters we give. For
150 // reasons derived in the paper [1], a partitioned Bloom filter with
151 // target false-positive rate `P` after `n` insertions requires a
152 // number of slices `k` given by:
153 //
154 // k = log2(1/P) = -ln(P) / ln(2)
155 //
156 // ... where each slice has a number of bits `m` given by
157 //
158 // m = n / ln(2)
159 //
160 // We have to have a whole number of slices and bits, so the total number of
161 // bits M is:
162 //
163 // M = ceil(k) * ceil(m)
164 // = ceil(-ln(P) / ln(2)) * ceil(n / ln(2))
165 //
166 // In other words, our FP rate of 1% after 1000 insertions requires:
167 //
168 // M = ceil(-ln(0.01) / ln(2)) * ceil(1000 / ln(2))
169 // = 7 * 1443 = 10101 bits
170 //
171 // So our filter starts off with 1263 bytes of data (plus a little overhead).
172 // Once we hit 1000 UTDs, we add a second component filter with a capacity
173 // double that of the original and target error rate 85% of the
174 // original (another 2526 bytes), which then lasts us until a total
175 // of 3000 UTDs.
176 //
177 // [1]: https://gsd.di.uminho.pt/members/cbm/ps/dbloom.pdf
178 GrowableBloomBuilder::new().estimated_insertions(1000).desired_error_ratio(0.01).build();
179
180 Self {
181 client,
182 parent,
183 max_delay: None,
184 pending_delayed: Default::default(),
185 reported_utds: Arc::new(AsyncMutex::new(bloom_filter)),
186 }
187 }
188
189 /// Reports UTDs with the given max delay.
190 ///
191 /// Note: late decryptions are always reported, even if there was a grace
192 /// period set for the reporting of the UTD.
193 pub fn with_max_delay(mut self, delay: Duration) -> Self {
194 self.max_delay = Some(delay);
195 self
196 }
197
198 /// Load the persistent data for the UTD hook from the store.
199 ///
200 /// If the client previously used a UtdHookManager, and UTDs were
201 /// encountered, the data on the reported UTDs is loaded from the store.
202 /// Otherwise, there is no effect.
203 pub async fn reload_from_store(&mut self) -> Result<(), StoreError> {
204 let existing_data =
205 self.client.state_store().get_kv_data(StateStoreDataKey::UtdHookManagerData).await?;
206
207 if let Some(existing_data) = existing_data {
208 let bloom_filter = existing_data
209 .into_utd_hook_manager_data()
210 .expect("StateStore::get_kv_data should return data of the right type");
211 self.reported_utds = Arc::new(AsyncMutex::new(bloom_filter));
212 }
213 Ok(())
214 }
215
216 /// The function to call whenever a UTD is seen for the first time.
217 ///
218 /// Pipe in any information that needs to be included in the final report.
219 ///
220 /// # Arguments
221 /// * `event_id` - The ID of the event that could not be decrypted.
222 /// * `cause` - Our best guess at the reason why the event can't be
223 /// decrypted.
224 /// * `event_timestamp` - The event's `origin_server_ts` field (or creation
225 /// time for local echo).
226 /// * `sender_user_id` - The Matrix user ID of the user that sent the
227 /// undecryptable message.
228 pub(crate) async fn on_utd(
229 &self,
230 event_id: &EventId,
231 cause: UtdCause,
232 event_timestamp: MilliSecondsSinceUnixEpoch,
233 sender_user_id: &UserId,
234 ) {
235 trace!(%event_id, "UtdHookManager: Observed UTD");
236 // Hold the lock on `reported_utds` throughout, to avoid races with other
237 // threads.
238 let mut reported_utds_lock = self.reported_utds.lock().await;
239
240 // Check if this, or a previous instance of UtdHookManager, has already reported
241 // this UTD, and bail out if not.
242 if reported_utds_lock.contains(event_id) {
243 return;
244 }
245
246 // Otherwise, check if we already have a task to handle this UTD.
247 if self.pending_delayed.lock().unwrap().contains_key(event_id) {
248 return;
249 }
250
251 let event_local_age_millis = i64::from(event_timestamp.get()).saturating_sub_unsigned(
252 self.client.encryption().device_creation_timestamp().await.get().into(),
253 );
254
255 let own_user_id = self.client.user_id();
256 let user_trusts_own_identity = if let Some(own_user_id) = own_user_id {
257 if let Ok(Some(own_id)) = self.client.encryption().get_user_identity(own_user_id).await
258 {
259 own_id.is_verified()
260 } else {
261 false
262 }
263 } else {
264 false
265 };
266
267 let own_homeserver = own_user_id.map(|id| id.server_name().to_owned());
268 let sender_homeserver = sender_user_id.server_name().to_owned();
269
270 let info = UnableToDecryptInfo {
271 event_id: event_id.to_owned(),
272 time_to_decrypt: None,
273 cause,
274 event_local_age_millis,
275 user_trusts_own_identity,
276 own_homeserver,
277 sender_homeserver,
278 };
279
280 let Some(max_delay) = self.max_delay else {
281 // No delay: immediately report the event to the parent hook.
282 Self::report_utd(info, &self.parent, &self.client, &mut reported_utds_lock).await;
283 return;
284 };
285
286 // Clone data shared with the task below.
287 let pending_delayed = self.pending_delayed.clone();
288 let reported_utds = self.reported_utds.clone();
289 let parent = self.parent.clone();
290 let client = self.client.clone();
291 let owned_event_id = event_id.to_owned();
292
293 // Spawn a task that will wait for the given delay, and maybe call the parent
294 // hook then.
295 let handle = self.client.task_monitor().spawn_background_task("utd_hook", async move {
296 // Wait for the given delay.
297 sleep(max_delay).await;
298
299 // Make sure we take out the lock on `reported_utds` before removing the entry
300 // from `pending_delayed`, to ensure we don't race against another call to
301 // `on_utd` (which could otherwise see that the entry has been
302 // removed from `pending_delayed` but not yet added to
303 // `reported_utds`).
304 let mut reported_utds_lock = reported_utds.lock().await;
305
306 // Remove the task from the outstanding set. But if it's already been removed,
307 // it's been decrypted since the task was added!
308 let pending_report = pending_delayed.lock().unwrap().remove(&owned_event_id);
309 if let Some(pending_report) = pending_report {
310 Self::report_utd(
311 pending_report.utd_info,
312 &parent,
313 &client,
314 &mut reported_utds_lock,
315 )
316 .await;
317 }
318 });
319
320 // Add the task to the set of pending tasks.
321 self.pending_delayed.lock().unwrap().insert(
322 event_id.to_owned(),
323 PendingUtdReport { marked_utd_at: Instant::now(), report_task: handle, utd_info: info },
324 );
325 }
326
327 /// The function to call whenever an event that was marked as a UTD has
328 /// eventually been decrypted.
329 ///
330 /// Note: if this is called for an event that was never marked as a UTD
331 /// before, it has no effect.
332 pub(crate) async fn on_late_decrypt(&self, event_id: &EventId) {
333 trace!(%event_id, "UtdHookManager: On late decrypt");
334 // Hold the lock on `reported_utds` throughout, to avoid races with other
335 // threads.
336 let mut reported_utds_lock = self.reported_utds.lock().await;
337
338 // Only let the parent hook know about the late decryption if the event is
339 // a pending UTD. If so, remove the event from the pending list —
340 // doing so will cause the reporting task to no-op if it runs.
341 let Some(pending_utd_report) = self.pending_delayed.lock().unwrap().remove(event_id) else {
342 trace!(%event_id, "UtdHookManager: received a late decrypt report for an unknown utd");
343 return;
344 };
345
346 // We can also cancel the reporting task.
347 pending_utd_report.report_task.abort();
348
349 // Update the UTD Info struct with new data, then report it
350 let mut info = pending_utd_report.utd_info;
351 info.time_to_decrypt = Some(pending_utd_report.marked_utd_at.elapsed());
352 Self::report_utd(info, &self.parent, &self.client, &mut reported_utds_lock).await;
353 }
354
355 /// Helper for [`UtdHookManager::on_utd`] and
356 /// [`UtdHookManager.on_late_decrypt`]: reports the UTD to the parent,
357 /// and records the event as reported.
358 ///
359 /// Must be called with the lock held on [`UtdHookManager::reported_utds`],
360 /// and takes a `MutexGuard` to enforce that.
361 async fn report_utd(
362 info: UnableToDecryptInfo,
363 parent_hook: &Arc<dyn UnableToDecryptHook>,
364 client: &Client,
365 reported_utds_lock: &mut MutexGuard<'_, GrowableBloom>,
366 ) {
367 let event_id = info.event_id.clone();
368 parent_hook.on_utd(info);
369 reported_utds_lock.insert(event_id);
370 if let Err(e) = client
371 .state_store()
372 .set_kv_data(
373 StateStoreDataKey::UtdHookManagerData,
374 StateStoreDataValue::UtdHookManagerData(reported_utds_lock.clone()),
375 )
376 .await
377 {
378 error!("Unable to persist UTD report data: {}", e);
379 }
380 }
381}
382
383impl Drop for UtdHookManager {
384 fn drop(&mut self) {
385 // Cancel all the outstanding delayed tasks to report UTDs.
386 //
387 // Here, we don't take the lock on `reported_utd`s (indeed, we can't, since
388 // `reported_utds` has an async mutex, and `drop` has to be sync), but
389 // that's ok. We can't race against `on_utd` or `on_late_decrypt`, since
390 // they both have `&self` references which mean `drop` can't be called.
391 // We *could* race against one of the actual tasks to report
392 // UTDs, but that's ok too: either the report task will bail out when it sees
393 // the entry has been removed from `pending_delayed` (which is fine), or the
394 // report task will successfully report the UTD (which is fine).
395 let mut pending_delayed = self.pending_delayed.lock().unwrap();
396 for (_, pending_utd_report) in pending_delayed.drain() {
397 pending_utd_report.report_task.abort();
398 }
399 }
400}
401
402#[cfg(test)]
403mod tests {
404 use matrix_sdk::test_utils::{logged_in_client, no_retry_test_client};
405 use matrix_sdk_test::async_test;
406 use ruma::{event_id, owned_server_name, server_name, user_id};
407
408 use super::*;
409
410 #[derive(Debug, Default)]
411 struct Dummy {
412 utds: Mutex<Vec<UnableToDecryptInfo>>,
413 }
414
415 impl UnableToDecryptHook for Dummy {
416 fn on_utd(&self, info: UnableToDecryptInfo) {
417 self.utds.lock().unwrap().push(info);
418 }
419 }
420
421 #[async_test]
422 async fn test_deduplicates_utds() {
423 // If I create a dummy hook,
424 let hook = Arc::new(Dummy::default());
425
426 // And I wrap with the UtdHookManager,
427 let wrapper = UtdHookManager::new(hook.clone(), logged_in_client(None).await);
428
429 // And I call the `on_utd` method multiple times, sometimes on the same event,
430 let event_timestamp = MilliSecondsSinceUnixEpoch::now();
431 let sender_user = user_id!("@example2:localhost");
432 let federated_user = user_id!("@example2:example.com");
433 wrapper.on_utd(event_id!("$1"), UtdCause::Unknown, event_timestamp, sender_user).await;
434 wrapper.on_utd(event_id!("$1"), UtdCause::Unknown, event_timestamp, sender_user).await;
435 wrapper.on_utd(event_id!("$2"), UtdCause::Unknown, event_timestamp, federated_user).await;
436 wrapper.on_utd(event_id!("$1"), UtdCause::Unknown, event_timestamp, sender_user).await;
437 wrapper.on_utd(event_id!("$2"), UtdCause::Unknown, event_timestamp, federated_user).await;
438 wrapper.on_utd(event_id!("$3"), UtdCause::Unknown, event_timestamp, sender_user).await;
439
440 // Then the event ids have been deduplicated,
441 {
442 let utds = hook.utds.lock().unwrap();
443 assert_eq!(utds.len(), 3);
444 assert_eq!(utds[0].event_id, event_id!("$1"));
445 assert_eq!(utds[1].event_id, event_id!("$2"));
446 assert_eq!(utds[2].event_id, event_id!("$3"));
447
448 // No event is a late-decryption event.
449 assert!(utds[0].time_to_decrypt.is_none());
450 assert!(utds[1].time_to_decrypt.is_none());
451 assert!(utds[2].time_to_decrypt.is_none());
452
453 // event_local_age_millis should be a small positive number, because the
454 // timestamp we used was after we created the device
455 let utd_local_age = utds[0].event_local_age_millis;
456 assert!(utd_local_age >= 0);
457 assert!(utd_local_age <= 1000);
458
459 assert_eq!(utds[0].sender_homeserver, server_name!("localhost"));
460 assert_eq!(utds[0].own_homeserver, Some(owned_server_name!("localhost")));
461
462 assert_eq!(utds[1].sender_homeserver, server_name!("example.com"));
463 assert_eq!(utds[1].own_homeserver, Some(owned_server_name!("localhost")));
464 }
465 }
466
467 #[async_test]
468 async fn test_deduplicates_utds_from_previous_session() {
469 // Use a single client for both hooks, so that both hooks are backed by the same
470 // memorystore.
471 let client = no_retry_test_client(None).await;
472
473 // Dummy hook 1, with the first UtdHookManager
474 {
475 let hook = Arc::new(Dummy::default());
476 let wrapper = UtdHookManager::new(hook.clone(), client.clone());
477
478 // I call it a couple of times with different events
479 wrapper
480 .on_utd(
481 event_id!("$1"),
482 UtdCause::Unknown,
483 MilliSecondsSinceUnixEpoch::now(),
484 user_id!("@a:b"),
485 )
486 .await;
487 wrapper
488 .on_utd(
489 event_id!("$2"),
490 UtdCause::Unknown,
491 MilliSecondsSinceUnixEpoch::now(),
492 user_id!("@a:b"),
493 )
494 .await;
495
496 // Sanity-check the reported event IDs
497 {
498 let utds = hook.utds.lock().unwrap();
499 assert_eq!(utds.len(), 2);
500 assert_eq!(utds[0].event_id, event_id!("$1"));
501 assert!(utds[0].time_to_decrypt.is_none());
502 assert_eq!(utds[1].event_id, event_id!("$2"));
503 assert!(utds[1].time_to_decrypt.is_none());
504 }
505 }
506
507 // Now, create a *new* hook, with a *new* UtdHookManager
508 {
509 let hook = Arc::new(Dummy::default());
510 let mut wrapper = UtdHookManager::new(hook.clone(), client.clone());
511 wrapper.reload_from_store().await.unwrap();
512
513 // Call it with more events, some of which match the previous instance
514 wrapper
515 .on_utd(
516 event_id!("$1"),
517 UtdCause::Unknown,
518 MilliSecondsSinceUnixEpoch::now(),
519 user_id!("@a:b"),
520 )
521 .await;
522 wrapper
523 .on_utd(
524 event_id!("$3"),
525 UtdCause::Unknown,
526 MilliSecondsSinceUnixEpoch::now(),
527 user_id!("@a:b"),
528 )
529 .await;
530
531 // Only the *new* ones should be reported
532 let utds = hook.utds.lock().unwrap();
533 assert_eq!(utds.len(), 1);
534 assert_eq!(utds[0].event_id, event_id!("$3"));
535 }
536 }
537
538 /// Test that UTD events which had not yet been reported in a previous
539 /// session, are reported in the next session.
540 #[async_test]
541 async fn test_does_not_deduplicate_late_utds_from_previous_session() {
542 // Use a single client for both hooks, so that both hooks are backed by the same
543 // memorystore.
544 let client = no_retry_test_client(None).await;
545
546 // Dummy hook 1, with the first UtdHookManager
547 {
548 let hook = Arc::new(Dummy::default());
549 let wrapper = UtdHookManager::new(hook.clone(), client.clone())
550 .with_max_delay(Duration::from_secs(2));
551
552 // a UTD event
553 wrapper
554 .on_utd(
555 event_id!("$1"),
556 UtdCause::Unknown,
557 MilliSecondsSinceUnixEpoch::now(),
558 user_id!("@a:b"),
559 )
560 .await;
561
562 // The event ID should not yet have been reported.
563 {
564 let utds = hook.utds.lock().unwrap();
565 assert_eq!(utds.len(), 0);
566 }
567 }
568
569 // Now, create a *new* hook, with a *new* UtdHookManager
570 {
571 let hook = Arc::new(Dummy::default());
572 let mut wrapper = UtdHookManager::new(hook.clone(), client.clone());
573 wrapper.reload_from_store().await.unwrap();
574
575 // Call the new hook with the same event
576 wrapper
577 .on_utd(
578 event_id!("$1"),
579 UtdCause::Unknown,
580 MilliSecondsSinceUnixEpoch::now(),
581 user_id!("@a:b"),
582 )
583 .await;
584
585 // And it should be reported.
586 sleep(Duration::from_millis(2500)).await;
587
588 let utds = hook.utds.lock().unwrap();
589 assert_eq!(utds.len(), 1);
590 assert_eq!(utds[0].event_id, event_id!("$1"));
591 }
592 }
593
594 #[async_test]
595 async fn test_on_late_decrypted_no_effect() {
596 // If I create a dummy hook,
597 let hook = Arc::new(Dummy::default());
598
599 // And I wrap with the UtdHookManager,
600 let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await);
601
602 // And I call the `on_late_decrypt` method before the event had been marked as
603 // utd,
604 wrapper.on_late_decrypt(event_id!("$1")).await;
605
606 // Then nothing is registered in the parent hook.
607 assert!(hook.utds.lock().unwrap().is_empty());
608 }
609
610 #[async_test]
611 async fn test_on_late_decrypted_after_utd_no_grace_period() {
612 // If I create a dummy hook,
613 let hook = Arc::new(Dummy::default());
614
615 // And I wrap with the UtdHookManager,
616 let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await);
617
618 // And I call the `on_utd` method for an event,
619 wrapper
620 .on_utd(
621 event_id!("$1"),
622 UtdCause::Unknown,
623 MilliSecondsSinceUnixEpoch::now(),
624 user_id!("@a:b"),
625 )
626 .await;
627
628 // Then the UTD has been notified, but not as late-decrypted event.
629 {
630 let utds = hook.utds.lock().unwrap();
631 assert_eq!(utds.len(), 1);
632 assert_eq!(utds[0].event_id, event_id!("$1"));
633 assert!(utds[0].time_to_decrypt.is_none());
634 }
635
636 // And when I call the `on_late_decrypt` method,
637 wrapper.on_late_decrypt(event_id!("$1")).await;
638
639 // Then the event is not reported again as a late-decryption.
640 {
641 let utds = hook.utds.lock().unwrap();
642 assert_eq!(utds.len(), 1);
643
644 // The previous report is still there. (There was no grace period.)
645 assert_eq!(utds[0].event_id, event_id!("$1"));
646 assert!(utds[0].time_to_decrypt.is_none());
647 }
648 }
649
650 #[cfg(not(target_family = "wasm"))] // wasm32 has no time for that
651 #[async_test]
652 async fn test_delayed_utd() {
653 // If I create a dummy hook,
654 let hook = Arc::new(Dummy::default());
655
656 // And I wrap with the UtdHookManager, configured to delay reporting after 2
657 // seconds.
658 let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await)
659 .with_max_delay(Duration::from_secs(2));
660
661 // And I call the `on_utd` method for an event,
662 wrapper
663 .on_utd(
664 event_id!("$1"),
665 UtdCause::Unknown,
666 MilliSecondsSinceUnixEpoch::now(),
667 user_id!("@a:b"),
668 )
669 .await;
670
671 // Then the UTD is not being reported immediately.
672 assert!(hook.utds.lock().unwrap().is_empty());
673 assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);
674
675 // If I wait for 1 second, then it's still not been notified yet.
676 sleep(Duration::from_secs(1)).await;
677
678 assert!(hook.utds.lock().unwrap().is_empty());
679 assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);
680
681 // But if I wait just a bit more, then it's getting notified as a definite UTD.
682 sleep(Duration::from_millis(1500)).await;
683
684 {
685 let utds = hook.utds.lock().unwrap();
686 assert_eq!(utds.len(), 1);
687 assert_eq!(utds[0].event_id, event_id!("$1"));
688 assert!(utds[0].time_to_decrypt.is_none());
689 }
690
691 assert!(wrapper.pending_delayed.lock().unwrap().is_empty());
692 }
693
694 #[cfg(not(target_family = "wasm"))] // wasm32 has no time for that
695 #[async_test]
696 async fn test_delayed_late_decryption() {
697 // If I create a dummy hook,
698 let hook = Arc::new(Dummy::default());
699
700 // And I wrap with the UtdHookManager, configured to delay reporting after 2
701 // seconds.
702 let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await)
703 .with_max_delay(Duration::from_secs(2));
704
705 // And I call the `on_utd` method for an event,
706 wrapper
707 .on_utd(
708 event_id!("$1"),
709 UtdCause::Unknown,
710 MilliSecondsSinceUnixEpoch::now(),
711 user_id!("@a:b"),
712 )
713 .await;
714
715 // Then the UTD has not been notified quite yet.
716 assert!(hook.utds.lock().unwrap().is_empty());
717 assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);
718
719 // If I wait for 1 second, and mark the event as late-decrypted,
720 sleep(Duration::from_secs(1)).await;
721
722 wrapper.on_late_decrypt(event_id!("$1")).await;
723
724 // Then it's being immediately reported as a late-decryption UTD.
725 {
726 let utds = hook.utds.lock().unwrap();
727 assert_eq!(utds.len(), 1);
728 assert_eq!(utds[0].event_id, event_id!("$1"));
729 assert!(utds[0].time_to_decrypt.is_some());
730 }
731
732 // And there aren't any pending delayed reports anymore.
733 assert!(wrapper.pending_delayed.lock().unwrap().is_empty());
734 }
735}