matrix_sdk_ui/unable_to_decrypt_hook.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! This module provides a generic interface to subscribe to unable-to-decrypt
16//! events, and notable updates to such events.
17//!
18//! This provides a general trait that a consumer may implement, as well as
19//! utilities to simplify usage of this trait.
20
21use std::{
22 collections::HashMap,
23 sync::{Arc, Mutex},
24};
25
26use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
27use matrix_sdk::{
28 crypto::types::events::UtdCause,
29 executor::{spawn, JoinHandle},
30 sleep::sleep,
31 Client,
32};
33use matrix_sdk_base::{StateStoreDataKey, StateStoreDataValue, StoreError};
34use ruma::{
35 time::{Duration, Instant},
36 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedServerName, UserId,
37};
38use tokio::sync::{Mutex as AsyncMutex, MutexGuard};
39use tracing::error;
40
41/// A generic interface which methods get called whenever we observe a
42/// unable-to-decrypt (UTD) event.
43pub trait UnableToDecryptHook: std::fmt::Debug + Send + Sync {
44 /// Called every time the hook observes an encrypted event that couldn't be
45 /// decrypted.
46 ///
47 /// If the hook manager was configured with a max delay, this could also
48 /// contain extra information for late-decrypted events. See details in
49 /// [`UnableToDecryptInfo::time_to_decrypt`].
50 fn on_utd(&self, info: UnableToDecryptInfo);
51}
52
53/// Information about an event we were unable to decrypt (UTD).
54#[derive(Clone, Debug, Hash, PartialEq, Eq)]
55pub struct UnableToDecryptInfo {
56 /// The identifier of the event that couldn't get decrypted.
57 pub event_id: OwnedEventId,
58
59 /// If the event could be decrypted late (that is, the event was encrypted
60 /// at first, but could be decrypted later on), then this indicates the
61 /// time it took to decrypt the event. If it is not set, this is
62 /// considered a definite UTD.
63 pub time_to_decrypt: Option<Duration>,
64
65 /// What we know about what caused this UTD. E.g. was this event sent when
66 /// we were not a member of this room?
67 pub cause: UtdCause,
68
69 /// The difference between the event creation time (`origin_server_ts`) and
70 /// the time our device was created. If negative, this event was sent
71 /// *before* our device was created.
72 pub event_local_age_millis: i64,
73
74 /// Whether the user had verified their own identity at the point they
75 /// received the UTD event.
76 pub user_trusts_own_identity: bool,
77
78 /// The homeserver of the user that sent the undecryptable event.
79 pub sender_homeserver: OwnedServerName,
80
81 /// Our local user's own homeserver, or `None` if the client is not logged
82 /// in.
83 pub own_homeserver: Option<OwnedServerName>,
84}
85
86/// Data about a UTD event which we are waiting to report to the parent hook.
87#[derive(Debug)]
88struct PendingUtdReport {
89 /// The time that we received the UTD report from the timeline code.
90 marked_utd_at: Instant,
91
92 /// The task that will report this UTD to the parent hook.
93 report_task: JoinHandle<()>,
94
95 /// The UnableToDecryptInfo structure for this UTD event.
96 utd_info: UnableToDecryptInfo,
97}
98
99/// A manager over an existing [`UnableToDecryptHook`] that deduplicates UTDs
100/// on similar events, and adds basic consistency checks.
101///
102/// It can also implement a grace period before reporting an event as a UTD, if
103/// configured with [`Self::with_max_delay`]. Instead of immediately reporting
104/// the UTD, the reporting will be delayed by the max delay at most; if the
105/// event could eventually get decrypted, it may be reported before the end of
106/// that delay.
107#[derive(Debug)]
108pub struct UtdHookManager {
109 /// A Client associated with the UTD hook. This is used to access the store
110 /// which we persist our data to.
111 client: Client,
112
113 /// The parent hook we'll call, when we have found a unique UTD.
114 parent: Arc<dyn UnableToDecryptHook>,
115
116 /// An optional delay before marking the event as UTD ("grace period").
117 max_delay: Option<Duration>,
118
119 /// A mapping of events we're going to report as UTDs, to the tasks to do
120 /// so.
121 ///
122 /// Note: this is empty if no [`Self::max_delay`] is set.
123 ///
124 /// Note: this is theoretically unbounded in size, although this set of
125 /// tasks will degrow over time, as tasks expire after the max delay.
126 pending_delayed: Arc<Mutex<HashMap<OwnedEventId, PendingUtdReport>>>,
127
128 /// Bloom filter containing the event IDs of events which have been reported
129 /// as UTDs
130 reported_utds: Arc<AsyncMutex<GrowableBloom>>,
131}
132
133impl UtdHookManager {
134 /// Create a new [`UtdHookManager`] for the given hook.
135 ///
136 /// A [`Client`] must also be provided; this provides a link to the
137 /// [`matrix_sdk_base::StateStore`] which is used to load and store the
138 /// persistent data.
139 pub fn new(parent: Arc<dyn UnableToDecryptHook>, client: Client) -> Self {
140 let bloom_filter =
141 // Some slightly arbitrarily-chosen parameters here. We specify that, after 1000
142 // UTDs, we want to have a false-positive rate of 1%.
143 //
144 // The GrowableBloomFilter is based on a series of (partitioned) Bloom filters;
145 // once the first starts getting full (the expected false-positive
146 // rate gets too high), it adds another Bloom filter. Each new entry
147 // is recorded in the most recent Bloom filter; when querying, if
148 // *any* of the component filters show a match, that shows
149 // an overall match.
150 //
151 // The first component filter is created based on the parameters we give. For
152 // reasons derived in the paper [1], a partitioned Bloom filter with
153 // target false-positive rate `P` after `n` insertions requires a
154 // number of slices `k` given by:
155 //
156 // k = log2(1/P) = -ln(P) / ln(2)
157 //
158 // ... where each slice has a number of bits `m` given by
159 //
160 // m = n / ln(2)
161 //
162 // We have to have a whole number of slices and bits, so the total number of
163 // bits M is:
164 //
165 // M = ceil(k) * ceil(m)
166 // = ceil(-ln(P) / ln(2)) * ceil(n / ln(2))
167 //
168 // In other words, our FP rate of 1% after 1000 insertions requires:
169 //
170 // M = ceil(-ln(0.01) / ln(2)) * ceil(1000 / ln(2))
171 // = 7 * 1443 = 10101 bits
172 //
173 // So our filter starts off with 1263 bytes of data (plus a little overhead).
174 // Once we hit 1000 UTDs, we add a second component filter with a capacity
175 // double that of the original and target error rate 85% of the
176 // original (another 2526 bytes), which then lasts us until a total
177 // of 3000 UTDs.
178 //
179 // [1]: https://gsd.di.uminho.pt/members/cbm/ps/dbloom.pdf
180 GrowableBloomBuilder::new().estimated_insertions(1000).desired_error_ratio(0.01).build();
181
182 Self {
183 client,
184 parent,
185 max_delay: None,
186 pending_delayed: Default::default(),
187 reported_utds: Arc::new(AsyncMutex::new(bloom_filter)),
188 }
189 }
190
191 /// Reports UTDs with the given max delay.
192 ///
193 /// Note: late decryptions are always reported, even if there was a grace
194 /// period set for the reporting of the UTD.
195 pub fn with_max_delay(mut self, delay: Duration) -> Self {
196 self.max_delay = Some(delay);
197 self
198 }
199
200 /// Load the persistent data for the UTD hook from the store.
201 ///
202 /// If the client previously used a UtdHookManager, and UTDs were
203 /// encountered, the data on the reported UTDs is loaded from the store.
204 /// Otherwise, there is no effect.
205 pub async fn reload_from_store(&mut self) -> Result<(), StoreError> {
206 let existing_data =
207 self.client.store().get_kv_data(StateStoreDataKey::UtdHookManagerData).await?;
208
209 if let Some(existing_data) = existing_data {
210 let bloom_filter = existing_data
211 .into_utd_hook_manager_data()
212 .expect("StateStore::get_kv_data should return data of the right type");
213 self.reported_utds = Arc::new(AsyncMutex::new(bloom_filter));
214 }
215 Ok(())
216 }
217
218 /// The function to call whenever a UTD is seen for the first time.
219 ///
220 /// Pipe in any information that needs to be included in the final report.
221 ///
222 /// # Arguments
223 /// * `event_id` - The ID of the event that could not be decrypted.
224 /// * `cause` - Our best guess at the reason why the event can't be
225 /// decrypted.
226 /// * `event_timestamp` - The event's `origin_server_ts` field (or creation
227 /// time for local echo).
228 /// * `sender_user_id` - The Matrix user ID of the user that sent the
229 /// undecryptable message.
230 pub(crate) async fn on_utd(
231 &self,
232 event_id: &EventId,
233 cause: UtdCause,
234 event_timestamp: MilliSecondsSinceUnixEpoch,
235 sender_user_id: &UserId,
236 ) {
237 // Hold the lock on `reported_utds` throughout, to avoid races with other
238 // threads.
239 let mut reported_utds_lock = self.reported_utds.lock().await;
240
241 // Check if this, or a previous instance of UtdHookManager, has already reported
242 // this UTD, and bail out if not.
243 if reported_utds_lock.contains(event_id) {
244 return;
245 }
246
247 // Otherwise, check if we already have a task to handle this UTD.
248 if self.pending_delayed.lock().unwrap().contains_key(event_id) {
249 return;
250 }
251
252 let event_local_age_millis = i64::from(event_timestamp.get()).saturating_sub_unsigned(
253 self.client.encryption().device_creation_timestamp().await.get().into(),
254 );
255
256 let own_user_id = self.client.user_id();
257 let user_trusts_own_identity = if let Some(own_user_id) = own_user_id {
258 if let Ok(Some(own_id)) = self.client.encryption().get_user_identity(own_user_id).await
259 {
260 own_id.is_verified()
261 } else {
262 false
263 }
264 } else {
265 false
266 };
267
268 let own_homeserver = own_user_id.map(|id| id.server_name().to_owned());
269 let sender_homeserver = sender_user_id.server_name().to_owned();
270
271 let info = UnableToDecryptInfo {
272 event_id: event_id.to_owned(),
273 time_to_decrypt: None,
274 cause,
275 event_local_age_millis,
276 user_trusts_own_identity,
277 own_homeserver,
278 sender_homeserver,
279 };
280
281 let Some(max_delay) = self.max_delay else {
282 // No delay: immediately report the event to the parent hook.
283 Self::report_utd(info, &self.parent, &self.client, &mut reported_utds_lock).await;
284 return;
285 };
286
287 // Clone data shared with the task below.
288 let pending_delayed = self.pending_delayed.clone();
289 let reported_utds = self.reported_utds.clone();
290 let parent = self.parent.clone();
291 let client = self.client.clone();
292 let owned_event_id = event_id.to_owned();
293
294 // Spawn a task that will wait for the given delay, and maybe call the parent
295 // hook then.
296 let handle = spawn(async move {
297 // Wait for the given delay.
298 sleep(max_delay).await;
299
300 // Make sure we take out the lock on `reported_utds` before removing the entry
301 // from `pending_delayed`, to ensure we don't race against another call to
302 // `on_utd` (which could otherwise see that the entry has been
303 // removed from `pending_delayed` but not yet added to
304 // `reported_utds`).
305 let mut reported_utds_lock = reported_utds.lock().await;
306
307 // Remove the task from the outstanding set. But if it's already been removed,
308 // it's been decrypted since the task was added!
309 let pending_report = pending_delayed.lock().unwrap().remove(&owned_event_id);
310 if let Some(pending_report) = pending_report {
311 Self::report_utd(
312 pending_report.utd_info,
313 &parent,
314 &client,
315 &mut reported_utds_lock,
316 )
317 .await;
318 }
319 });
320
321 // Add the task to the set of pending tasks.
322 self.pending_delayed.lock().unwrap().insert(
323 event_id.to_owned(),
324 PendingUtdReport { marked_utd_at: Instant::now(), report_task: handle, utd_info: info },
325 );
326 }
327
328 /// The function to call whenever an event that was marked as a UTD has
329 /// eventually been decrypted.
330 ///
331 /// Note: if this is called for an event that was never marked as a UTD
332 /// before, it has no effect.
333 pub(crate) async fn on_late_decrypt(&self, event_id: &EventId) {
334 // Hold the lock on `reported_utds` throughout, to avoid races with other
335 // threads.
336 let mut reported_utds_lock = self.reported_utds.lock().await;
337
338 // Only let the parent hook know about the late decryption if the event is
339 // a pending UTD. If so, remove the event from the pending list —
340 // doing so will cause the reporting task to no-op if it runs.
341 let Some(pending_utd_report) = self.pending_delayed.lock().unwrap().remove(event_id) else {
342 return;
343 };
344
345 // We can also cancel the reporting task.
346 pending_utd_report.report_task.abort();
347
348 // Update the UTD Info struct with new data, then report it
349 let mut info = pending_utd_report.utd_info;
350 info.time_to_decrypt = Some(pending_utd_report.marked_utd_at.elapsed());
351 Self::report_utd(info, &self.parent, &self.client, &mut reported_utds_lock).await;
352 }
353
354 /// Helper for [`UtdHookManager::on_utd`] and
355 /// [`UtdHookManager.on_late_decrypt`]: reports the UTD to the parent,
356 /// and records the event as reported.
357 ///
358 /// Must be called with the lock held on [`UtdHookManager::reported_utds`],
359 /// and takes a `MutexGuard` to enforce that.
360 async fn report_utd(
361 info: UnableToDecryptInfo,
362 parent_hook: &Arc<dyn UnableToDecryptHook>,
363 client: &Client,
364 reported_utds_lock: &mut MutexGuard<'_, GrowableBloom>,
365 ) {
366 let event_id = info.event_id.clone();
367 parent_hook.on_utd(info);
368 reported_utds_lock.insert(event_id);
369 if let Err(e) = client
370 .store()
371 .set_kv_data(
372 StateStoreDataKey::UtdHookManagerData,
373 StateStoreDataValue::UtdHookManagerData(reported_utds_lock.clone()),
374 )
375 .await
376 {
377 error!("Unable to persist UTD report data: {}", e);
378 }
379 }
380}
381
382impl Drop for UtdHookManager {
383 fn drop(&mut self) {
384 // Cancel all the outstanding delayed tasks to report UTDs.
385 //
386 // Here, we don't take the lock on `reported_utd`s (indeed, we can't, since
387 // `reported_utds` has an async mutex, and `drop` has to be sync), but
388 // that's ok. We can't race against `on_utd` or `on_late_decrypt`, since
389 // they both have `&self` references which mean `drop` can't be called.
390 // We *could* race against one of the actual tasks to report
391 // UTDs, but that's ok too: either the report task will bail out when it sees
392 // the entry has been removed from `pending_delayed` (which is fine), or the
393 // report task will successfully report the UTD (which is fine).
394 let mut pending_delayed = self.pending_delayed.lock().unwrap();
395 for (_, pending_utd_report) in pending_delayed.drain() {
396 pending_utd_report.report_task.abort();
397 }
398 }
399}
400
401#[cfg(test)]
402mod tests {
403 use matrix_sdk::test_utils::{logged_in_client, no_retry_test_client};
404 use matrix_sdk_test::async_test;
405 use ruma::{event_id, server_name, user_id};
406
407 use super::*;
408
409 #[derive(Debug, Default)]
410 struct Dummy {
411 utds: Mutex<Vec<UnableToDecryptInfo>>,
412 }
413
414 impl UnableToDecryptHook for Dummy {
415 fn on_utd(&self, info: UnableToDecryptInfo) {
416 self.utds.lock().unwrap().push(info);
417 }
418 }
419
420 #[async_test]
421 async fn test_deduplicates_utds() {
422 // If I create a dummy hook,
423 let hook = Arc::new(Dummy::default());
424
425 // And I wrap with the UtdHookManager,
426 let wrapper = UtdHookManager::new(hook.clone(), logged_in_client(None).await);
427
428 // And I call the `on_utd` method multiple times, sometimes on the same event,
429 let event_timestamp = MilliSecondsSinceUnixEpoch::now();
430 let sender_user = user_id!("@example2:localhost");
431 let federated_user = user_id!("@example2:example.com");
432 wrapper.on_utd(event_id!("$1"), UtdCause::Unknown, event_timestamp, sender_user).await;
433 wrapper.on_utd(event_id!("$1"), UtdCause::Unknown, event_timestamp, sender_user).await;
434 wrapper.on_utd(event_id!("$2"), UtdCause::Unknown, event_timestamp, federated_user).await;
435 wrapper.on_utd(event_id!("$1"), UtdCause::Unknown, event_timestamp, sender_user).await;
436 wrapper.on_utd(event_id!("$2"), UtdCause::Unknown, event_timestamp, federated_user).await;
437 wrapper.on_utd(event_id!("$3"), UtdCause::Unknown, event_timestamp, sender_user).await;
438
439 // Then the event ids have been deduplicated,
440 {
441 let utds = hook.utds.lock().unwrap();
442 assert_eq!(utds.len(), 3);
443 assert_eq!(utds[0].event_id, event_id!("$1"));
444 assert_eq!(utds[1].event_id, event_id!("$2"));
445 assert_eq!(utds[2].event_id, event_id!("$3"));
446
447 // No event is a late-decryption event.
448 assert!(utds[0].time_to_decrypt.is_none());
449 assert!(utds[1].time_to_decrypt.is_none());
450 assert!(utds[2].time_to_decrypt.is_none());
451
452 // event_local_age_millis should be a small positive number, because the
453 // timestamp we used was after we created the device
454 let utd_local_age = utds[0].event_local_age_millis;
455 assert!(utd_local_age >= 0);
456 assert!(utd_local_age <= 1000);
457
458 assert_eq!(utds[0].sender_homeserver, server_name!("localhost"));
459 assert_eq!(utds[0].own_homeserver, Some(server_name!("localhost").to_owned()));
460
461 assert_eq!(utds[1].sender_homeserver, server_name!("example.com"));
462 assert_eq!(utds[1].own_homeserver, Some(server_name!("localhost").to_owned()));
463 }
464 }
465
466 #[async_test]
467 async fn test_deduplicates_utds_from_previous_session() {
468 // Use a single client for both hooks, so that both hooks are backed by the same
469 // memorystore.
470 let client = no_retry_test_client(None).await;
471
472 // Dummy hook 1, with the first UtdHookManager
473 {
474 let hook = Arc::new(Dummy::default());
475 let wrapper = UtdHookManager::new(hook.clone(), client.clone());
476
477 // I call it a couple of times with different events
478 wrapper
479 .on_utd(
480 event_id!("$1"),
481 UtdCause::Unknown,
482 MilliSecondsSinceUnixEpoch::now(),
483 user_id!("@a:b"),
484 )
485 .await;
486 wrapper
487 .on_utd(
488 event_id!("$2"),
489 UtdCause::Unknown,
490 MilliSecondsSinceUnixEpoch::now(),
491 user_id!("@a:b"),
492 )
493 .await;
494
495 // Sanity-check the reported event IDs
496 {
497 let utds = hook.utds.lock().unwrap();
498 assert_eq!(utds.len(), 2);
499 assert_eq!(utds[0].event_id, event_id!("$1"));
500 assert!(utds[0].time_to_decrypt.is_none());
501 assert_eq!(utds[1].event_id, event_id!("$2"));
502 assert!(utds[1].time_to_decrypt.is_none());
503 }
504 }
505
506 // Now, create a *new* hook, with a *new* UtdHookManager
507 {
508 let hook = Arc::new(Dummy::default());
509 let mut wrapper = UtdHookManager::new(hook.clone(), client.clone());
510 wrapper.reload_from_store().await.unwrap();
511
512 // Call it with more events, some of which match the previous instance
513 wrapper
514 .on_utd(
515 event_id!("$1"),
516 UtdCause::Unknown,
517 MilliSecondsSinceUnixEpoch::now(),
518 user_id!("@a:b"),
519 )
520 .await;
521 wrapper
522 .on_utd(
523 event_id!("$3"),
524 UtdCause::Unknown,
525 MilliSecondsSinceUnixEpoch::now(),
526 user_id!("@a:b"),
527 )
528 .await;
529
530 // Only the *new* ones should be reported
531 let utds = hook.utds.lock().unwrap();
532 assert_eq!(utds.len(), 1);
533 assert_eq!(utds[0].event_id, event_id!("$3"));
534 }
535 }
536
537 /// Test that UTD events which had not yet been reported in a previous
538 /// session, are reported in the next session.
539 #[async_test]
540 async fn test_does_not_deduplicate_late_utds_from_previous_session() {
541 // Use a single client for both hooks, so that both hooks are backed by the same
542 // memorystore.
543 let client = no_retry_test_client(None).await;
544
545 // Dummy hook 1, with the first UtdHookManager
546 {
547 let hook = Arc::new(Dummy::default());
548 let wrapper = UtdHookManager::new(hook.clone(), client.clone())
549 .with_max_delay(Duration::from_secs(2));
550
551 // a UTD event
552 wrapper
553 .on_utd(
554 event_id!("$1"),
555 UtdCause::Unknown,
556 MilliSecondsSinceUnixEpoch::now(),
557 user_id!("@a:b"),
558 )
559 .await;
560
561 // The event ID should not yet have been reported.
562 {
563 let utds = hook.utds.lock().unwrap();
564 assert_eq!(utds.len(), 0);
565 }
566 }
567
568 // Now, create a *new* hook, with a *new* UtdHookManager
569 {
570 let hook = Arc::new(Dummy::default());
571 let mut wrapper = UtdHookManager::new(hook.clone(), client.clone());
572 wrapper.reload_from_store().await.unwrap();
573
574 // Call the new hook with the same event
575 wrapper
576 .on_utd(
577 event_id!("$1"),
578 UtdCause::Unknown,
579 MilliSecondsSinceUnixEpoch::now(),
580 user_id!("@a:b"),
581 )
582 .await;
583
584 // And it should be reported.
585 sleep(Duration::from_millis(2500)).await;
586
587 let utds = hook.utds.lock().unwrap();
588 assert_eq!(utds.len(), 1);
589 assert_eq!(utds[0].event_id, event_id!("$1"));
590 }
591 }
592
593 #[async_test]
594 async fn test_on_late_decrypted_no_effect() {
595 // If I create a dummy hook,
596 let hook = Arc::new(Dummy::default());
597
598 // And I wrap with the UtdHookManager,
599 let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await);
600
601 // And I call the `on_late_decrypt` method before the event had been marked as
602 // utd,
603 wrapper.on_late_decrypt(event_id!("$1")).await;
604
605 // Then nothing is registered in the parent hook.
606 assert!(hook.utds.lock().unwrap().is_empty());
607 }
608
609 #[async_test]
610 async fn test_on_late_decrypted_after_utd_no_grace_period() {
611 // If I create a dummy hook,
612 let hook = Arc::new(Dummy::default());
613
614 // And I wrap with the UtdHookManager,
615 let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await);
616
617 // And I call the `on_utd` method for an event,
618 wrapper
619 .on_utd(
620 event_id!("$1"),
621 UtdCause::Unknown,
622 MilliSecondsSinceUnixEpoch::now(),
623 user_id!("@a:b"),
624 )
625 .await;
626
627 // Then the UTD has been notified, but not as late-decrypted event.
628 {
629 let utds = hook.utds.lock().unwrap();
630 assert_eq!(utds.len(), 1);
631 assert_eq!(utds[0].event_id, event_id!("$1"));
632 assert!(utds[0].time_to_decrypt.is_none());
633 }
634
635 // And when I call the `on_late_decrypt` method,
636 wrapper.on_late_decrypt(event_id!("$1")).await;
637
638 // Then the event is not reported again as a late-decryption.
639 {
640 let utds = hook.utds.lock().unwrap();
641 assert_eq!(utds.len(), 1);
642
643 // The previous report is still there. (There was no grace period.)
644 assert_eq!(utds[0].event_id, event_id!("$1"));
645 assert!(utds[0].time_to_decrypt.is_none());
646 }
647 }
648
649 #[cfg(not(target_arch = "wasm32"))] // wasm32 has no time for that
650 #[async_test]
651 async fn test_delayed_utd() {
652 // If I create a dummy hook,
653 let hook = Arc::new(Dummy::default());
654
655 // And I wrap with the UtdHookManager, configured to delay reporting after 2
656 // seconds.
657 let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await)
658 .with_max_delay(Duration::from_secs(2));
659
660 // And I call the `on_utd` method for an event,
661 wrapper
662 .on_utd(
663 event_id!("$1"),
664 UtdCause::Unknown,
665 MilliSecondsSinceUnixEpoch::now(),
666 user_id!("@a:b"),
667 )
668 .await;
669
670 // Then the UTD is not being reported immediately.
671 assert!(hook.utds.lock().unwrap().is_empty());
672 assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);
673
674 // If I wait for 1 second, then it's still not been notified yet.
675 sleep(Duration::from_secs(1)).await;
676
677 assert!(hook.utds.lock().unwrap().is_empty());
678 assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);
679
680 // But if I wait just a bit more, then it's getting notified as a definite UTD.
681 sleep(Duration::from_millis(1500)).await;
682
683 {
684 let utds = hook.utds.lock().unwrap();
685 assert_eq!(utds.len(), 1);
686 assert_eq!(utds[0].event_id, event_id!("$1"));
687 assert!(utds[0].time_to_decrypt.is_none());
688 }
689
690 assert!(wrapper.pending_delayed.lock().unwrap().is_empty());
691 }
692
693 #[cfg(not(target_arch = "wasm32"))] // wasm32 has no time for that
694 #[async_test]
695 async fn test_delayed_late_decryption() {
696 // If I create a dummy hook,
697 let hook = Arc::new(Dummy::default());
698
699 // And I wrap with the UtdHookManager, configured to delay reporting after 2
700 // seconds.
701 let wrapper = UtdHookManager::new(hook.clone(), no_retry_test_client(None).await)
702 .with_max_delay(Duration::from_secs(2));
703
704 // And I call the `on_utd` method for an event,
705 wrapper
706 .on_utd(
707 event_id!("$1"),
708 UtdCause::Unknown,
709 MilliSecondsSinceUnixEpoch::now(),
710 user_id!("@a:b"),
711 )
712 .await;
713
714 // Then the UTD has not been notified quite yet.
715 assert!(hook.utds.lock().unwrap().is_empty());
716 assert_eq!(wrapper.pending_delayed.lock().unwrap().len(), 1);
717
718 // If I wait for 1 second, and mark the event as late-decrypted,
719 sleep(Duration::from_secs(1)).await;
720
721 wrapper.on_late_decrypt(event_id!("$1")).await;
722
723 // Then it's being immediately reported as a late-decryption UTD.
724 {
725 let utds = hook.utds.lock().unwrap();
726 assert_eq!(utds.len(), 1);
727 assert_eq!(utds[0].event_id, event_id!("$1"));
728 assert!(utds[0].time_to_decrypt.is_some());
729 }
730
731 // And there aren't any pending delayed reports anymore.
732 assert!(wrapper.pending_delayed.lock().unwrap().is_empty());
733 }
734}