1use std::{collections::BTreeSet, fmt, sync::Mutex};
19
20use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
21use matrix_sdk_base::event_cache::store::EventCacheStoreLock;
22use ruma::{OwnedEventId, OwnedRoomId};
23use tracing::{debug, warn};
24
25use super::{
26 room::events::{Event, RoomEvents},
27 EventCacheError,
28};
29
30pub enum Deduplicator {
31 InMemory(BloomFilterDeduplicator),
32 PersistentStore(StoreDeduplicator),
33}
34
35impl Deduplicator {
36 pub fn new_memory_based() -> Self {
46 Self::InMemory(BloomFilterDeduplicator::new())
47 }
48
49 pub fn new_store_based(room_id: OwnedRoomId, store: EventCacheStoreLock) -> Self {
58 Self::PersistentStore(StoreDeduplicator { room_id, store })
59 }
60
61 pub async fn filter_duplicate_events(
65 &self,
66 events: Vec<Event>,
67 room_events: &RoomEvents,
68 ) -> Result<(Vec<Event>, Vec<OwnedEventId>), EventCacheError> {
69 match self {
70 Deduplicator::InMemory(dedup) => Ok(dedup.filter_duplicate_events(events, room_events)),
71 Deduplicator::PersistentStore(dedup) => dedup.filter_duplicate_events(events).await,
72 }
73 }
74}
75
76pub struct StoreDeduplicator {
82 room_id: OwnedRoomId,
84 store: EventCacheStoreLock,
86}
87
88impl StoreDeduplicator {
89 async fn filter_duplicate_events(
90 &self,
91 mut events: Vec<Event>,
92 ) -> Result<(Vec<Event>, Vec<OwnedEventId>), EventCacheError> {
93 let store = self.store.lock().await?;
94
95 let mut event_ids = Vec::new();
98
99 events.retain(|event| {
100 if let Some(event_id) = event.event_id() {
101 event_ids.push(event_id);
102 true
103 } else {
104 false
105 }
106 });
107
108 let duplicates = store.filter_duplicated_events(&self.room_id, event_ids).await?;
110
111 Ok((events, duplicates))
112 }
113}
114
115pub struct BloomFilterDeduplicator {
126 bloom_filter: Mutex<GrowableBloom>,
127}
128
129impl fmt::Debug for BloomFilterDeduplicator {
130 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
131 formatter.debug_struct("Deduplicator").finish_non_exhaustive()
132 }
133}
134
135impl BloomFilterDeduplicator {
136 const APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS: usize = 1_000;
139 const DESIRED_FALSE_POSITIVE_RATE: f64 = 0.01;
140
141 fn new() -> Self {
143 let bloom_filter = GrowableBloomBuilder::new()
144 .estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS)
145 .desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE)
146 .build();
147 Self { bloom_filter: Mutex::new(bloom_filter) }
148 }
149
150 fn filter_duplicate_events(
154 &self,
155 events: Vec<Event>,
156 room_events: &RoomEvents,
157 ) -> (Vec<Event>, Vec<OwnedEventId>) {
158 let mut duplicated_event_ids = Vec::new();
159
160 let events = self
161 .scan_and_learn(events.into_iter(), room_events)
162 .filter_map(|decorated_event| match decorated_event {
163 Decoration::Unique(event) => Some(event),
164 Decoration::Duplicated(event) => {
165 debug!(event_id = ?event.event_id(), "Found a duplicated event");
166
167 duplicated_event_ids.push(
168 event
169 .event_id()
170 .expect("The event has no ID"),
174 );
175
176 Some(event)
178 }
179 Decoration::Invalid(event) => {
180 warn!(?event, "Found an event with no ID");
181 None
182 }
183 })
184 .collect::<Vec<_>>();
185
186 (events, duplicated_event_ids)
187 }
188
189 fn scan_and_learn<'a, I>(
200 &'a self,
201 new_events_to_scan: I,
202 existing_events: &'a RoomEvents,
203 ) -> impl Iterator<Item = Decoration<I::Item>> + 'a
204 where
205 I: Iterator<Item = Event> + 'a,
206 {
207 let mut new_scanned_events = BTreeSet::new();
210
211 new_events_to_scan.map(move |event| {
212 let Some(event_id) = event.event_id() else {
213 return Decoration::Invalid(event);
215 };
216
217 if self.bloom_filter.lock().unwrap().check_and_set(&event_id) {
218 if new_scanned_events.contains(&event_id) {
228 return Decoration::Duplicated(event);
230 }
231
232 let duplicated = existing_events.revents().any(|(_position, other_event)| {
235 other_event.event_id().as_ref() == Some(&event_id)
236 });
237
238 new_scanned_events.insert(event_id);
239
240 if duplicated {
241 Decoration::Duplicated(event)
242 } else {
243 Decoration::Unique(event)
244 }
245 } else {
246 new_scanned_events.insert(event_id);
247
248 Decoration::Unique(event)
251 }
252 })
253 }
254}
255
/// The verdict of a scan for one event; the event itself is carried along.
#[derive(Debug)]
enum Decoration<E> {
    /// The event has not been seen before.
    Unique(E),

    /// The event was already seen: earlier in the same scan, or among the
    /// existing events.
    Duplicated(E),

    /// The event has no ID, so it cannot be checked.
    Invalid(E),
}
268
#[cfg(test)]
mod tests {
    use assert_matches2::{assert_let, assert_matches};
    use matrix_sdk_base::deserialized_responses::TimelineEvent;
    use matrix_sdk_test::event_factory::EventFactory;
    use ruma::{owned_event_id, user_id, EventId};

    use super::*;

    /// Builds a text message event with the given event ID.
    fn sync_timeline_event(event_id: &EventId) -> TimelineEvent {
        EventFactory::new()
            .text_msg("")
            .sender(user_id!("@mnt_io:matrix.org"))
            .event_id(event_id)
            .into_event()
    }

    #[test]
    fn test_filter_no_duplicate() {
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");
        let event_id_2 = owned_event_id!("$ev2");

        let event_0 = sync_timeline_event(&event_id_0);
        let event_1 = sync_timeline_event(&event_id_1);
        let event_2 = sync_timeline_event(&event_id_2);

        let deduplicator = BloomFilterDeduplicator::new();
        let existing_events = RoomEvents::new();

        let mut events =
            deduplicator.scan_and_learn([event_0, event_1, event_2].into_iter(), &existing_events);

        assert_let!(Some(Decoration::Unique(event)) = events.next());
        assert_eq!(event.event_id(), Some(event_id_0));

        assert_let!(Some(Decoration::Unique(event)) = events.next());
        assert_eq!(event.event_id(), Some(event_id_1));

        assert_let!(Some(Decoration::Unique(event)) = events.next());
        assert_eq!(event.event_id(), Some(event_id_2));

        assert!(events.next().is_none());
    }

    #[test]
    fn test_filter_duplicates_in_new_events() {
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");

        let event_0 = sync_timeline_event(&event_id_0);
        let event_1 = sync_timeline_event(&event_id_1);

        let deduplicator = BloomFilterDeduplicator::new();
        let existing_events = RoomEvents::new();

        let mut events = deduplicator.scan_and_learn(
            [
                event_0.clone(), // OK
                event_0,         // Duplicated
                event_1,         // OK
            ]
            .into_iter(),
            &existing_events,
        );

        assert_let!(Some(Decoration::Unique(event)) = events.next());
        assert_eq!(event.event_id(), Some(event_id_0.clone()));

        assert_let!(Some(Decoration::Duplicated(event)) = events.next());
        assert_eq!(event.event_id(), Some(event_id_0));

        assert_let!(Some(Decoration::Unique(event)) = events.next());
        assert_eq!(event.event_id(), Some(event_id_1));

        assert!(events.next().is_none());
    }

    #[test]
    fn test_filter_duplicates_with_existing_events() {
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");
        let event_id_2 = owned_event_id!("$ev2");

        let event_0 = sync_timeline_event(&event_id_0);
        let event_1 = sync_timeline_event(&event_id_1);
        let event_2 = sync_timeline_event(&event_id_2);

        let deduplicator = BloomFilterDeduplicator::new();
        let mut existing_events = RoomEvents::new();

        // First, scan `event_1` and add it to the existing events.
        {
            let mut events =
                deduplicator.scan_and_learn([event_1.clone()].into_iter(), &existing_events);

            assert_let!(Some(Decoration::Unique(event_1)) = events.next());
            assert_eq!(event_1.event_id(), Some(event_id_1.clone()));

            assert!(events.next().is_none());

            // `events` borrows `existing_events`: release it before mutating.
            drop(events);
            existing_events.push_events([event_1]);
        }

        // Then, scan new events, where `event_1` is already known.
        {
            let mut events = deduplicator.scan_and_learn(
                [
                    event_0, // OK
                    event_1, // Duplicated
                    event_2, // OK
                ]
                .into_iter(),
                &existing_events,
            );

            assert_let!(Some(Decoration::Unique(event)) = events.next());
            assert_eq!(event.event_id(), Some(event_id_0));

            assert_let!(Some(Decoration::Duplicated(event)) = events.next());
            assert_eq!(event.event_id(), Some(event_id_1));

            assert_let!(Some(Decoration::Unique(event)) = events.next());
            assert_eq!(event.event_id(), Some(event_id_2));

            assert!(events.next().is_none());
        }
    }

    #[test]
    fn test_filter_duplicate_events_keeps_duplicates_and_drops_invalid_events() {
        use ruma::serde::Raw;

        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");

        let event_0 = sync_timeline_event(&event_id_0);
        let event_1 = sync_timeline_event(&event_id_1);
        // An event without an ID cannot be deduplicated and must be dropped.
        let invalid_event = TimelineEvent::new(Raw::from_json_string("{}".to_owned()).unwrap());

        let deduplicator = BloomFilterDeduplicator::new();
        let existing_events = RoomEvents::new();

        let (events, duplicated_event_ids) = deduplicator.filter_duplicate_events(
            vec![event_0.clone(), invalid_event, event_0, event_1],
            &existing_events,
        );

        // The duplicated event is kept; only the invalid one is removed.
        assert_eq!(events.len(), 3);
        assert_eq!(events[0].event_id(), Some(event_id_0.clone()));
        assert_eq!(events[1].event_id(), Some(event_id_0.clone()));
        assert_eq!(events[2].event_id(), Some(event_id_1));

        // The duplicated ID is reported exactly once.
        assert_eq!(duplicated_event_ids, vec![event_id_0]);
    }

    #[test]
    fn test_bloom_filter_growth() {
        // Both sizes can be overridden through the `ROOMS` and `EVENTS`
        // environment variables.
        let num_rooms = if let Ok(num_rooms) = std::env::var("ROOMS") {
            num_rooms.parse().unwrap()
        } else {
            10
        };

        let num_events = if let Ok(num_events) = std::env::var("EVENTS") {
            num_events.parse().unwrap()
        } else {
            100
        };

        // Keep every deduplicator alive until the end of the test.
        let mut dedups = Vec::with_capacity(num_rooms);

        for _ in 0..num_rooms {
            let dedup = BloomFilterDeduplicator::new();
            let existing_events = RoomEvents::new();

            for i in 0..num_events {
                let event = sync_timeline_event(&EventId::parse(format!("$event{i}")).unwrap());
                let mut it = dedup.scan_and_learn([event].into_iter(), &existing_events);

                assert_matches!(it.next(), Some(Decoration::Unique(..)));
                assert_matches!(it.next(), None);
            }

            dedups.push(dedup);
        }
    }

    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
    #[matrix_sdk_test::async_test]
    async fn test_storage_deduplication() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore as _, MemoryStore},
            linked_chunk::{ChunkIdentifier, Position, Update},
        };
        use matrix_sdk_test::{ALICE, BOB};
        use ruma::{event_id, room_id, serde::Raw};

        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(eid1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(eid2).into_event();
        let ev3 = f.text_msg("wassup").sender(*ALICE).event_id(eid3).into_event();
        // An invalid event (no event ID).
        let ev4 = TimelineEvent::new(Raw::from_json_string("{}".to_owned()).unwrap());

        // Prefill the store with `ev1` and `ev2`, in two different chunks.
        event_cache_store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    // Chunk 0 contains `ev1`.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![ev1.clone()],
                    },
                    // Chunk 1 contains `ev2`.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(0)),
                        new: ChunkIdentifier::new(1),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(1), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        // Wrap the store in the lock expected by the deduplicator.
        let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());

        let deduplicator =
            StoreDeduplicator { room_id: room_id.to_owned(), store: event_cache_store };

        let (valid_events, duplicates) =
            deduplicator.filter_duplicate_events(vec![ev1, ev2, ev3, ev4]).await.unwrap();

        assert_eq!(valid_events.len(), 3);
        assert_eq!(valid_events[0].event_id().as_deref(), Some(eid1));
        assert_eq!(valid_events[1].event_id().as_deref(), Some(eid2));
        assert_eq!(valid_events[2].event_id().as_deref(), Some(eid3));

        assert_eq!(duplicates.len(), 2);
        assert_eq!(duplicates[0], eid1);
        assert_eq!(duplicates[1], eid2);
    }
}