matrix_sdk/event_cache/
deduplicator.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Simple but efficient types to find duplicated events. See [`Deduplicator`]
16//! to learn more.
17
18use std::{collections::BTreeSet, fmt, sync::Mutex};
19
20use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
21use matrix_sdk_base::event_cache::store::EventCacheStoreLock;
22use ruma::{OwnedEventId, OwnedRoomId};
23use tracing::{debug, warn};
24
25use super::{
26    room::events::{Event, RoomEvents},
27    EventCacheError,
28};
29
/// Entry point for finding duplicated events, wrapping one of the two
/// available deduplication strategies.
///
/// Build it with [`Deduplicator::new_memory_based`] or
/// [`Deduplicator::new_store_based`].
pub enum Deduplicator {
    /// Deduplication backed by an in-memory Bloom filter.
    InMemory(BloomFilterDeduplicator),
    /// Deduplication backed by queries against the event cache store.
    PersistentStore(StoreDeduplicator),
}
34
35impl Deduplicator {
36    /// Create an empty deduplicator instance that uses an internal Bloom
37    /// filter.
38    ///
39    /// Such a deduplicator is stateful, with no initial known events, and it
40    /// will learn over time by using a Bloom filter which events are
41    /// duplicates or not.
42    ///
43    /// When the persistent storage of the event cache is enabled by default,
44    /// this constructor (and the associated variant) will be removed.
45    pub fn new_memory_based() -> Self {
46        Self::InMemory(BloomFilterDeduplicator::new())
47    }
48
49    /// Create new store-based deduplicator that will run queries against the
50    /// store to find if any event is deduplicated or not.
51    ///
52    /// This deduplicator is stateless.
53    ///
54    /// When the persistent storage of the event cache is enabled by default,
55    /// this will become the default, and [`Deduplicator`] will be replaced
56    /// with [`StoreDeduplicator`].
57    pub fn new_store_based(room_id: OwnedRoomId, store: EventCacheStoreLock) -> Self {
58        Self::PersistentStore(StoreDeduplicator { room_id, store })
59    }
60
61    /// Find duplicates in the given collection of events, and return both
62    /// valid events (those with an event id) as well as the event ids of
63    /// duplicate events.
64    pub async fn filter_duplicate_events(
65        &self,
66        events: Vec<Event>,
67        room_events: &RoomEvents,
68    ) -> Result<(Vec<Event>, Vec<OwnedEventId>), EventCacheError> {
69        match self {
70            Deduplicator::InMemory(dedup) => Ok(dedup.filter_duplicate_events(events, room_events)),
71            Deduplicator::PersistentStore(dedup) => dedup.filter_duplicate_events(events).await,
72        }
73    }
74}
75
/// A deduplication mechanism based on the persistent storage associated to the
/// event cache.
///
/// It queries the persistent storage to figure out whether events are
/// duplicates or not, which makes it entirely stateless.
pub struct StoreDeduplicator {
    /// The room this deduplicator applies to; passed along with every store
    /// query.
    room_id: OwnedRoomId,
    /// The actual event cache store implementation used to query events. It
    /// is locked for the duration of each deduplication call.
    store: EventCacheStoreLock,
}
87
88impl StoreDeduplicator {
89    async fn filter_duplicate_events(
90        &self,
91        mut events: Vec<Event>,
92    ) -> Result<(Vec<Event>, Vec<OwnedEventId>), EventCacheError> {
93        let store = self.store.lock().await?;
94
95        // Collect event ids as we "validate" events (i.e. check they have a valid event
96        // id.)
97        let mut event_ids = Vec::new();
98
99        events.retain(|event| {
100            if let Some(event_id) = event.event_id() {
101                event_ids.push(event_id);
102                true
103            } else {
104                false
105            }
106        });
107
108        // Let the store do its magic ✨
109        let duplicates = store.filter_duplicated_events(&self.room_id, event_ids).await?;
110
111        Ok((events, duplicates))
112    }
113}
114
/// `BloomFilterDeduplicator` is an efficient type to find duplicated events,
/// using an in-memory cache.
///
/// It uses a [bloom filter] to provide a memory efficient probabilistic answer
/// to: “has event E been seen already?”. False positives are possible, while
/// false negatives are impossible. In the case of a positive reply, we fall
/// back to a linear (backward) search on all events to check whether it's a
/// false positive or not.
///
/// [bloom filter]: https://en.wikipedia.org/wiki/Bloom_filter
pub struct BloomFilterDeduplicator {
    /// The bloom filter recording the ids of the events scanned so far. It sits
    /// behind a mutex so it can be updated through a shared reference.
    bloom_filter: Mutex<GrowableBloom>,
}
128
129impl fmt::Debug for BloomFilterDeduplicator {
130    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
131        formatter.debug_struct("Deduplicator").finish_non_exhaustive()
132    }
133}
134
135impl BloomFilterDeduplicator {
136    // Note: don't use too high numbers here, or the amount of allocated memory will
137    // explode. See https://github.com/matrix-org/matrix-rust-sdk/pull/4231 for details.
138    const APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS: usize = 1_000;
139    const DESIRED_FALSE_POSITIVE_RATE: f64 = 0.01;
140
141    /// Create a new `Deduplicator` with no prior knowledge of known events.
142    fn new() -> Self {
143        let bloom_filter = GrowableBloomBuilder::new()
144            .estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS)
145            .desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE)
146            .build();
147        Self { bloom_filter: Mutex::new(bloom_filter) }
148    }
149
150    /// Find duplicates in the given collection of events, and return both
151    /// valid events (those with an event id) as well as the event ids of
152    /// duplicate events.
153    fn filter_duplicate_events(
154        &self,
155        events: Vec<Event>,
156        room_events: &RoomEvents,
157    ) -> (Vec<Event>, Vec<OwnedEventId>) {
158        let mut duplicated_event_ids = Vec::new();
159
160        let events = self
161            .scan_and_learn(events.into_iter(), room_events)
162            .filter_map(|decorated_event| match decorated_event {
163                Decoration::Unique(event) => Some(event),
164                Decoration::Duplicated(event) => {
165                    debug!(event_id = ?event.event_id(), "Found a duplicated event");
166
167                    duplicated_event_ids.push(
168                        event
169                            .event_id()
170                            // SAFETY: An event with no ID is decorated as
171                            // `Decoration::Invalid`. Thus, it's
172                            // safe to unwrap the `Option<OwnedEventId>` here.
173                            .expect("The event has no ID"),
174                    );
175
176                    // Keep the new event!
177                    Some(event)
178                }
179                Decoration::Invalid(event) => {
180                    warn!(?event, "Found an event with no ID");
181                    None
182                }
183            })
184            .collect::<Vec<_>>();
185
186        (events, duplicated_event_ids)
187    }
188
189    /// Scan a collection of events and detect duplications.
190    ///
191    /// This method takes a collection of events `new_events_to_scan` and
192    /// returns a new collection of events, where each event is decorated by
193    /// a [`Decoration`], so that the caller can decide what to do with
194    /// these events.
195    ///
196    /// Each scanned event will update `Self`'s internal state.
197    ///
198    /// `existing_events` represents all events of a room that already exist.
199    fn scan_and_learn<'a, I>(
200        &'a self,
201        new_events_to_scan: I,
202        existing_events: &'a RoomEvents,
203    ) -> impl Iterator<Item = Decoration<I::Item>> + 'a
204    where
205        I: Iterator<Item = Event> + 'a,
206    {
207        // `new_scanned_events` is not a field of `Self` because it is used only detect
208        // duplicates in `new_events_to_scan`.
209        let mut new_scanned_events = BTreeSet::new();
210
211        new_events_to_scan.map(move |event| {
212            let Some(event_id) = event.event_id() else {
213                // The event has no `event_id`.
214                return Decoration::Invalid(event);
215            };
216
217            if self.bloom_filter.lock().unwrap().check_and_set(&event_id) {
218                // Oh oh, it looks like we have found a duplicate!
219                //
220                // However, bloom filters have false positives. We are NOT sure the event is NOT
221                // present. Even if the false positive rate is low, we need to
222                // iterate over all events to ensure it isn't present.
223
224                // First, let's ensure `event` is not a duplicate from `new_events_to_scan`,
225                // i.e. if the iterator itself contains duplicated events! We use a `BTreeSet`,
226                // otherwise using a bloom filter again may generate false positives.
227                if new_scanned_events.contains(&event_id) {
228                    // The iterator contains a duplicated `event`.
229                    return Decoration::Duplicated(event);
230                }
231
232                // Second, we can iterate over all events to ensure `event` is not present in
233                // `existing_events`.
234                let duplicated = existing_events.revents().any(|(_position, other_event)| {
235                    other_event.event_id().as_ref() == Some(&event_id)
236                });
237
238                new_scanned_events.insert(event_id);
239
240                if duplicated {
241                    Decoration::Duplicated(event)
242                } else {
243                    Decoration::Unique(event)
244                }
245            } else {
246                new_scanned_events.insert(event_id);
247
248                // Bloom filter has no false negatives. We are sure the event is NOT present: we
249                // can keep it in the iterator.
250                Decoration::Unique(event)
251            }
252        })
253    }
254}
255
/// Information about the scanned collection of events.
///
/// Each event yielded by `BloomFilterDeduplicator::scan_and_learn` is wrapped
/// in one of these variants.
#[derive(Debug)]
enum Decoration<I> {
    /// This event is not duplicated.
    Unique(I),

    /// This event is duplicated: its id was already met earlier in the scanned
    /// collection, or it belongs to an already existing event.
    Duplicated(I),

    /// This event is invalid (i.e. not well formed): it has no event id.
    Invalid(I),
}
268
269#[cfg(test)]
270mod tests {
271    use assert_matches2::{assert_let, assert_matches};
272    use matrix_sdk_base::deserialized_responses::TimelineEvent;
273    use matrix_sdk_test::event_factory::EventFactory;
274    use ruma::{owned_event_id, user_id, EventId};
275
276    use super::*;
277
278    fn sync_timeline_event(event_id: &EventId) -> TimelineEvent {
279        EventFactory::new()
280            .text_msg("")
281            .sender(user_id!("@mnt_io:matrix.org"))
282            .event_id(event_id)
283            .into_event()
284    }
285
286    #[test]
287    fn test_filter_no_duplicate() {
288        let event_id_0 = owned_event_id!("$ev0");
289        let event_id_1 = owned_event_id!("$ev1");
290        let event_id_2 = owned_event_id!("$ev2");
291
292        let event_0 = sync_timeline_event(&event_id_0);
293        let event_1 = sync_timeline_event(&event_id_1);
294        let event_2 = sync_timeline_event(&event_id_2);
295
296        let deduplicator = BloomFilterDeduplicator::new();
297        let existing_events = RoomEvents::new();
298
299        let mut events =
300            deduplicator.scan_and_learn([event_0, event_1, event_2].into_iter(), &existing_events);
301
302        assert_let!(Some(Decoration::Unique(event)) = events.next());
303        assert_eq!(event.event_id(), Some(event_id_0));
304
305        assert_let!(Some(Decoration::Unique(event)) = events.next());
306        assert_eq!(event.event_id(), Some(event_id_1));
307
308        assert_let!(Some(Decoration::Unique(event)) = events.next());
309        assert_eq!(event.event_id(), Some(event_id_2));
310
311        assert!(events.next().is_none());
312    }
313
314    #[test]
315    fn test_filter_duplicates_in_new_events() {
316        let event_id_0 = owned_event_id!("$ev0");
317        let event_id_1 = owned_event_id!("$ev1");
318
319        let event_0 = sync_timeline_event(&event_id_0);
320        let event_1 = sync_timeline_event(&event_id_1);
321
322        let deduplicator = BloomFilterDeduplicator::new();
323        let existing_events = RoomEvents::new();
324
325        let mut events = deduplicator.scan_and_learn(
326            [
327                event_0.clone(), // OK
328                event_0,         // Not OK
329                event_1,         // OK
330            ]
331            .into_iter(),
332            &existing_events,
333        );
334
335        assert_let!(Some(Decoration::Unique(event)) = events.next());
336        assert_eq!(event.event_id(), Some(event_id_0.clone()));
337
338        assert_let!(Some(Decoration::Duplicated(event)) = events.next());
339        assert_eq!(event.event_id(), Some(event_id_0));
340
341        assert_let!(Some(Decoration::Unique(event)) = events.next());
342        assert_eq!(event.event_id(), Some(event_id_1));
343
344        assert!(events.next().is_none());
345    }
346
347    #[test]
348    fn test_filter_duplicates_with_existing_events() {
349        let event_id_0 = owned_event_id!("$ev0");
350        let event_id_1 = owned_event_id!("$ev1");
351        let event_id_2 = owned_event_id!("$ev2");
352
353        let event_0 = sync_timeline_event(&event_id_0);
354        let event_1 = sync_timeline_event(&event_id_1);
355        let event_2 = sync_timeline_event(&event_id_2);
356
357        let deduplicator = BloomFilterDeduplicator::new();
358        let mut existing_events = RoomEvents::new();
359
360        // Simulate `event_1` is inserted inside `existing_events`.
361        {
362            let mut events =
363                deduplicator.scan_and_learn([event_1.clone()].into_iter(), &existing_events);
364
365            assert_let!(Some(Decoration::Unique(event_1)) = events.next());
366            assert_eq!(event_1.event_id(), Some(event_id_1.clone()));
367
368            assert!(events.next().is_none());
369
370            drop(events); // make the borrow checker happy.
371
372            // Now we can push `event_1` inside `existing_events`.
373            existing_events.push_events([event_1]);
374        }
375
376        // `event_1` will be duplicated.
377        {
378            let mut events = deduplicator.scan_and_learn(
379                [
380                    event_0, // OK
381                    event_1, // Not OK
382                    event_2, // Ok
383                ]
384                .into_iter(),
385                &existing_events,
386            );
387
388            assert_let!(Some(Decoration::Unique(event)) = events.next());
389            assert_eq!(event.event_id(), Some(event_id_0));
390
391            assert_let!(Some(Decoration::Duplicated(event)) = events.next());
392            assert_eq!(event.event_id(), Some(event_id_1));
393
394            assert_let!(Some(Decoration::Unique(event)) = events.next());
395            assert_eq!(event.event_id(), Some(event_id_2));
396
397            assert!(events.next().is_none());
398        }
399    }
400
401    #[test]
402    fn test_bloom_filter_growth() {
403        // This test was used as a testbed to observe, using `valgrind --tool=massive`,
404        // the total memory allocated by the deduplicator. We keep it checked in
405        // to revive this experiment in the future, if needs be.
406
407        let num_rooms = if let Ok(num_rooms) = std::env::var("ROOMS") {
408            num_rooms.parse().unwrap()
409        } else {
410            10
411        };
412
413        let num_events = if let Ok(num_events) = std::env::var("EVENTS") {
414            num_events.parse().unwrap()
415        } else {
416            100
417        };
418
419        let mut dedups = Vec::with_capacity(num_rooms);
420
421        for _ in 0..num_rooms {
422            let dedup = BloomFilterDeduplicator::new();
423            let existing_events = RoomEvents::new();
424
425            for i in 0..num_events {
426                let event = sync_timeline_event(&EventId::parse(format!("$event{i}")).unwrap());
427                let mut it = dedup.scan_and_learn([event].into_iter(), &existing_events);
428
429                assert_matches!(it.next(), Some(Decoration::Unique(..)));
430                assert_matches!(it.next(), None);
431            }
432
433            dedups.push(dedup);
434        }
435    }
436
437    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
438    #[matrix_sdk_test::async_test]
439    async fn test_storage_deduplication() {
440        use std::sync::Arc;
441
442        use matrix_sdk_base::{
443            event_cache::store::{EventCacheStore as _, MemoryStore},
444            linked_chunk::{ChunkIdentifier, Position, Update},
445        };
446        use matrix_sdk_test::{ALICE, BOB};
447        use ruma::{event_id, room_id, serde::Raw};
448
449        let room_id = room_id!("!galette:saucisse.bzh");
450        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));
451
452        let event_cache_store = Arc::new(MemoryStore::new());
453
454        let eid1 = event_id!("$1");
455        let eid2 = event_id!("$2");
456        let eid3 = event_id!("$3");
457
458        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(eid1).into_event();
459        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(eid2).into_event();
460        let ev3 = f.text_msg("wassup").sender(*ALICE).event_id(eid3).into_event();
461        // An invalid event (doesn't have an event id.).
462        let ev4 = TimelineEvent::new(Raw::from_json_string("{}".to_owned()).unwrap());
463
464        // Prefill the store with ev1 and ev2.
465        event_cache_store
466            .handle_linked_chunk_updates(
467                room_id,
468                vec![
469                    // Non empty items chunk.
470                    Update::NewItemsChunk {
471                        previous: None,
472                        new: ChunkIdentifier::new(0),
473                        next: None,
474                    },
475                    Update::PushItems {
476                        at: Position::new(ChunkIdentifier::new(0), 0),
477                        items: vec![ev1.clone()],
478                    },
479                    // And another items chunk, non-empty again.
480                    Update::NewItemsChunk {
481                        previous: Some(ChunkIdentifier::new(0)),
482                        new: ChunkIdentifier::new(1),
483                        next: None,
484                    },
485                    Update::PushItems {
486                        at: Position::new(ChunkIdentifier::new(1), 0),
487                        items: vec![ev2.clone()],
488                    },
489                ],
490            )
491            .await
492            .unwrap();
493
494        // Wrap the store into its lock.
495        let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());
496
497        let deduplicator =
498            StoreDeduplicator { room_id: room_id.to_owned(), store: event_cache_store };
499
500        let (valid_events, duplicates) =
501            deduplicator.filter_duplicate_events(vec![ev1, ev2, ev3, ev4]).await.unwrap();
502
503        assert_eq!(valid_events.len(), 3);
504        assert_eq!(valid_events[0].event_id().as_deref(), Some(eid1));
505        assert_eq!(valid_events[1].event_id().as_deref(), Some(eid2));
506        assert_eq!(valid_events[2].event_id().as_deref(), Some(eid3));
507
508        assert_eq!(duplicates.len(), 2);
509        assert_eq!(duplicates[0], eid1);
510        assert_eq!(duplicates[1], eid2);
511    }
512}