//! Types to detect duplicated events, either in memory or via the persistent
//! event cache store.

use std::{collections::BTreeSet, fmt, sync::Mutex};

use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
use matrix_sdk_base::{event_cache::store::EventCacheStoreLock, linked_chunk::Position};
use ruma::{OwnedEventId, OwnedRoomId};
use tracing::{debug, error};

use super::{
    room::events::{Event, RoomEvents},
    EventCacheError,
};

/// A `Deduplicator` helps to find duplicated events.
pub enum Deduplicator {
    /// Deduplication based on an in-memory bloom filter.
    InMemory(BloomFilterDeduplicator),
    /// Deduplication based on the persistent event cache store.
    PersistentStore(StoreDeduplicator),
}

impl Deduplicator {
    /// Create a new [`Deduplicator`] backed by an in-memory bloom filter.
    pub fn new_memory_based() -> Self {
        Self::InMemory(BloomFilterDeduplicator::new())
    }

    /// Create a new [`Deduplicator`] backed by the persistent event cache
    /// store.
    pub fn new_store_based(room_id: OwnedRoomId, store: EventCacheStoreLock) -> Self {
        Self::PersistentStore(StoreDeduplicator { room_id, store })
    }

    /// Find duplicates in the given collection of events.
    ///
    /// Events with no ID, and events duplicated within `events` itself, are
    /// dropped up front; duplicates against already-known events are reported
    /// in the returned [`DeduplicationOutcome`].
    pub async fn filter_duplicate_events(
        &self,
        mut events: Vec<Event>,
        room_events: &RoomEvents,
    ) -> Result<DeduplicationOutcome, EventCacheError> {
        // Remove duplicates among the new events themselves: only the first
        // occurrence of each event ID is kept.
        {
            let mut event_ids = BTreeSet::new();

            events.retain(|event| {
                let Some(event_id) = event.event_id() else {
                    // The event has no ID: it is not a valid event, filter it
                    // out.
                    return false;
                };

                if event_ids.contains(&event_id) {
                    // The event is duplicated within the input itself: filter
                    // it out.
                    return false;
                }

                event_ids.insert(event_id);

                true
            });
        }

        Ok(match self {
            Deduplicator::InMemory(dedup) => dedup.filter_duplicate_events(events, room_events),
            Deduplicator::PersistentStore(dedup) => {
                dedup.filter_duplicate_events(events, room_events).await?
            }
        })
    }
}

/// A deduplication mechanism based on the persistent event cache store.
pub struct StoreDeduplicator {
    /// The room this deduplicator applies to.
    room_id: OwnedRoomId,
    /// The persistent event cache store.
    store: EventCacheStoreLock,
}

impl StoreDeduplicator {
    /// Find duplicates in the given collection of events, and classify them as
    /// in-memory or in-store duplicates.
    async fn filter_duplicate_events(
        &self,
        events: Vec<Event>,
        room_events: &RoomEvents,
    ) -> Result<DeduplicationOutcome, EventCacheError> {
        let store = self.store.lock().await?;

        // Let the store find the duplicates for us.
        let duplicated_event_ids = store
            .filter_duplicated_events(
                &self.room_id,
                events.iter().filter_map(|event| event.event_id()).collect(),
            )
            .await?;

        // Split the duplicates according to where they live: a duplicate whose
        // position belongs to a chunk currently loaded in `room_events` is an
        // in-memory duplicate, all the others are in-store duplicates.
        let (in_memory_duplicated_event_ids, in_store_duplicated_event_ids) = {
            let in_memory_chunk_identifiers =
                room_events.chunks().map(|chunk| chunk.identifier()).collect::<Vec<_>>();

            let mut in_memory = vec![];
            let mut in_store = vec![];

            for (duplicated_event_id, position) in duplicated_event_ids {
                if in_memory_chunk_identifiers.contains(&position.chunk_identifier()) {
                    in_memory.push((duplicated_event_id, position));
                } else {
                    in_store.push((duplicated_event_id, position));
                }
            }

            (in_memory, in_store)
        };

        Ok(DeduplicationOutcome {
            all_events: events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
        })
    }
}

/// A deduplication mechanism based on a growable bloom filter, used by
/// [`Deduplicator::InMemory`].
pub struct BloomFilterDeduplicator {
    /// The bloom filter used to quickly check whether an event has already
    /// been seen.
    bloom_filter: Mutex<GrowableBloom>,
}

impl fmt::Debug for BloomFilterDeduplicator {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.debug_struct("Deduplicator").finish_non_exhaustive()
    }
}

impl BloomFilterDeduplicator {
    // An estimation of the number of events the filter will hold; the filter
    // is growable, so this only sizes the initial allocation.
    const APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS: usize = 1_000;
    // The desired false-positive rate of the bloom filter.
    const DESIRED_FALSE_POSITIVE_RATE: f64 = 0.01;

    /// Create a new [`BloomFilterDeduplicator`].
    fn new() -> Self {
        let bloom_filter = GrowableBloomBuilder::new()
            .estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS)
            .desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE)
            .build();
        Self { bloom_filter: Mutex::new(bloom_filter) }
    }

    /// Find duplicates in the given collection of events.
    fn filter_duplicate_events(
        &self,
        events: Vec<Event>,
        room_events: &RoomEvents,
    ) -> DeduplicationOutcome {
        let mut duplicated_event_ids = Vec::new();

        let events = self
            .scan_and_learn(events.into_iter(), room_events)
            .map(|decorated_event| match decorated_event {
                Decoration::Unique(event) => event,
                Decoration::Duplicated((event, position)) => {
                    debug!(event_id = ?event.event_id(), "Found a duplicated event");

                    let event_id = event
                        .event_id()
                        // SAFETY: events with no ID are filtered out by
                        // `scan_and_learn`, so this cannot panic.
                        .expect("The event has no ID");

                    duplicated_event_ids.push((event_id, position));

                    // Keep the duplicated event: the caller decides what to do
                    // with it.
                    event
                }
            })
            .collect::<Vec<_>>();

        DeduplicationOutcome {
            all_events: events,
            in_memory_duplicated_event_ids: duplicated_event_ids,
            in_store_duplicated_event_ids: vec![],
        }
    }

    /// Scan a collection of events and detect duplicates.
    ///
    /// Each scanned event is learnt by the bloom filter and decorated as
    /// either [`Decoration::Unique`] or [`Decoration::Duplicated`]. Events
    /// with no ID are dropped.
    fn scan_and_learn<'a, I>(
        &'a self,
        new_events_to_scan: I,
        existing_events: &'a RoomEvents,
    ) -> impl Iterator<Item = Decoration<I::Item>> + 'a
    where
        I: Iterator<Item = Event> + 'a,
    {
        new_events_to_scan.filter_map(move |event| {
            let Some(event_id) = event.event_id() else {
                // The event has no ID: it is not a valid event, skip it.
                error!(?event, "Found an event with no ID");
                return None;
            };

            Some(if self.bloom_filter.lock().unwrap().check_and_set(&event_id) {
                // The bloom filter can return false positives, so confirm the
                // duplication by looking the event up in the existing events
                // (iterated in reverse order via `revents`).
                let position_of_the_duplicated_event =
                    existing_events.revents().find_map(|(position, other_event)| {
                        (other_event.event_id().as_ref() == Some(&event_id)).then_some(position)
                    });

                if let Some(position) = position_of_the_duplicated_event {
                    Decoration::Duplicated((event, position))
                } else {
                    // False positive: the event is not among the existing
                    // events, consider it unique.
                    Decoration::Unique(event)
                }
            } else {
                // The bloom filter has not seen this event before.
                Decoration::Unique(event)
            })
        })
    }
}

/// Information about a scanned event.
#[derive(Debug)]
enum Decoration<I> {
    /// This event is not known yet.
    Unique(I),

    /// This event is a duplicate, along with the position of the already-known
    /// copy.
    Duplicated((I, Position)),
}

/// The outcome of a deduplication operation.
pub(super) struct DeduplicationOutcome {
    /// All the events from the input, in the same order, minus the events with
    /// no ID and the events duplicated within the input itself.
    pub all_events: Vec<Event>,

    /// Events that are duplicated with events already loaded in memory, along
    /// with their position in the in-memory linked chunk.
    pub in_memory_duplicated_event_ids: Vec<(OwnedEventId, Position)>,

    /// Events that are duplicated with events that only live in the event
    /// cache store, along with their position in the stored linked chunk.
    pub in_store_duplicated_event_ids: Vec<(OwnedEventId, Position)>,
}

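// A rough usage sketch, for illustration only: the `deduplicator`, `new_events`
// and `room_events` handles are assumed to come from the surrounding event
// cache code, and error handling is elided.
//
//     let outcome = deduplicator
//         .filter_duplicate_events(new_events, &room_events)
//         .await?;
//
//     // `in_memory_duplicated_event_ids` point into chunks currently loaded in
//     // `room_events`, while `in_store_duplicated_event_ids` only exist in the
//     // persistent store. A caller might remove the former from the loaded
//     // chunks and the latter from the store before appending
//     // `outcome.all_events`.
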
#[cfg(test)]
mod tests {
    use assert_matches2::{assert_let, assert_matches};
    use matrix_sdk_base::{deserialized_responses::TimelineEvent, linked_chunk::ChunkIdentifier};
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{owned_event_id, serde::Raw, user_id, EventId};

    use super::*;

    fn timeline_event(event_id: &EventId) -> TimelineEvent {
        EventFactory::new()
            .text_msg("")
            .sender(user_id!("@mnt_io:matrix.org"))
            .event_id(event_id)
            .into_event()
    }

    #[async_test]
    async fn test_filter_find_duplicates_in_the_input() {
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");

        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);

        let deduplicator = Deduplicator::new_memory_based();
        let room_events = RoomEvents::new();

        let outcome = deduplicator
            .filter_duplicate_events(
                vec![
                    event_0.clone(),
                    event_1,
                    // `event_0` is duplicated within the input itself.
                    event_0,
                ],
                &room_events,
            )
            .await
            .unwrap();

        // The duplicated `event_0` has been removed.
        assert_eq!(outcome.all_events.len(), 2);
        assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0));
        assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1));
    }

    #[async_test]
    async fn test_filter_exclude_invalid_events_from_the_input() {
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");

        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);
        // An event with no ID.
        let event_2 = TimelineEvent::new(Raw::from_json_string("{}".to_owned()).unwrap());

        let deduplicator = Deduplicator::new_memory_based();
        let room_events = RoomEvents::new();

        let outcome = deduplicator
            .filter_duplicate_events(
                vec![
                    event_0.clone(),
                    event_1,
                    // `event_2` has no ID, it must be dropped.
                    event_2,
                ],
                &room_events,
            )
            .await
            .unwrap();

        // The event with no ID has been removed.
        assert_eq!(outcome.all_events.len(), 2);
        assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0));
        assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1));
    }

    #[async_test]
    async fn test_memory_based_duplicated_event_ids_from_in_memory_vs_in_store() {
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");

        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);

        let mut deduplicator = Deduplicator::new_memory_based();
        let mut room_events = RoomEvents::new();
        {
            // Simulate `event_0` being already known: teach the bloom filter
            // about it and push it into the in-memory `RoomEvents`.
            let Deduplicator::InMemory(bloom_filter) = &mut deduplicator else {
                panic!("test is broken, but sky is beautiful");
            };
            bloom_filter.bloom_filter.lock().unwrap().insert(event_id_0.clone());
            room_events.push_events([event_0.clone()]);
        }

        let outcome = deduplicator
            .filter_duplicate_events(vec![event_0, event_1], &room_events)
            .await
            .unwrap();

        // All events are returned.
        assert_eq!(outcome.all_events.len(), 2);
        assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0.clone()));
        assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1));

        // `event_0` is an in-memory duplicate.
        assert_eq!(outcome.in_memory_duplicated_event_ids.len(), 1);
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[0],
            (event_id_0, Position::new(ChunkIdentifier::new(0), 0))
        );

        // The memory-based deduplicator never reports in-store duplicates.
        assert!(outcome.in_store_duplicated_event_ids.is_empty());
    }

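    // An extra illustrative test, not part of the original suite (the name is
    // ours): the memory-based deduplicator learns event IDs through
    // `filter_duplicate_events` itself, so a second call spots an event that
    // was seen before and is present in the in-memory `RoomEvents`.
    #[async_test]
    async fn test_memory_based_deduplicator_learns_across_calls() {
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");

        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);

        let deduplicator = Deduplicator::new_memory_based();
        let mut room_events = RoomEvents::new();

        // First call: `event_0` is unknown, the bloom filter learns its ID.
        let outcome = deduplicator
            .filter_duplicate_events(vec![event_0.clone()], &room_events)
            .await
            .unwrap();
        assert!(outcome.in_memory_duplicated_event_ids.is_empty());

        // Pretend the event cache stored `event_0` in memory.
        room_events.push_events([event_0.clone()]);

        // Second call: `event_0` is now reported as an in-memory duplicate.
        let outcome = deduplicator
            .filter_duplicate_events(vec![event_0, event_1], &room_events)
            .await
            .unwrap();

        assert_eq!(outcome.all_events.len(), 2);
        assert_eq!(outcome.in_memory_duplicated_event_ids.len(), 1);
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[0],
            (event_id_0, Position::new(ChunkIdentifier::new(0), 0))
        );
        assert!(outcome.in_store_duplicated_event_ids.is_empty());
    }
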
    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
    async fn test_store_based_duplicated_event_ids_from_in_memory_vs_in_store() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore, MemoryStore},
            linked_chunk::Update,
        };
        use ruma::room_id;

        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");
        let event_id_2 = owned_event_id!("$ev2");
        let event_id_3 = owned_event_id!("$ev3");
        let event_id_4 = owned_event_id!("$ev4");

        // `event_0` and `event_1` will live in the store only, `event_2` and
        // `event_3` in memory too, and `event_4` is brand new.
        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);
        let event_2 = timeline_event(&event_id_2);
        let event_3 = timeline_event(&event_id_3);
        let event_4 = timeline_event(&event_id_4);

        let event_cache_store = Arc::new(MemoryStore::new());
        let room_id = room_id!("!fondue:raclette.ch");

        // Prefill the store with two chunks.
        event_cache_store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0.clone(), event_1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        // This chunk shares identifier `0` with the chunk of
                        // the in-memory `RoomEvents` below, so its events are
                        // counted as in-memory duplicates.
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event_2.clone(), event_3.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());

        let deduplicator = Deduplicator::new_store_based(room_id.to_owned(), event_cache_store);
        let mut room_events = RoomEvents::new();
        room_events.push_events([event_2.clone(), event_3.clone()]);

        let outcome = deduplicator
            .filter_duplicate_events(
                vec![event_0, event_1, event_2, event_3, event_4],
                &room_events,
            )
            .await
            .unwrap();

        // All events are returned, in the same order.
        assert_eq!(outcome.all_events.len(), 5);
        assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0.clone()));
        assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1.clone()));
        assert_eq!(outcome.all_events[2].event_id(), Some(event_id_2.clone()));
        assert_eq!(outcome.all_events[3].event_id(), Some(event_id_3.clone()));
        assert_eq!(outcome.all_events[4].event_id(), Some(event_id_4.clone()));

        // `event_2` and `event_3` are duplicated with in-memory events.
        assert_eq!(outcome.in_memory_duplicated_event_ids.len(), 2);
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[0],
            (event_id_2, Position::new(ChunkIdentifier::new(0), 0))
        );
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[1],
            (event_id_3, Position::new(ChunkIdentifier::new(0), 1))
        );

        // `event_0` and `event_1` are duplicated with events that only live in
        // the store.
        assert_eq!(outcome.in_store_duplicated_event_ids.len(), 2);
        assert_eq!(
            outcome.in_store_duplicated_event_ids[0],
            (event_id_0, Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            outcome.in_store_duplicated_event_ids[1],
            (event_id_1, Position::new(ChunkIdentifier::new(42), 1))
        );
    }

    #[test]
    fn test_bloom_filter_no_duplicate() {
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");
        let event_id_2 = owned_event_id!("$ev2");

        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);
        let event_2 = timeline_event(&event_id_2);

        let deduplicator = BloomFilterDeduplicator::new();
        let existing_events = RoomEvents::new();

        let mut events =
            deduplicator.scan_and_learn([event_0, event_1, event_2].into_iter(), &existing_events);

        assert_let!(Some(Decoration::Unique(event)) = events.next());
        assert_eq!(event.event_id(), Some(event_id_0));

        assert_let!(Some(Decoration::Unique(event)) = events.next());
        assert_eq!(event.event_id(), Some(event_id_1));

        assert_let!(Some(Decoration::Unique(event)) = events.next());
        assert_eq!(event.event_id(), Some(event_id_2));

        assert!(events.next().is_none());
    }

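    // Another sketch beyond the original suite (the name is ours): once
    // `scan_and_learn` has seen an event and the event also lives in the
    // existing `RoomEvents`, the next scan decorates it as `Duplicated` with
    // its in-memory position.
    #[test]
    fn test_bloom_filter_duplicate_in_existing_events() {
        let event_id_0 = owned_event_id!("$ev0");
        let event_0 = timeline_event(&event_id_0);

        let deduplicator = BloomFilterDeduplicator::new();
        let mut existing_events = RoomEvents::new();

        // Teach the bloom filter about `event_0`; the iterator is lazy, so it
        // must be consumed for the filter to learn.
        let learned = deduplicator
            .scan_and_learn([event_0.clone()].into_iter(), &existing_events)
            .count();
        assert_eq!(learned, 1);

        existing_events.push_events([event_0.clone()]);

        let mut events =
            deduplicator.scan_and_learn([event_0].into_iter(), &existing_events);

        assert_let!(Some(Decoration::Duplicated((event, position))) = events.next());
        assert_eq!(event.event_id(), Some(event_id_0));
        assert_eq!(position, Position::new(ChunkIdentifier::new(0), 0));

        assert!(events.next().is_none());
    }
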
    #[test]
    fn test_bloom_filter_growth() {
        // A stress test: create many deduplicators and feed each of them many
        // unique events. The numbers can be overridden with the `ROOMS` and
        // `EVENTS` environment variables.
        let num_rooms = if let Ok(num_rooms) = std::env::var("ROOMS") {
            num_rooms.parse().unwrap()
        } else {
            10
        };

        let num_events = if let Ok(num_events) = std::env::var("EVENTS") {
            num_events.parse().unwrap()
        } else {
            100
        };

        let mut dedups = Vec::with_capacity(num_rooms);

        for _ in 0..num_rooms {
            let dedup = BloomFilterDeduplicator::new();
            let existing_events = RoomEvents::new();

            for i in 0..num_events {
                let event = timeline_event(&EventId::parse(format!("$event{i}")).unwrap());
                let mut it = dedup.scan_and_learn([event].into_iter(), &existing_events);

                assert_matches!(it.next(), Some(Decoration::Unique(..)));
                assert_matches!(it.next(), None);
            }

            dedups.push(dedup);
        }
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[async_test]
    async fn test_storage_deduplication() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore as _, MemoryStore},
            linked_chunk::{ChunkIdentifier, Position, Update},
        };
        use matrix_sdk_test::{ALICE, BOB};
        use ruma::{event_id, room_id};

        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(eid1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(eid2).into_event();
        let ev3 = f.text_msg("wassup").sender(*ALICE).event_id(eid3).into_event();
        // An invalid event (it has no ID).
        let ev4 = TimelineEvent::new(Raw::from_json_string("{}".to_owned()).unwrap());

        // Prefill the store with `ev1` and `ev2`, in two different chunks.
        event_cache_store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![ev1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(43), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());

        let deduplicator = Deduplicator::new_store_based(room_id.to_owned(), event_cache_store);

        let room_events = RoomEvents::new();
        let DeduplicationOutcome {
            all_events: events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
        } = deduplicator
            .filter_duplicate_events(vec![ev1, ev2, ev3, ev4], &room_events)
            .await
            .unwrap();

        // The invalid event has been dropped; everything else is kept.
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].event_id().as_deref(), Some(eid1));
        assert_eq!(events[1].event_id().as_deref(), Some(eid2));
        assert_eq!(events[2].event_id().as_deref(), Some(eid3));

        // The in-memory `RoomEvents` is empty, so there are no in-memory
        // duplicates.
        assert!(in_memory_duplicated_event_ids.is_empty());

        // `ev1` and `ev2` are already known to the store.
        assert_eq!(in_store_duplicated_event_ids.len(), 2);
        assert_eq!(
            in_store_duplicated_event_ids[0],
            (eid1.to_owned(), Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            in_store_duplicated_event_ids[1],
            (eid2.to_owned(), Position::new(ChunkIdentifier::new(43), 0))
        );
    }
}