matrix_sdk/event_cache/deduplicator.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Simple but efficient types to find duplicated events. See [`Deduplicator`]
//! to learn more.

use std::{collections::BTreeSet, fmt, sync::Mutex};

use growable_bloom_filter::{GrowableBloom, GrowableBloomBuilder};
use matrix_sdk_base::{event_cache::store::EventCacheStoreLock, linked_chunk::Position};
use ruma::{OwnedEventId, OwnedRoomId};
use tracing::{debug, error};

use super::{
    room::events::{Event, RoomEvents},
    EventCacheError,
};

/// A `Deduplicator` helps to find duplicate events.
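///
/// A rough usage sketch (marked `ignore` since `RoomEvents` is crate-internal
/// and `new_events` is a hypothetical `Vec<Event>`; the real call site lives
/// in the event cache):
///
/// ```ignore
/// let deduplicator = Deduplicator::new_memory_based();
/// let room_events = RoomEvents::new();
///
/// let outcome = deduplicator
///     .filter_duplicate_events(new_events, &room_events)
///     .await?;
///
/// // `outcome.all_events` contains the input events that have a valid event
/// // ID, with in-batch duplicates removed; the `*_duplicated_event_ids`
/// // collections record where previously-known copies live.
/// ```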
pub enum Deduplicator {
    InMemory(BloomFilterDeduplicator),
    PersistentStore(StoreDeduplicator),
}

impl Deduplicator {
    /// Create an empty deduplicator instance that uses an internal Bloom
    /// filter.
    ///
    /// Such a deduplicator is stateful: it starts with no known events and
    /// learns over time, using a Bloom filter, which events it has already
    /// seen.
    ///
    /// When the persistent storage of the event cache is enabled by default,
    /// this constructor (and the associated variant) will be removed.
    pub fn new_memory_based() -> Self {
        Self::InMemory(BloomFilterDeduplicator::new())
    }

    /// Create a new store-based deduplicator that runs queries against the
    /// store to find out whether an event is a duplicate.
    ///
    /// This deduplicator is stateless.
    ///
    /// When the persistent storage of the event cache is enabled by default,
    /// this will become the default, and [`Deduplicator`] will be replaced
    /// with [`StoreDeduplicator`].
    pub fn new_store_based(room_id: OwnedRoomId, store: EventCacheStoreLock) -> Self {
        Self::PersistentStore(StoreDeduplicator { room_id, store })
    }

    /// Find duplicates in the given collection of events, and return both the
    /// valid events (those with an event ID) and the event IDs of the
    /// duplicate events, along with their positions.
    pub async fn filter_duplicate_events(
        &self,
        mut events: Vec<Event>,
        room_events: &RoomEvents,
    ) -> Result<DeduplicationOutcome, EventCacheError> {
        // Remove all events with no ID, as well as events that are duplicated
        // within `events` itself, e.g. `[$e0, $e1, $e0]`, where `$e0` appears
        // twice.
        {
            let mut event_ids = BTreeSet::new();

            events.retain(|event| {
                let Some(event_id) = event.event_id() else {
                    // No event ID? Bye bye.
                    return false;
                };

                // Already seen this event in `events`? Bye bye.
                if event_ids.contains(&event_id) {
                    return false;
                }

                event_ids.insert(event_id);

                // Let's keep this event!
                true
            });
        }

        Ok(match self {
            Deduplicator::InMemory(dedup) => dedup.filter_duplicate_events(events, room_events),
            Deduplicator::PersistentStore(dedup) => {
                dedup.filter_duplicate_events(events, room_events).await?
            }
        })
    }
}

/// A deduplication mechanism based on the persistent storage associated with
/// the event cache.
///
/// It runs queries against the persistent storage to figure out whether events
/// are duplicates, making it entirely stateless.
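///
/// A minimal sketch of the query it performs (illustrative only; `event_ids`
/// is a hypothetical collection of `OwnedEventId`s gathered from the incoming
/// events):
///
/// ```ignore
/// let store_guard = store.lock().await?;
/// let duplicated_event_ids = store_guard
///     .filter_duplicated_events(&room_id, event_ids)
///     .await?;
/// // Each entry is an `(event_id, position)` pair pointing at where the known
/// // copy lives in the room's linked chunk, which the deduplicator then
/// // classifies as in-memory or in-store.
/// ```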
pub struct StoreDeduplicator {
    /// The room this deduplicator applies to.
    room_id: OwnedRoomId,
    /// The actual event cache store implementation used to query events.
    store: EventCacheStoreLock,
}

impl StoreDeduplicator {
    async fn filter_duplicate_events(
        &self,
        events: Vec<Event>,
        room_events: &RoomEvents,
    ) -> Result<DeduplicationOutcome, EventCacheError> {
        let store = self.store.lock().await?;

        // Let the store do its magic ✨
        let duplicated_event_ids = store
            .filter_duplicated_events(
                &self.room_id,
                events.iter().filter_map(|event| event.event_id()).collect(),
            )
            .await?;

        // Separate the duplicated events into two collections: those that are
        // in memory, and those that only live in the store.
        let (in_memory_duplicated_event_ids, in_store_duplicated_event_ids) = {
            // Collect all in-memory chunk identifiers.
            let in_memory_chunk_identifiers =
                room_events.chunks().map(|chunk| chunk.identifier()).collect::<Vec<_>>();

            let mut in_memory = vec![];
            let mut in_store = vec![];

            for (duplicated_event_id, position) in duplicated_event_ids {
                if in_memory_chunk_identifiers.contains(&position.chunk_identifier()) {
                    in_memory.push((duplicated_event_id, position));
                } else {
                    in_store.push((duplicated_event_id, position));
                }
            }

            (in_memory, in_store)
        };

        Ok(DeduplicationOutcome {
            all_events: events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
        })
    }
}

/// `BloomFilterDeduplicator` is an efficient type to find duplicated events,
/// using an in-memory cache.
///
/// It uses a [bloom filter] to provide a memory-efficient probabilistic answer
/// to: “has event E been seen already?”. False positives are possible, while
/// false negatives are impossible. In the case of a positive reply, we fall
/// back to a linear (backward) search on all events to check whether it is a
/// false positive or not.
///
/// [bloom filter]: https://en.wikipedia.org/wiki/Bloom_filter
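///
/// A small sketch of the `check_and_set` semantics this type relies on
/// (illustrative only, hence `ignore`d; the event ID is made up):
///
/// ```ignore
/// let mut filter = GrowableBloomBuilder::new()
///     .estimated_insertions(1_000)
///     .desired_error_ratio(0.01)
///     .build();
///
/// let event_id = ruma::owned_event_id!("$some_event_id");
///
/// // First sighting: not present yet, gets inserted, returns `false`.
/// assert!(!filter.check_and_set(&event_id));
/// // Second sighting: (probably) present, returns `true` — treated as a
/// // *candidate* duplicate and confirmed with a linear search.
/// assert!(filter.check_and_set(&event_id));
/// ```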
pub struct BloomFilterDeduplicator {
    bloom_filter: Mutex<GrowableBloom>,
}

impl fmt::Debug for BloomFilterDeduplicator {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.debug_struct("Deduplicator").finish_non_exhaustive()
    }
}

impl BloomFilterDeduplicator {
    // Note: don't use numbers that are too high here, or the amount of
    // allocated memory will explode. See
    // https://github.com/matrix-org/matrix-rust-sdk/pull/4231 for details.
    const APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS: usize = 1_000;
    const DESIRED_FALSE_POSITIVE_RATE: f64 = 0.01;

    /// Create a new `BloomFilterDeduplicator` with no prior knowledge of known
    /// events.
    fn new() -> Self {
        let bloom_filter = GrowableBloomBuilder::new()
            .estimated_insertions(Self::APPROXIMATED_MAXIMUM_NUMBER_OF_EVENTS)
            .desired_error_ratio(Self::DESIRED_FALSE_POSITIVE_RATE)
            .build();
        Self { bloom_filter: Mutex::new(bloom_filter) }
    }

    /// Find duplicates in the given collection of events, and return both the
    /// valid events (those with an event ID) and the event IDs of the
    /// duplicate events, along with their positions.
    fn filter_duplicate_events(
        &self,
        events: Vec<Event>,
        room_events: &RoomEvents,
    ) -> DeduplicationOutcome {
        let mut duplicated_event_ids = Vec::new();

        let events = self
            .scan_and_learn(events.into_iter(), room_events)
            .map(|decorated_event| match decorated_event {
                Decoration::Unique(event) => event,
                Decoration::Duplicated((event, position)) => {
                    debug!(event_id = ?event.event_id(), "Found a duplicated event");

                    let event_id = event
                        .event_id()
                        // SAFETY: An event with no ID is not possible, as invalid events are
                        // already filtered out. Thus, it's safe to unwrap the
                        // `Option<OwnedEventId>` here.
                        .expect("The event has no ID");

                    duplicated_event_ids.push((event_id, position));

                    // Keep the new event!
                    event
                }
            })
            .collect::<Vec<_>>();

        DeduplicationOutcome {
            all_events: events,
            in_memory_duplicated_event_ids: duplicated_event_ids,
            in_store_duplicated_event_ids: vec![],
        }
    }

    /// Scan a collection of events and detect duplications.
    ///
    /// This method takes a collection of events `new_events_to_scan` and
    /// returns a new collection of events, where each event is decorated by
    /// a [`Decoration`], so that the caller can decide what to do with
    /// these events.
    ///
    /// Each scanned event will update `Self`'s internal state.
    ///
    /// `existing_events` represents all events of a room that already exist.
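    ///
    /// A sketch of how the decorated iterator might be consumed (illustrative
    /// only; the real consumer is [`Self::filter_duplicate_events`], and
    /// `new_events` is a hypothetical `Vec<Event>`):
    ///
    /// ```ignore
    /// for decorated in deduplicator.scan_and_learn(new_events.into_iter(), &room_events) {
    ///     match decorated {
    ///         Decoration::Unique(event) => {
    ///             // Never seen before: keep it.
    ///         }
    ///         Decoration::Duplicated((event, position)) => {
    ///             // `position` is where the already-known copy sits in
    ///             // `room_events`.
    ///         }
    ///     }
    /// }
    /// ```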
    fn scan_and_learn<'a, I>(
        &'a self,
        new_events_to_scan: I,
        existing_events: &'a RoomEvents,
    ) -> impl Iterator<Item = Decoration<I::Item>> + 'a
    where
        I: Iterator<Item = Event> + 'a,
    {
        new_events_to_scan.filter_map(move |event| {
            let Some(event_id) = event.event_id() else {
                // The event has no `event_id`. This is normally unreachable, as events with
                // no ID are already filtered out.
                error!(?event, "Found an event with no ID");
                return None;
            };

            Some(if self.bloom_filter.lock().unwrap().check_and_set(&event_id) {
                // Oh oh, it looks like we have found a duplicate!
                //
                // However, bloom filters can return false positives: the event may in fact
                // not be present. Even if the false positive rate is low, we need a linear
                // search over `existing_events` to confirm whether `event` really is there.
                let position_of_the_duplicated_event =
                    existing_events.revents().find_map(|(position, other_event)| {
                        (other_event.event_id().as_ref() == Some(&event_id)).then_some(position)
                    });

                if let Some(position) = position_of_the_duplicated_event {
                    Decoration::Duplicated((event, position))
                } else {
                    Decoration::Unique(event)
                }
            } else {
                // The bloom filter has no false negatives. We are sure the event is NOT
                // present: we can keep it in the iterator.
                Decoration::Unique(event)
            })
        })
    }
}

/// Information about the scanned collection of events.
#[derive(Debug)]
enum Decoration<I> {
    /// This event is not duplicated.
    Unique(I),

    /// This event is duplicated.
    Duplicated((I, Position)),
}

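/// The outcome of a deduplication pass over a batch of events.
///
/// A hypothetical consumption sketch (`events` and `room_events` are assumed
/// to exist at the call site; the actual handling lives in the event cache
/// code that calls the deduplicator):
///
/// ```ignore
/// let DeduplicationOutcome {
///     all_events,
///     in_memory_duplicated_event_ids,
///     in_store_duplicated_event_ids,
/// } = deduplicator.filter_duplicate_events(events, &room_events).await?;
///
/// // A caller might, for instance, remove the already-known copies at the
/// // returned positions before inserting `all_events`.
/// ```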
pub(super) struct DeduplicationOutcome {
    /// All events passed to the deduplicator.
    ///
    /// All events in this collection have a valid event ID.
    ///
    /// This collection does not itself contain duplicate events.
    pub all_events: Vec<Event>,

    /// Events in [`Self::all_events`] that are duplicated and present in
    /// memory. This means they have been loaded from the store, if there is
    /// one.
    ///
    /// Events are sorted by their position, from the oldest to the newest
    /// (position is ascending).
    pub in_memory_duplicated_event_ids: Vec<(OwnedEventId, Position)>,

    /// Events in [`Self::all_events`] that are duplicated and present in
    /// the store. This means they have **NOT** been loaded from the store into
    /// memory yet.
    ///
    /// Events are sorted by their position, from the oldest to the newest
    /// (position is ascending).
    pub in_store_duplicated_event_ids: Vec<(OwnedEventId, Position)>,
}

#[cfg(test)]
mod tests {
    use assert_matches2::{assert_let, assert_matches};
    use matrix_sdk_base::{deserialized_responses::TimelineEvent, linked_chunk::ChunkIdentifier};
    use matrix_sdk_test::{async_test, event_factory::EventFactory};
    use ruma::{owned_event_id, serde::Raw, user_id, EventId};

    use super::*;

    fn timeline_event(event_id: &EventId) -> TimelineEvent {
        EventFactory::new()
            .text_msg("")
            .sender(user_id!("@mnt_io:matrix.org"))
            .event_id(event_id)
            .into_event()
    }

    #[async_test]
    async fn test_filter_find_duplicates_in_the_input() {
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");

        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);

        // It doesn't matter which deduplicator we pick, the feature is ensured by the
        // “frontend”, not the “backend” of the deduplicator.
        let deduplicator = Deduplicator::new_memory_based();
        let room_events = RoomEvents::new();

        let outcome = deduplicator
            .filter_duplicate_events(
                vec![
                    event_0.clone(), // Ok
                    event_1,         // Ok
                    event_0,         // Duplicated
                ],
                &room_events,
            )
            .await
            .unwrap();

        // We get 2 events, not 3, because one was duplicated.
        assert_eq!(outcome.all_events.len(), 2);
        assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0));
        assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1));
    }

    #[async_test]
    async fn test_filter_exclude_invalid_events_from_the_input() {
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");

        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);
        // An event with no ID.
        let event_2 = TimelineEvent::new(Raw::from_json_string("{}".to_owned()).unwrap());

        // It doesn't matter which deduplicator we pick, the feature is ensured by the
        // “frontend”, not the “backend” of the deduplicator.
        let deduplicator = Deduplicator::new_memory_based();
        let room_events = RoomEvents::new();

        let outcome = deduplicator
            .filter_duplicate_events(
                vec![
                    event_0.clone(), // Ok
                    event_1,         // Ok
                    event_2,         // Invalid
                ],
                &room_events,
            )
            .await
            .unwrap();

        // We get 2 events, not 3, because one was invalid.
        assert_eq!(outcome.all_events.len(), 2);
        assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0));
        assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1));
    }

    #[async_test]
    async fn test_memory_based_duplicated_event_ids_from_in_memory_vs_in_store() {
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");

        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);

        let mut deduplicator = Deduplicator::new_memory_based();
        let mut room_events = RoomEvents::new();
        // `event_0` is loaded in memory.
        // `event_1` is not loaded in memory, it's new.
        {
            let Deduplicator::InMemory(bloom_filter) = &mut deduplicator else {
                panic!("test is broken, but sky is beautiful");
            };
            bloom_filter.bloom_filter.lock().unwrap().insert(event_id_0.clone());
            room_events.push_events([event_0.clone()]);
        }

        let outcome = deduplicator
            .filter_duplicate_events(vec![event_0, event_1], &room_events)
            .await
            .unwrap();

        // The deduplication says 2 events are valid.
        assert_eq!(outcome.all_events.len(), 2);
        assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0.clone()));
        assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1));

        // From these 2 events, 1 is duplicated and has been loaded in memory.
        assert_eq!(outcome.in_memory_duplicated_event_ids.len(), 1);
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[0],
            (event_id_0, Position::new(ChunkIdentifier::new(0), 0))
        );

        // From these 2 events, 0 are duplicated and live in the store.
        //
        // Note: with the Bloom filter, this value is always empty because there is no
        // store.
        assert!(outcome.in_store_duplicated_event_ids.is_empty());
    }

    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
    #[async_test]
    async fn test_store_based_duplicated_event_ids_from_in_memory_vs_in_store() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore, MemoryStore},
            linked_chunk::Update,
        };
        use ruma::room_id;

        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");
        let event_id_2 = owned_event_id!("$ev2");
        let event_id_3 = owned_event_id!("$ev3");
        let event_id_4 = owned_event_id!("$ev4");

        // `event_0` and `event_1` are in the store only.
        // `event_2` and `event_3` are in the store, but also in memory: they have been
        // loaded into memory from the store.
        // `event_4` is nowhere, it's new.
        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);
        let event_2 = timeline_event(&event_id_2);
        let event_3 = timeline_event(&event_id_3);
        let event_4 = timeline_event(&event_id_4);

        let event_cache_store = Arc::new(MemoryStore::new());
        let room_id = room_id!("!fondue:raclette.ch");

        // Prefill the store with `event_0` through `event_3`.
        event_cache_store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![event_0.clone(), event_1.clone()],
                    },
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        // Must match the chunk in `RoomEvents`, i.e. 0; this simulates a
                        // lazy-load, for example.
                        new: ChunkIdentifier::new(0),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(0), 0),
                        items: vec![event_2.clone(), event_3.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());

        let deduplicator = Deduplicator::new_store_based(room_id.to_owned(), event_cache_store);
        let mut room_events = RoomEvents::new();
        room_events.push_events([event_2.clone(), event_3.clone()]);

        let outcome = deduplicator
            .filter_duplicate_events(
                vec![event_0, event_1, event_2, event_3, event_4],
                &room_events,
            )
            .await
            .unwrap();

        // The deduplication says 5 events are valid.
        assert_eq!(outcome.all_events.len(), 5);
        assert_eq!(outcome.all_events[0].event_id(), Some(event_id_0.clone()));
        assert_eq!(outcome.all_events[1].event_id(), Some(event_id_1.clone()));
        assert_eq!(outcome.all_events[2].event_id(), Some(event_id_2.clone()));
        assert_eq!(outcome.all_events[3].event_id(), Some(event_id_3.clone()));
        assert_eq!(outcome.all_events[4].event_id(), Some(event_id_4.clone()));

        // From these 5 events, 2 are duplicated and have been loaded in memory.
        //
        // Note that events are sorted by their ascending position.
        assert_eq!(outcome.in_memory_duplicated_event_ids.len(), 2);
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[0],
            (event_id_2, Position::new(ChunkIdentifier::new(0), 0))
        );
        assert_eq!(
            outcome.in_memory_duplicated_event_ids[1],
            (event_id_3, Position::new(ChunkIdentifier::new(0), 1))
        );

        // From these 5 events, 2 are duplicated and live in the store only, they have
        // not been loaded in memory.
        //
        // Note that events are sorted by their ascending position.
        assert_eq!(outcome.in_store_duplicated_event_ids.len(), 2);
        assert_eq!(
            outcome.in_store_duplicated_event_ids[0],
            (event_id_0, Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            outcome.in_store_duplicated_event_ids[1],
            (event_id_1, Position::new(ChunkIdentifier::new(42), 1))
        );
    }

    #[test]
    fn test_bloom_filter_no_duplicate() {
        let event_id_0 = owned_event_id!("$ev0");
        let event_id_1 = owned_event_id!("$ev1");
        let event_id_2 = owned_event_id!("$ev2");

        let event_0 = timeline_event(&event_id_0);
        let event_1 = timeline_event(&event_id_1);
        let event_2 = timeline_event(&event_id_2);

        let deduplicator = BloomFilterDeduplicator::new();
        let existing_events = RoomEvents::new();

        let mut events =
            deduplicator.scan_and_learn([event_0, event_1, event_2].into_iter(), &existing_events);

        assert_let!(Some(Decoration::Unique(event)) = events.next());
        assert_eq!(event.event_id(), Some(event_id_0));

        assert_let!(Some(Decoration::Unique(event)) = events.next());
        assert_eq!(event.event_id(), Some(event_id_1));

        assert_let!(Some(Decoration::Unique(event)) = events.next());
        assert_eq!(event.event_id(), Some(event_id_2));

        assert!(events.next().is_none());
    }

    #[test]
    fn test_bloom_filter_growth() {
        // This test was used as a testbed to observe, using `valgrind --tool=massif`,
        // the total memory allocated by the deduplicator. We keep it checked in
        // to revive this experiment in the future, if need be.

        let num_rooms = if let Ok(num_rooms) = std::env::var("ROOMS") {
            num_rooms.parse().unwrap()
        } else {
            10
        };

        let num_events = if let Ok(num_events) = std::env::var("EVENTS") {
            num_events.parse().unwrap()
        } else {
            100
        };

        let mut dedups = Vec::with_capacity(num_rooms);

        for _ in 0..num_rooms {
            let dedup = BloomFilterDeduplicator::new();
            let existing_events = RoomEvents::new();

            for i in 0..num_events {
                let event = timeline_event(&EventId::parse(format!("$event{i}")).unwrap());
                let mut it = dedup.scan_and_learn([event].into_iter(), &existing_events);

                assert_matches!(it.next(), Some(Decoration::Unique(..)));
                assert_matches!(it.next(), None);
            }

            dedups.push(dedup);
        }
    }

    #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support.
    #[async_test]
    async fn test_storage_deduplication() {
        use std::sync::Arc;

        use matrix_sdk_base::{
            event_cache::store::{EventCacheStore as _, MemoryStore},
            linked_chunk::{ChunkIdentifier, Position, Update},
        };
        use matrix_sdk_test::{ALICE, BOB};
        use ruma::{event_id, room_id};

        let room_id = room_id!("!galette:saucisse.bzh");
        let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh"));

        let event_cache_store = Arc::new(MemoryStore::new());

        let eid1 = event_id!("$1");
        let eid2 = event_id!("$2");
        let eid3 = event_id!("$3");

        let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(eid1).into_event();
        let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(eid2).into_event();
        let ev3 = f.text_msg("wassup").sender(*ALICE).event_id(eid3).into_event();
        // An invalid event (doesn't have an event id).
        let ev4 = TimelineEvent::new(Raw::from_json_string("{}".to_owned()).unwrap());

        // Prefill the store with ev1 and ev2.
        event_cache_store
            .handle_linked_chunk_updates(
                room_id,
                vec![
                    // Non-empty items chunk.
                    Update::NewItemsChunk {
                        previous: None,
                        new: ChunkIdentifier::new(42),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(42), 0),
                        items: vec![ev1.clone()],
                    },
                    // And another items chunk, non-empty again.
                    Update::NewItemsChunk {
                        previous: Some(ChunkIdentifier::new(42)),
                        new: ChunkIdentifier::new(43),
                        next: None,
                    },
                    Update::PushItems {
                        at: Position::new(ChunkIdentifier::new(43), 0),
                        items: vec![ev2.clone()],
                    },
                ],
            )
            .await
            .unwrap();

        // Wrap the store into its lock.
        let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned());

        let deduplicator = Deduplicator::new_store_based(room_id.to_owned(), event_cache_store);

        let room_events = RoomEvents::new();
        let DeduplicationOutcome {
            all_events: events,
            in_memory_duplicated_event_ids,
            in_store_duplicated_event_ids,
        } = deduplicator
            .filter_duplicate_events(vec![ev1, ev2, ev3, ev4], &room_events)
            .await
            .unwrap();

        assert_eq!(events.len(), 3);
        assert_eq!(events[0].event_id().as_deref(), Some(eid1));
        assert_eq!(events[1].event_id().as_deref(), Some(eid2));
        assert_eq!(events[2].event_id().as_deref(), Some(eid3));

        assert!(in_memory_duplicated_event_ids.is_empty());

        assert_eq!(in_store_duplicated_event_ids.len(), 2);
        assert_eq!(
            in_store_duplicated_event_ids[0],
            (eid1.to_owned(), Position::new(ChunkIdentifier::new(42), 0))
        );
        assert_eq!(
            in_store_duplicated_event_ids[1],
            (eid2.to_owned(), Position::new(ChunkIdentifier::new(43), 0))
        );
    }
}