matrix_sdk_ui/unable_to_decrypt_hook.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module provides a generic interface to subscribe to unable-to-decrypt
16//! events, and notable updates to such events.
17//!
18//! This provides a general trait that a consumer may implement, as well as
19//! utilities to simplify usage of this trait.
20
21use std::{
22 collections::HashMap,
23 sync::{Arc, Mutex},
24};
25
26use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
27use matrix_sdk::{
28 Client,
29 crypto::types::events::UtdCause,
30 executor::{JoinHandle, spawn},
31 sleep::sleep,
32};
33use matrix_sdk_base::{
34 SendOutsideWasm, StateStoreDataKey, StateStoreDataValue, StoreError, SyncOutsideWasm,
35};
36use ruma::{
37 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedServerName, UserId,
38 time::{Duration, Instant},
39};
40use tokio::sync::{Mutex as AsyncMutex, MutexGuard};
41use tracing::{error, trace};
42
43/// A generic interface which methods get called whenever we observe a
44/// unable-to-decrypt (UTD) event.
45pub trait UnableToDecryptHook: std::fmt::Debug + SendOutsideWasm + SyncOutsideWasm {
46 /// Called every time the hook observes an encrypted event that couldn't be
47 /// decrypted.
48 ///
49 /// If the hook manager was configured with a max delay, this could also
50 /// contain extra information for late-decrypted events. See details in
51 /// [`UnableToDecryptInfo::time_to_decrypt`].
52 fn on_utd(&self, info: UnableToDecryptInfo);
53}
54
55/// Information about an event we were unable to decrypt (UTD).
56#[derive(Clone, Debug, Hash, PartialEq, Eq)]
57pub struct UnableToDecryptInfo {
58 /// The identifier of the event that couldn't get decrypted.
59 pub event_id: OwnedEventId,
60
61 /// If the event could be decrypted late (that is, the event was encrypted
62 /// at first, but could be decrypted later on), then this indicates the
63 /// time it took to decrypt the event. If it is not set, this is
64 /// considered a definite UTD.
65 pub time_to_decrypt: Option<Duration>,
66
67 /// What we know about what caused this UTD. E.g. was this event sent when
68 /// we were not a member of this room?
69 pub cause: UtdCause,
70
71 /// The difference between the event creation time (`origin_server_ts`) and
72 /// the time our device was created. If negative, this event was sent
73 /// *before* our device was created.
74 pub event_local_age_millis: i64,
75
76 /// Whether the user had verified their own identity at the point they
77 /// received the UTD event.
78 pub user_trusts_own_identity: bool,
79
80 /// The homeserver of the user that sent the undecryptable event.
81 pub sender_homeserver: OwnedServerName,
82
83 /// Our local user's own homeserver, or `None` if the client is not logged
84 /// in.
85 pub own_homeserver: Option<OwnedServerName>,
86}
87
88/// Data about a UTD event which we are waiting to report to the parent hook.
89#[derive(Debug)]
90struct PendingUtdReport {
91 /// The time that we received the UTD report from the timeline code.
92 marked_utd_at: Instant,
93
94 /// The task that will report this UTD to the parent hook.
95 report_task: JoinHandle<()>,
96
97 /// The UnableToDecryptInfo structure for this UTD event.
98 utd_info: UnableToDecryptInfo,
99}
100
101/// A manager over an existing [`UnableToDecryptHook`] that deduplicates UTDs
102/// on similar events, and adds basic consistency checks.
103///
104/// It can also implement a grace period before reporting an event as a UTD, if
105/// configured with [`Self::with_max_delay`]. Instead of immediately reporting
106/// the UTD, the reporting will be delayed by the max delay at most; if the
107/// event could eventually get decrypted, it may be reported before the end of
108/// that delay.
109#[derive(Debug)]
110pub struct UtdHookManager {
111 /// A Client associated with the UTD hook. This is used to access the store
112 /// which we persist our data to.
113 client: Client,
114
115 /// The parent hook we'll call, when we have found a unique UTD.
116 parent: Arc<dyn UnableToDecryptHook>,
117
118 /// An optional delay before marking the event as UTD ("grace period").
119 max_delay: Option<Duration>,
120
121 /// A mapping of events we're going to report as UTDs, to the tasks to do
122 /// so.
123 ///
124 /// Note: this is empty if no [`Self::max_delay`] is set.
125 ///
126 /// Note: this is theoretically unbounded in size, although this set of
127 /// tasks will degrow over time, as tasks expire after the max delay.
128 pending_delayed: Arc<Mutex<HashMap<OwnedEventId, PendingUtdReport>>>,
129
130 /// Bloom filter containing the event IDs of events which have been reported
131 /// as UTDs
132 reported_utds: Arc<AsyncMutex<GrowableBloom>>,
133}
134
135impl UtdHookManager {
136 /// Create a new [`UtdHookManager`] for the given hook.
137 ///
138 /// A [`Client`] must also be provided; this provides a link to the
139 /// [`matrix_sdk_base::StateStore`] which is used to load and store the
140 /// persistent data.
141 pub fn new(parent: Arc<dyn UnableToDecryptHook>, client: Client) -> Self {
142 let bloom_filter =
143 // Some slightly arbitrarily-chosen parameters here. We specify that, after 1000
144 // UTDs, we want to have a false-positive rate of 1%.
145 //
146 // The GrowableBloomFilter is based on a series of (partitioned) Bloom filters;
147 // once the first starts getting full (the expected false-positive
148 // rate gets too high), it adds another Bloom filter. Each new entry
149 // is recorded in the most recent Bloom filter; when querying, if
150 // *any* of the component filters show a match, that shows
151 // an overall match.
152 //
153 // The first component filter is created based on the parameters we give. For
154 // reasons derived in the paper [1], a partitioned Bloom filter with
155 // target false-positive rate `P` after `n` insertions requires a
156 // number of slices `k` given by:
157 //
158 // k = log2(1/P) = -ln(P) / ln(2)
159 //
160 // ... where each slice has a number of bits `m` given by
161 //
162 // m = n / ln(2)
163 //
164 // We have to have a whole number of slices and bits, so the total number of
165 // bits M is:
166 //
167 // M = ceil(k) * ceil(m)
168 // = ceil(-ln(P) / ln(2)) * ceil(n / ln(2))
169 //
170 // In other words, our FP rate of 1% after 1000 insertions requires:
171 //
172 // M = ceil(-ln(0.01) / ln(2)) * ceil(1000 / ln(2))
173 // = 7 * 1443 = 10101 bits
174 //
175 // So our filter starts off with 1263 bytes of data (plus a little overhead).
176 // Once we hit 1000 UTDs, we add a second component filter with a capacity
177 // double that of the original and target error rate 85% of the
178 // original (another 2526 bytes), which then lasts us until a total
179 // of 3000 UTDs.
180 //
181 // [1]: https://gsd.di.uminho.pt/members/cbm/ps/dbloom.pdf
182 GrowableBloomBuilder::new().estimated_insertions(1000).desired_error_ratio(0.01).build();
183
184 Self {
185 client,
186 parent,
187 max_delay: None,
188 pending_delayed: Default::default(),
189 reported_utds: Arc::new(AsyncMutex::new(bloom_filter)),
190 }
191 }
192
193 /// Reports UTDs with the given max delay.
194 ///
195 /// Note: late decryptions are always reported, even if there was a grace
196 /// period set for the reporting of the UTD.
197 pub fn with_max_delay(mut self, delay: Duration) -> Self {
198 self.max_delay = Some(delay);
199 self
200 }
201
202 /// Load the persistent data for the UTD hook from the store.
203 ///
204 /// If the client previously used a UtdHookManager, and UTDs were
205 /// encountered, the data on the reported UTDs is loaded from the store.
206 /// Otherwise, there is no effect.
207 pub async fn reload_from_store(&mut self) -> Result<(), StoreError> {
208 let existing_data =
209 self.client.state_store().get_kv_data(StateStoreDataKey::UtdHookManagerData).await?;
210
211 if let Some(existing_data) = existing_data {
212 let bloom_filter = existing_data
213 .into_utd_hook_manager_data()
214 .expect("StateStore::get_kv_data should return data of the right type");
215 self.reported_utds = Arc::new(AsyncMutex::new(bloom_filter));
216 }
217 Ok(())
218 }
219
220 /// The function to call whenever a UTD is seen for the first time.
221 ///
222 /// Pipe in any information that needs to be included in the final report.
223 ///
224 /// # Arguments
225 /// * `event_id` - The ID of the event that could not be decrypted.
226 /// * `cause` - Our best guess at the reason why the event can't be
227 /// decrypted.
228 /// * `event_timestamp` - The event's `origin_server_ts` field (or creation
229 /// time for local echo).
230 /// * `sender_user_id` - The Matrix user ID of the user that sent the
231 /// undecryptable message.
232 pub(crate) async fn on_utd(
233 &self,
234 event_id: &EventId,
235 cause: UtdCause,
236 event_timestamp: MilliSecondsSinceUnixEpoch,
237 sender_user_id: &UserId,
238 ) {
239 trace!(%event_id, "UtdHookManager: Observed UTD");
240 // Hold the lock on `reported_utds` throughout, to avoid races with other
241 // threads.
242 let mut reported_utds_lock = self.reported_utds.lock().await;
243
244 // Check if this, or a previous instance of UtdHookManager, has already reported
245 // this UTD, and bail out if not.
246 if reported_utds_lock.contains(event_id) {
247 return;
248 }
249
250 // Otherwise, check if we already have a task to handle this UTD.
251 if self.pending_delayed.lock().unwrap().contains_key(event_id) {
252 return;
253 }
254
255 let event_local_age_millis = i64::from(event_timestamp.get()).saturating_sub_unsigned(
256 self.client.encryption().device_creation_timestamp().await.get().into(),
257 );
258
259 let own_user_id = self.client.user_id();
260 let user_trusts_own_identity = if let Some(own_user_id) = own_user_id {
261 if let Ok(Some(own_id)) = self.client.encryption().get_user_identity(own_user_id).await
262 {
263 own_id.is_verified()
264 } else {
265 false
266 }
267 } else {
268 false
269 };
270
271 let own_homeserver = own_user_id.map(|id| id.server_name().to_owned());
272 let sender_homeserver = sender_user_id.server_name().to_owned();
273
274 let info = UnableToDecryptInfo {
275 event_id: event_id.to_owned(),
276 time_to_decrypt: None,
277 cause,
278 event_local_age_millis,
279 user_trusts_own_identity,
280 own_homeserver,
281 sender_homeserver,
282 };
283
284 let Some(max_delay) = self.max_delay else {
285 // No delay: immediately report the event to the parent hook.
286 Self::report_utd(info, &self.parent, &self.client, &mut reported_utds_lock).await;
287 return;
288 };
289
290 // Clone data shared with the task below.
291 let pending_delayed = self.pending_delayed.clone();
292 let reported_utds = self.reported_utds.clone();
293 let parent = self.parent.clone();
294 let client = self.client.clone();
295 let owned_event_id = event_id.to_owned();
296
297 // Spawn a task that will wait for the given delay, and maybe call the parent
298 // hook then.
299 let handle = spawn(async move {
300 // Wait for the given delay.
301 sleep(max_delay).await;
302
303 // Make sure we take out the lock on `reported_utds` before removing the entry
304 // from `pending_delayed`, to ensure we don't race against another call to
305 // `on_utd` (which could otherwise see that the entry has been
306 // removed from `pending_delayed` but not yet added to
307 // `reported_utds`).
308 let mut reported_utds_lock = reported_utds.lock().await;
309
310 // Remove the task from the outstanding set. But if it's already been removed,
311 // it's been decrypted since the task was added!
312 let pending_report = pending_delayed.lock().unwrap().remove(&owned_event_id);
313 if let Some(pending_report) = pending_report {
314 Self::report_utd(
315 pending_report.utd_info,
316 &parent,
317 &client,
318 &mut reported_utds_lock,
319 )
320 .await;
321 }
322 });
323
324 // Add the task to the set of pending tasks.
325 self.pending_delayed.lock().unwrap().insert(
326 event_id.to_owned(),
327 PendingUtdReport { marked_utd_at: Instant::now(), report_task: handle, utd_info: info },
328 );
329 }
330
331 /// The function to call whenever an event that was marked as a UTD has
332 /// eventually been decrypted.
333 ///
334 /// Note: if this is called for an event that was never marked as a UTD
335 /// before, it has no effect.
336 pub(crate) async fn on_late_decrypt(&self, event_id: &EventId) {
337 trace!(%event_id, "UtdHookManager: On late decrypt");
338 // Hold the lock on `reported_utds` throughout, to avoid races with other
339 // threads.
340 let mut reported_utds_lock = self.reported_utds.lock().await;
341
342 // Only let the parent hook know about the late decryption if the event is
343 // a pending UTD. If so, remove the event from the pending list —
344 // doing so will cause the reporting task to no-op if it runs.
345 let Some(pending_utd_report) = self.pending_delayed.lock().unwrap().remove(event_id) else {
346 trace!(%event_id, "UtdHookManager: received a late decrypt report for an unknown utd");
347 return;
348 };
349
350 // We can also cancel the reporting task.
351 pending_utd_report.report_task.abort();
352
353 // Update the UTD Info struct with new data, then report it
354 let mut info = pending_utd_report.utd_info;
355 info.time_to_decrypt = Some(pending_utd_report.marked_utd_at.elapsed());
356 Self::report_utd(info, &self.parent, &self.client, &mut reported_utds_lock).await;
357 }
358
359 /// Helper for [`UtdHookManager::on_utd`] and
360 /// [`UtdHookManager.on_late_decrypt`]: reports the UTD to the parent,
361 /// and records the event as reported.
362 ///
363 /// Must be called with the lock held on [`UtdHookManager::reported_utds`],
364 /// and takes a `MutexGuard` to enforce that.
365 async fn report_utd(
366 info: UnableToDecryptInfo,
367 parent_hook: &Arc<dyn UnableToDecryptHook>,
368 client: &Client,
369 reported_utds_lock: &mut MutexGuard<'_, GrowableBloom>,
370 ) {
371 let event_id = info.event_id.clone();
372 parent_hook.on_utd(info);
373 reported_utds_lock.insert(event_id);
374 if let Err(e) = client
375 .state_store()
376 .set_kv_data(
377 StateStoreDataKey::UtdHookManagerData,
378 StateStoreDataValue::UtdHookManagerData(reported_utds_lock.clone()),
379 )
380 .await
381 {
382 error!("Unable to persist UTD report data: {}", e);
383 }
384 }
385}
386
387impl Drop for UtdHookManager {
388 fn drop(&mut self) {
389 // Cancel all the outstanding delayed tasks to report UTDs.
390 //
391 // Here, we don't take the lock on `reported_utd`s (indeed, we can't, since
392 // `reported_utds` has an async mutex, and `drop` has to be sync), but
393 // that's ok. We can't race against `on_utd` or `on_late_decrypt`, since
394 // they both have `&self` references which mean `drop` can't be called.
395 // We *could* race against one of the actual tasks to report
396 // UTDs, but that's ok too: either the report task will bail out when it sees
397 // the entry has been removed from `pending_delayed` (which is fine), or the
398 // report task will successfully report the UTD (which is fine).
399 let mut pending_delayed = self.pending_delayed.lock().unwrap();
400 for (_, pending_utd_report) in pending_delayed.drain() {
401 pending_utd_report.report_task.abort();
402 }
403 }
404}
405
406#[cfg(test)]
407mod tests {
408 use matrix_sdk::test_utils::{logged_in_client, no_retry_test_client};
409 use matrix_sdk_test::async_test;
410 use ruma::{event_id, server_name, user_id};
411
412 use super::*;
413
414 #[derive(Debug, Default)]
415 struct Dummy {
416 utds: Mutex<Vec<UnableToDecryptInfo>>,
417 }
418
419 impl UnableToDecryptHook for Dummy {
420 fn on_utd(&self, info: UnableToDecryptInfo) {
421 self.utds.lock().unwrap().push(info);
422 }
423 }
424
425 #[async_test]
426 async fn test_deduplicates_utds() {
427 // If I create a dummy hook,
428 let hook = Arc::new(Dummy::default());
429
430 // And I wrap with the UtdHookManager,
431 let wrapper = UtdHookManager::new(hook.clone(), logged_in_client(None).await);
432
433 // And I call the `on_utd` method multiple times, sometimes on the same event,
434 let event_timestamp = MilliSecondsSinceUnixEpoch::now();
435 let sender_user = user_id!("@example2:localhost");
436 let federated_user = user_id!("@example2:example.com");
437 wrapper.on_utd(event_id!("$1"), UtdCause::Unknown, event_timestamp, sender_user).await;
438 wrapper.on_utd(event_id!("$1"), UtdCause::Unknown, event_timestamp, sender_user).await;
439 wrapper.on_utd(event_id!("$2"), UtdCause::Unknown, event_timestamp, federated_user).await;
440 wrapper.on_utd(event_id!("$1"), UtdCause::Unknown, event_timestamp, sender_user).await;
441 wrapper.on_utd(event_id!("$2"), UtdCause::Unknown, event_timestamp, federated_user).await;
442 wrapper.on_utd(event_id!("$3"), UtdCause::Unknown, event_timestamp, sender_user).await;
443
444 // Then the event ids have been deduplicated,
445 {
446 let utds = hook.utds.lock().unwrap();
447 assert_eq!(utds.len(), 3);
448 assert_eq!(utds[0].event_id, event_id!("$1"));
449 assert_eq!(utds[1].event_id, event_id!("$2"));
450 assert_eq!(utds[2].event_id, event_id!("$3"));
451
452 // No event is a late-decryption event.
453 assert!(utds[0].time_to_decrypt.is_none());
454 assert!(utds[1].time_to_decrypt.is_none());
455 assert!(utds[2].time_to_decrypt.is_none());
456
457 // event_local_age_millis should be a small positive number, because the
458 // timestamp we used was after we created the device
459 let utd_local_age = utds[0].event_local_age_millis;
460 assert!(utd_local_age >= 0);
461 assert!(utd_local_age <= 1000);
462
463 assert_eq!(utds[0].sender_homeserver, server_name!("localhost"));
464 assert_eq!(utds[0].own_homeserver, Some(server_name!("localhost").to_owned()));
465
466 assert_eq!(utds[1].sender_homeserver, server_name!("example.com"));
467 assert_eq!(utds[1].own_homeserver, Some(server_name!("localhost").to_owned()));
468 }
469 }
470
471 #[async_test]
472 async fn test_deduplicates_utds_from_previous_session() {
473 // Use a single client for both hooks, so that both hooks are backed by the same
474 // memorystore.
475 let client = no_retry_test_client(None).await;
476
477 // Dummy hook 1, with the first UtdHookManager
478 {
479 let hook = Arc::new(Dummy::default());
480 let wrapper = UtdHookManager::new(hook.clone(), client.clone());
481
482 // I call it a couple of times with different events
483 wrapper
484 .on_utd(
485 event_id!("$1"),
486 UtdCause::Unknown,
487 MilliSecondsSinceUnixEpoch::now(),
488 user_id!("@a:b"),
489 )
490 .await;
491 wrapper
492 .on_utd(
493 event_id!("$2"),
494 UtdCause::Unknown,
495 MilliSecondsSinceUnixEpoch::now(),
496 user_id!("@a:b"),
497 )
498 .await;
499
500 // Sanity-check the reported event IDs
501 {
502 let utds = hook.utds.lock().unwrap();
503 assert_eq!(utds.len(), 2);
504 assert_eq!(utds[0].event_id, event_id!("$1"));
505 assert!(utds[0].time_to_decrypt.is_none());
506 assert_eq!(utds[1].event_id, event_id!("$2"));
507 assert!(utds[1].time_to_decrypt.is_none());
508 }
509 }
510
511 // Now, create a *new* hook, with a *new* UtdHookManager
512 {
513 let hook = Arc::new(Dummy::default());
514 let mut wrapper = UtdHookManager::new(hook.clone(), client.clone());
515 wrapper.reload_from_store().await.unwrap();
516
517 // Call it with more events, some of which match the previous instance
518 wrapper
519 .on_utd(
520 event_id!("$1"),
521 UtdCause::Unknown,
522 MilliSecondsSinceUnixEpoch::now(),
523 user_id!("@a:b"),
524 )
525 .await;
526 wrapper
527 .on_utd(
528 event_id!("$3"),
529 UtdCause::Unknown,
530 MilliSecondsSinceUnixEpoch::now(),
531 user_id!("@a:b"),
532 )
533 .await;
534
535 // Only the *new* ones should be reported
536 let utds = hook.utds.lock().unwrap();
537 assert_eq!(utds.len(), 1);
538 assert_eq!(utds[0].event_id, event_id!("$3"));
539 }
540 }
541
542 /// Test that UTD events which had not yet been reported in a previous
543 /// session, are reported in the next session.
544 #[async_test]
545 async fn test_does_not_deduplicate_late_utds_from_previous_session() {
546 // Use a single client for both hooks, so that both hooks are backed by the same
547 // memorystore.
548 let client = no_retry_test_client(None).await;
549
550 // Dummy hook 1, with the first UtdHookManager
551 {
552 let hook = Arc::new(Dummy::default());
553 let wrapper = UtdHookManager::new(hook.clone(), client.clone())
554 .with_max_delay(Duration::from_secs(2));
555
556 // a UTD event
557 wrapper
558 .on_utd(
559 event_id!("$1"),
560 UtdCause::Unknown,
561 MilliSecondsSinceUnixEpoch::now(),
562 user_id!("@a:b"),
563 )
564 .await;
565
566 // The event ID should not yet have been reported.
567 {
568 let utds = hook.utds.lock().unwrap();
569 assert_eq!(utds.len(), 0);
570 }
571 }
572
573 // Now, create a *new* hook, with a *new* UtdHookManager
574 {
575 let hook = Arc::new(Dummy::default());
576 let mut wrapper = UtdHookManager::new(hook.clone(), client.clone());
577 wrapper.reload_from_store().await.unwrap();
578
579 // Call the new hook with the same event
580 wrapper
581 .on_utd(
582 event_id!("$1"),
583 UtdCause::Unknown,
584 MilliSecondsSinceUnixEpoch::now(),
585 user_id!("@a:b"),
586 )
587 .await;
588
589 // And it should be reported.
590 sleep(Duration::from_millis(2500)).await;
591
592 let utds = hook.utds.lock().unwrap();
593 assert_eq!(utds.len(), 1);
594 assert_eq!(utds[0].event_id, event_id!("$1"));
595 }
596 }
597
598 #[async_test]
599 async fn test_on_late_decrypted_no_effect() {
600 // If I create a dummy hook,
601 let hook = Arc::new(Dummy::default());
602
603 // And I wrap with the UtdHookManager,
604 let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await);
605
606 // And I call the `on_late_decrypt` method before the event had been marked as
607 // utd,
608 wrapper.on_late_decrypt(event_id!("$1")).await;
609
610 // Then nothing is registered in the parent hook.
611 assert!(hook.utds.lock().unwrap().is_empty());
612 }
613
614 #[async_test]
615 async fn test_on_late_decrypted_after_utd_no_grace_period() {
616 // If I create a dummy hook,
617 let hook = Arc::new(Dummy::default());
618
619 // And I wrap with the UtdHookManager,
620 let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await);
621
622 // And I call the `on_utd` method for an event,
623 wrapper
624 .on_utd(
625 event_id!("$1"),
626 UtdCause::Unknown,
627 MilliSecondsSinceUnixEpoch::now(),
628 user_id!("@a:b"),
629 )
630 .await;
631
632 // Then the UTD has been notified, but not as late-decrypted event.
633 {
634 let utds = hook.utds.lock().unwrap();
635 assert_eq!(utds.len(), 1);
636 assert_eq!(utds[0].event_id, event_id!("$1"));
637 assert!(utds[0].time_to_decrypt.is_none());
638 }
639
640 // And when I call the `on_late_decrypt` method,
641 wrapper.on_late_decrypt(event_id!("$1")).await;
642
643 // Then the event is not reported again as a late-decryption.
644 {
645 let utds = hook.utds.lock().unwrap();
646 assert_eq!(utds.len(), 1);
647
648 // The previous report is still there. (There was no grace period.)
649 assert_eq!(utds[0].event_id, event_id!("$1"));
650 assert!(utds[0].time_to_decrypt.is_none());
651 }
652 }
653
654 #[cfg(not(target_family = "wasm"))] // wasm32 has no time for that
655 #[async_test]
656 async fn test_delayed_utd() {
657 // If I create a dummy hook,
658 let hook = Arc::new(Dummy::default());
659
660 // And I wrap with the UtdHookManager, configured to delay reporting after 2
661 // seconds.
662 let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await)
663 .with_max_delay(Duration::from_secs(2));
664
665 // And I call the `on_utd` method for an event,
666 wrapper
667 .on_utd(
668 event_id!("$1"),
669 UtdCause::Unknown,
670 MilliSecondsSinceUnixEpoch::now(),
671 user_id!("@a:b"),
672 )
673 .await;
674
675 // Then the UTD is not being reported immediately.
676 assert!(hook.utds.lock().unwrap().is_empty());
677 assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);
678
679 // If I wait for 1 second, then it's still not been notified yet.
680 sleep(Duration::from_secs(1)).await;
681
682 assert!(hook.utds.lock().unwrap().is_empty());
683 assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);
684
685 // But if I wait just a bit more, then it's getting notified as a definite UTD.
686 sleep(Duration::from_millis(1500)).await;
687
688 {
689 let utds = hook.utds.lock().unwrap();
690 assert_eq!(utds.len(), 1);
691 assert_eq!(utds[0].event_id, event_id!("$1"));
692 assert!(utds[0].time_to_decrypt.is_none());
693 }
694
695 assert!(wrapper.pending_delayed.lock().unwrap().is_empty());
696 }
697
698 #[cfg(not(target_family = "wasm"))] // wasm32 has no time for that
699 #[async_test]
700 async fn test_delayed_late_decryption() {
701 // If I create a dummy hook,
702 let hook = Arc::new(Dummy::default());
703
704 // And I wrap with the UtdHookManager, configured to delay reporting after 2
705 // seconds.
706 let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await)
707 .with_max_delay(Duration::from_secs(2));
708
709 // And I call the `on_utd` method for an event,
710 wrapper
711 .on_utd(
712 event_id!("$1"),
713 UtdCause::Unknown,
714 MilliSecondsSinceUnixEpoch::now(),
715 user_id!("@a:b"),
716 )
717 .await;
718
719 // Then the UTD has not been notified quite yet.
720 assert!(hook.utds.lock().unwrap().is_empty());
721 assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);
722
723 // If I wait for 1 second, and mark the event as late-decrypted,
724 sleep(Duration::from_secs(1)).await;
725
726 wrapper.on_late_decrypt(event_id!("$1")).await;
727
728 // Then it's being immediately reported as a late-decryption UTD.
729 {
730 let utds = hook.utds.lock().unwrap();
731 assert_eq!(utds.len(), 1);
732 assert_eq!(utds[0].event_id, event_id!("$1"));
733 assert!(utds[0].time_to_decrypt.is_some());
734 }
735
736 // And there aren't any pending delayed reports anymore.
737 assert!(wrapper.pending_delayed.lock().unwrap().is_empty());
738 }
739}