matrix_sdk_common/linked_chunk/updates.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
    collections::HashMap,
    pin::Pin,
    sync::{Arc, RwLock, Weak},
    task::{Context, Poll, Waker},
};

use futures_core::Stream;

use super::{ChunkIdentifier, Position};

/// Represent the updates that have happened inside a [`LinkedChunk`].
///
/// To retrieve the updates, use [`LinkedChunk::updates`].
///
/// These updates are useful to store a `LinkedChunk` in another form of
/// storage, like a database or something similar.
///
/// [`LinkedChunk`]: super::LinkedChunk
/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
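///
/// # Example
///
/// A minimal, illustrative sketch of how a storage backend could consume these
/// updates; the `persist_*` helpers are hypothetical and not part of this
/// crate:
///
/// ```rust,ignore
/// fn apply<Item, Gap>(update: Update<Item, Gap>) {
///     match update {
///         Update::NewItemsChunk { previous, new, next } => {
///             // Hypothetical helper: insert an empty items chunk, linked
///             // between `previous` and `next`.
///             persist_new_items_chunk(previous, new, next);
///         }
///         Update::PushItems { at, items } => {
///             // `at` is the position of the first pushed item, so the reader
///             // doesn't have to recompute positions itself.
///             persist_items(at, items);
///         }
///         Update::Clear => persist_clear(),
///         _ => { /* the other variants are handled similarly */ }
///     }
/// }
/// ```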
#[derive(Debug, Clone, PartialEq)]
pub enum Update<Item, Gap> {
    /// A new chunk of kind Items has been created.
    NewItemsChunk {
        /// The identifier of the previous chunk of this new chunk.
        previous: Option<ChunkIdentifier>,

        /// The identifier of the new chunk.
        new: ChunkIdentifier,

        /// The identifier of the next chunk of this new chunk.
        next: Option<ChunkIdentifier>,
    },

    /// A new chunk of kind Gap has been created.
    NewGapChunk {
        /// The identifier of the previous chunk of this new chunk.
        previous: Option<ChunkIdentifier>,

        /// The identifier of the new chunk.
        new: ChunkIdentifier,

        /// The identifier of the next chunk of this new chunk.
        next: Option<ChunkIdentifier>,

        /// The content of the chunk.
        gap: Gap,
    },

    /// A chunk has been removed.
    RemoveChunk(ChunkIdentifier),

    /// Items are pushed inside a chunk of kind Items.
    PushItems {
        /// The [`Position`] of the items.
        ///
        /// This value is given to spare the update readers from computing the
        /// positions themselves. Items are pushed, so their positions would
        /// have to be computed incrementally from the previous items, which
        /// requires reading the last previous item. With `at`, the update
        /// readers no longer need to do so.
        at: Position,

        /// The items.
        items: Vec<Item>,
    },

    /// An item has been replaced in the linked chunk.
    ///
    /// The `at` position MUST resolve to the actual position of an existing
    /// *item* (not a gap).
    ReplaceItem {
        /// The position of the item that's being replaced.
        at: Position,

        /// The new value for the item.
        item: Item,
    },

    /// An item has been removed inside a chunk of kind Items.
    RemoveItem {
        /// The [`Position`] of the item.
        at: Position,
    },

    /// The last items of a chunk have been detached, i.e. the chunk has been
    /// truncated.
    DetachLastItems {
        /// The split position. Before this position (`..position`), items are
        /// kept; from this position (`position..`), items are detached.
        at: Position,
    },

    /// Detached items (see [`Self::DetachLastItems`]) start being reattached.
    StartReattachItems,

    /// Reattaching items (see [`Self::StartReattachItems`]) is finished.
    EndReattachItems,

    /// All chunks have been cleared, i.e. all items and all gaps have been
    /// dropped.
    Clear,
}

/// A collection of [`Update`]s that can be observed.
///
/// Get a value for this type with [`LinkedChunk::updates`].
///
/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
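///
/// # Example
///
/// A minimal sketch of how updates are drained, based on the API exercised by
/// the tests at the bottom of this file (the `LinkedChunk` type parameters are
/// arbitrary):
///
/// ```rust,ignore
/// let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history();
///
/// linked_chunk.push_items_back(['a']);
/// linked_chunk.push_items_back(['b']);
///
/// // `updates()` is assumed to return `Some(…)` here because the update
/// // history was enabled above.
/// let updates = linked_chunk.updates().unwrap();
///
/// // `take()` returns every update this (main) reader hasn't read yet…
/// assert!(!updates.take().is_empty());
///
/// // …and never returns the same update twice.
/// assert!(updates.take().is_empty());
/// ```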
#[derive(Debug)]
pub struct ObservableUpdates<Item, Gap> {
    pub(super) inner: Arc<RwLock<UpdatesInner<Item, Gap>>>,
}

impl<Item, Gap> ObservableUpdates<Item, Gap> {
    /// Create a new [`ObservableUpdates`].
    pub(super) fn new() -> Self {
        Self { inner: Arc::new(RwLock::new(UpdatesInner::new())) }
    }

    /// Push a new update.
    pub(super) fn push(&mut self, update: Update<Item, Gap>) {
        self.inner.write().unwrap().push(update);
    }

    /// Clear all pending updates.
    pub(super) fn clear_pending(&mut self) {
        self.inner.write().unwrap().clear_pending();
    }

    /// Take new updates.
    ///
    /// Updates that have been taken will not be read again.
    pub fn take(&mut self) -> Vec<Update<Item, Gap>>
    where
        Item: Clone,
        Gap: Clone,
    {
        self.inner.write().unwrap().take().to_owned()
    }

    /// Subscribe to updates by using a [`Stream`].
    #[cfg(test)]
    pub(super) fn subscribe(&mut self) -> UpdatesSubscriber<Item, Gap> {
        // A subscriber is a new update reader; it needs its own token.
        let token = self.new_reader_token();

        UpdatesSubscriber::new(Arc::downgrade(&self.inner), token)
    }

    /// Generate a new [`ReaderToken`].
    pub(super) fn new_reader_token(&mut self) -> ReaderToken {
        let mut inner = self.inner.write().unwrap();

        // Add 1 before reading `last_token`, in this particular order, because
        // token 0 is reserved by `MAIN_READER_TOKEN`.
        inner.last_token += 1;
        let last_token = inner.last_token;

        inner.last_index_per_reader.insert(last_token, 0);

        last_token
    }
}

/// A token used to represent readers that read the updates in
/// [`UpdatesInner`].
pub(super) type ReaderToken = usize;

/// Inner type for [`ObservableUpdates`].
///
/// The particularity of this type is that multiple readers can read the
/// updates. A reader has a [`ReaderToken`]. The public API (i.e.
/// [`ObservableUpdates`]) is considered to be the _main reader_ (it has the
/// token [`Self::MAIN_READER_TOKEN`]).
///
/// An update that has been read by all readers is garbage collected and
/// removed from memory. An update will never be read twice by the same
/// reader.
///
/// Why do we need multiple readers? The public API reads the updates with
/// [`ObservableUpdates::take`], but the private API must also read the
/// updates, for example with [`UpdatesSubscriber`]. Of course, there can be
/// multiple `UpdatesSubscriber`s at the same time. Hence the need to support
/// multiple readers.
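///
/// # Example
///
/// An illustrative sketch of the multi-reader behaviour, mirroring the tests at
/// the bottom of this file (these are crate-internal APIs, exercised the same
/// way as in those tests):
///
/// ```rust,ignore
/// let updates = linked_chunk.updates().unwrap();
///
/// // A second reader, e.g. a subscriber, gets its own token.
/// let other_token = updates.new_reader_token();
///
/// linked_chunk.push_items_back(['a']);
///
/// // The main reader takes the new update…
/// assert!(!linked_chunk.updates().unwrap().take().is_empty());
///
/// // …but the other reader still sees it: each reader has its own index, and
/// // an update is only garbage collected once *all* readers have read it.
/// let updates = linked_chunk.updates().unwrap();
/// assert!(!updates.inner.write().unwrap().take_with_token(other_token).is_empty());
/// ```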
#[derive(Debug)]
pub(super) struct UpdatesInner<Item, Gap> {
    /// All the updates that have not been read by all readers.
    updates: Vec<Update<Item, Gap>>,

    /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
    /// A reader is identified by a [`ReaderToken`].
    ///
    /// To each reader token is associated an index that represents the index of
    /// the last read. It is used to never return the same update twice.
    last_index_per_reader: HashMap<ReaderToken, usize>,

    /// The last generated token. This is useful to generate new tokens.
    last_token: ReaderToken,

    /// Pending wakers for [`UpdatesSubscriber`]s. A waker is removed
    /// every time it is called.
    wakers: Vec<Waker>,
}

impl<Item, Gap> UpdatesInner<Item, Gap> {
    /// The token used by the main reader. See [`Self::take`] to learn more.
    const MAIN_READER_TOKEN: ReaderToken = 0;

    /// Create a new [`Self`].
    fn new() -> Self {
        Self {
            updates: Vec::with_capacity(8),
            last_index_per_reader: {
                let mut map = HashMap::with_capacity(2);
                map.insert(Self::MAIN_READER_TOKEN, 0);

                map
            },
            last_token: Self::MAIN_READER_TOKEN,
            wakers: Vec::with_capacity(2),
        }
    }

    /// Push a new update.
    fn push(&mut self, update: Update<Item, Gap>) {
        self.updates.push(update);

        // Wake them up \o/.
        for waker in self.wakers.drain(..) {
            waker.wake();
        }
    }

    /// Clear all pending updates.
    fn clear_pending(&mut self) {
        self.updates.clear();

        // Reset all the per-reader indices.
        for idx in self.last_index_per_reader.values_mut() {
            *idx = 0;
        }

        // No need to wake the wakers; they're waiting for a new update, and we
        // just made them all disappear.
    }

    /// Take new updates; it assumes the caller is the main reader, i.e. it
    /// will use the [`Self::MAIN_READER_TOKEN`].
    ///
    /// Updates that have been read will never be read again by the current
    /// reader.
    ///
    /// Learn more by reading [`Self::take_with_token`].
    fn take(&mut self) -> &[Update<Item, Gap>] {
        self.take_with_token(Self::MAIN_READER_TOKEN)
    }

    /// Take new updates with a particular reader token.
    ///
    /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
    /// A reader is identified by a [`ReaderToken`]. Every reader can
    /// take/read/consume each update only once. An internal index is stored
    /// per reader token to know where to start reading updates next time this
    /// method is called.
    pub(super) fn take_with_token(&mut self, token: ReaderToken) -> &[Update<Item, Gap>] {
        // Let's garbage collect unused updates.
        self.garbage_collect();

        let index = self
            .last_index_per_reader
            .get_mut(&token)
            .expect("Given `ReaderToken` does not map to any index");

        // Read new updates, and update the index.
        let slice = &self.updates[*index..];
        *index = self.updates.len();

        slice
    }

    /// Return the number of updates in the buffer.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.updates.len()
    }

    /// Garbage collect unused updates. An update is considered unused when it's
    /// been read by all readers.
    ///
    /// Basically, it reduces to finding the smallest last index across all
    /// readers, and clearing from 0 to that index.
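    ///
    /// Illustrative numbers: if the main reader's last index is 3 and a
    /// subscriber's last index is 1, the minimum is 1, so the first update is
    /// dropped and both indices are shifted left, becoming 2 and 0.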
    fn garbage_collect(&mut self) {
        let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0);

        if min_index > 0 {
            let _ = self.updates.drain(0..min_index);

            // Let's shift the indices to the left by `min_index` to preserve them.
            for index in self.last_index_per_reader.values_mut() {
                *index -= min_index;
            }
        }
    }
}

/// A subscriber to [`ObservableUpdates`]. It is helpful to receive updates via
/// a [`Stream`].
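///
/// # Example
///
/// A minimal sketch of polling the subscriber as a [`Stream`]; note that
/// `subscribe` is only compiled for tests, so this is illustrative only:
///
/// ```rust,ignore
/// use futures_util::StreamExt as _;
///
/// let subscriber = linked_chunk.updates().unwrap().subscribe();
/// futures_util::pin_mut!(subscriber);
///
/// linked_chunk.push_items_back(['a']);
///
/// // Each poll yields the batch of updates this subscriber hasn't read yet.
/// let batch = subscriber.next().await.unwrap();
/// assert!(!batch.is_empty());
/// ```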
pub(super) struct UpdatesSubscriber<Item, Gap> {
    /// Weak reference to [`UpdatesInner`].
    ///
    /// Using a weak reference allows [`ObservableUpdates`] to be dropped
    /// freely even if a subscriber exists.
    updates: Weak<RwLock<UpdatesInner<Item, Gap>>>,

    /// The token to read the updates.
    token: ReaderToken,
}

impl<Item, Gap> UpdatesSubscriber<Item, Gap> {
    /// Create a new [`Self`].
    #[cfg(test)]
    fn new(updates: Weak<RwLock<UpdatesInner<Item, Gap>>>, token: ReaderToken) -> Self {
        Self { updates, token }
    }
}

impl<Item, Gap> Stream for UpdatesSubscriber<Item, Gap>
where
    Item: Clone,
    Gap: Clone,
{
    type Item = Vec<Update<Item, Gap>>;

    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let Some(updates) = self.updates.upgrade() else {
            // The `ObservableUpdates` has been dropped. It's time to close this stream.
            return Poll::Ready(None);
        };

        let mut updates = updates.write().unwrap();
        let the_updates = updates.take_with_token(self.token);

        // No updates.
        if the_updates.is_empty() {
            // Let's register the waker.
            updates.wakers.push(context.waker().clone());

            // The stream is pending.
            return Poll::Pending;
        }

        // There are updates! Let's forward them in this stream.
        Poll::Ready(Some(the_updates.to_owned()))
    }
}

impl<Item, Gap> Drop for UpdatesSubscriber<Item, Gap> {
    fn drop(&mut self) {
        // Remove `Self::token` from `UpdatesInner::last_index_per_reader`.
        // This is important so that the garbage collector can do its job correctly,
        // without a dead, dangling reader token.
        if let Some(updates) = self.updates.upgrade() {
            let mut updates = updates.write().unwrap();

            // Remove the reader token from `UpdatesInner`.
            // It's safe to ignore the result of `remove` here: `None` means the token was
            // already removed (note: it should be unreachable).
            let _ = updates.last_index_per_reader.remove(&self.token);
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{
        sync::{Arc, Mutex},
        task::{Context, Poll, Wake},
    };

    use assert_matches::assert_matches;
    use futures_util::pin_mut;

    use super::{super::LinkedChunk, ChunkIdentifier, Position, Stream, UpdatesInner};

    #[test]
    fn test_updates_take_and_garbage_collector() {
        use super::Update::*;

        let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history();

        // Simulate another updates “reader”; it could be a subscriber.
        let main_token = UpdatesInner::<char, ()>::MAIN_READER_TOKEN;
        let other_token = {
            let updates = linked_chunk.updates().unwrap();
            let mut inner = updates.inner.write().unwrap();
            inner.last_token += 1;

            let other_token = inner.last_token;
            inner.last_index_per_reader.insert(other_token, 0);

            other_token
        };

        // There is an initial update.
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.take(),
                &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
            );
            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
            );
        }

        // No new update.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.take().is_empty());
            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
        }

        linked_chunk.push_items_back(['a']);
        linked_chunk.push_items_back(['b']);
        linked_chunk.push_items_back(['c']);

        // Scenario 1: “main” takes the new updates, “other” doesn't take the new
        // updates.
        //
        // 0   1   2   3
        // +---+---+---+
        // | a | b | c |
        // +---+---+---+
        //
        // “main” will move its index from 0 to 3.
        // “other” won't move its index.
        {
            let updates = linked_chunk.updates().unwrap();

            {
                // Inspect number of updates in memory.
                assert_eq!(updates.inner.read().unwrap().len(), 3);
            }

            assert_eq!(
                updates.take(),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                    PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                    PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // It must be the same number as before, as the garbage collector wasn't
                // able to remove any unused updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&0));
            }
        }

        linked_chunk.push_items_back(['d']);
        linked_chunk.push_items_back(['e']);
        linked_chunk.push_items_back(['f']);

        // Scenario 2: “other” takes the new updates, “main” doesn't take the
        // new updates.
        //
        // 0   1   2   3   4   5   6
        // +---+---+---+---+---+---+
        // | a | b | c | d | e | f |
        // +---+---+---+---+---+---+
        //
        // “main” won't move its index.
        // “other” will move its index from 0 to 6.
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                    PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                    PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                    PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
                    PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
                    PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The count hasn't changed: the garbage collector will be able to remove
                // unused updates, but only at the next call…
                assert_eq!(inner.len(), 6);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&6));
            }
        }

        // Scenario 3: “other” takes new updates, but there are none; “main”
        // doesn't take new updates. The garbage collector will run and collect
        // unused updates.
        //
        // 0   1   2   3
        // +---+---+---+
        // | d | e | f |
        // +---+---+---+
        //
        // “main” will have its index updated from 3 to 0.
        // “other” will have its index updated from 6 to 3.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector has removed unused updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices. They must have been adjusted.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&0));
                assert_eq!(indices.get(&other_token), Some(&3));
            }
        }

        linked_chunk.push_items_back(['g']);
        linked_chunk.push_items_back(['h']);
        linked_chunk.push_items_back(['i']);

        // Scenario 4: both “main” and “other” take the new updates.
        //
        // 0   1   2   3   4   5   6
        // +---+---+---+---+---+---+
        // | d | e | f | g | h | i |
        // +---+---+---+---+---+---+
        //
        // “main” will have its index updated from 0 to 3.
        // “other” will have its index updated from 3 to 3 (the garbage collector
        // shifts it to 0, then it reads the 3 new updates).
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.take(),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
                    PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
                    PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
                    PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
                    PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
                    PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
                ]
            );
            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
                    PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
                    PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector had a chance to collect the first 3 updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&3));
            }
        }

        // Scenario 5: no more updates, but both try to take new updates.
        // The garbage collector will collect all updates, as all of them have
        // been read already.
        //
        // “main” will have its index updated from 3 to 0.
        // “other” will have its index updated from 3 to 0.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.take().is_empty());
            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector had a chance to collect all updates.
                assert_eq!(inner.len(), 0);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&0));
                assert_eq!(indices.get(&other_token), Some(&0));
            }
        }
    }

    struct CounterWaker {
        number_of_wakeup: Mutex<usize>,
    }

    impl Wake for CounterWaker {
        fn wake(self: Arc<Self>) {
            *self.number_of_wakeup.lock().unwrap() += 1;
        }
    }

    #[test]
    fn test_updates_stream() {
        use super::Update::*;

        let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
        let waker = counter_waker.clone().into();
        let mut context = Context::from_waker(&waker);

        let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();

        let updates_subscriber = linked_chunk.updates().unwrap().subscribe();
        pin_mut!(updates_subscriber);

        // Initial update, stream is ready.
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 0);

        // Let's generate an update.
        linked_chunk.push_items_back(['a']);

        // The waker must have been called.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 1);

        // There is an update! Right after that, the stream is pending again.
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);

        // Let's generate two other updates.
        linked_chunk.push_items_back(['b']);
        linked_chunk.push_items_back(['c']);

        // The waker must have been called only once for the two updates.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);

        // We can consume the updates without the stream, but the stream still
        // knows it has updates.
        assert_eq!(
            linked_chunk.updates().unwrap().take(),
            &[
                NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None },
                PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
            ]
        );
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[
                        PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                        PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                    ]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);

        // When dropping the `LinkedChunk`, it closes the stream.
        drop(linked_chunk);
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Ready(None));

        // Waker calls have not changed.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);
    }

    #[test]
    fn test_updates_multiple_streams() {
        use super::Update::*;

        let counter_waker1 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
        let counter_waker2 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });

        let waker1 = counter_waker1.clone().into();
        let waker2 = counter_waker2.clone().into();

        let mut context1 = Context::from_waker(&waker1);
        let mut context2 = Context::from_waker(&waker2);

        let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();

        let updates_subscriber1 = linked_chunk.updates().unwrap().subscribe();
        pin_mut!(updates_subscriber1);

        // Scope for `updates_subscriber2`.
        let updates_subscriber2_token = {
            let updates_subscriber2 = linked_chunk.updates().unwrap().subscribe();
            pin_mut!(updates_subscriber2);

            // Initial updates, streams are ready.
            assert_matches!(
                updates_subscriber1.as_mut().poll_next(&mut context1),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
                    );
                }
            );
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 0);

            assert_matches!(
                updates_subscriber2.as_mut().poll_next(&mut context2),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
                    );
                }
            );
            assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);
            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 0);

            // Let's generate an update.
            linked_chunk.push_items_back(['a']);

            // The wakers must have been called.
            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 1);
            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 1);

            // There is an update! Right after that, the streams are pending again.
            assert_matches!(
                updates_subscriber1.as_mut().poll_next(&mut context1),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                    );
                }
            );
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
            assert_matches!(
                updates_subscriber2.as_mut().poll_next(&mut context2),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                    );
                }
            );
            assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);

            // Let's generate two other updates.
            linked_chunk.push_items_back(['b']);
            linked_chunk.push_items_back(['c']);

            // A waker is consumed when called. The first call to `push_items_back` will
            // call and consume the wakers. The second call to `push_items_back` will do
            // nothing as the wakers have been consumed. New wakers will be registered on
            // polling.
            //
            // So, each waker must have been called only once for the two updates.
            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 2);
            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 2);

            // Let's poll `updates_subscriber1` only.
            assert_matches!(
                updates_subscriber1.as_mut().poll_next(&mut context1),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[
                            PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                            PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                        ]
                    );
                }
            );
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

            // For the sake of this test, we also need to advance the main reader's index.
            let _ = linked_chunk.updates().unwrap().take();
            let _ = linked_chunk.updates().unwrap().take();

            // If we inspect the garbage collector state, `b` and `c` should still be
            // present because they haven't been consumed by `updates_subscriber2` yet.
            {
                let updates = linked_chunk.updates().unwrap();

                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // We get 2 because the garbage collector runs before data are taken, not after:
                // `updates_subscriber2` has read `a` only, so `b` and `c` remain.
                assert_eq!(inner.len(), 2);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
                assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
            }

            // Poll `updates_subscriber1` again: there is no new update, so it must be
            // pending.
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

            // The state of the garbage collector is unchanged: `b` and `c` are still
            // in memory.
            {
                let updates = linked_chunk.updates().unwrap();

                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory. Value is unchanged.
                assert_eq!(inner.len(), 2);

                // Inspect the indices. They are unchanged.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
                assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
            }

            updates_subscriber2.token
            // Drop `updates_subscriber2`!
        };

        // `updates_subscriber2` has been dropped. Poll `updates_subscriber1` again:
        // still no new update, but it will run the garbage collector again, and this
        // time `updates_subscriber2` is not “retaining” `b` and `c`, so the update
        // buffer must end up empty.
        assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

        // Inspect the garbage collector.
        {
            let updates = linked_chunk.updates().unwrap();

            let inner = updates.inner.read().unwrap();

            // Inspect number of updates in memory.
            assert_eq!(inner.len(), 0);

            // Inspect the indices.
            let indices = &inner.last_index_per_reader;

            assert_eq!(indices.get(&updates_subscriber1.token), Some(&0));
            assert_eq!(indices.get(&updates_subscriber2_token), None); // token is unknown!
        }

        // When dropping the `LinkedChunk`, it closes the stream.
        drop(linked_chunk);
        assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Ready(None));
    }
}