matrix_sdk_common/linked_chunk/
updates.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
    task::Waker,
};

use super::{ChunkIdentifier, Position};

/// Represents the updates that have happened inside a [`LinkedChunk`].
///
/// To retrieve the updates, use [`LinkedChunk::updates`].
///
/// These updates are useful to store a `LinkedChunk` in another form of
/// storage, like a database or something similar.
///
/// [`LinkedChunk`]: super::LinkedChunk
/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
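///
/// As an illustration, a storage layer could replay each update to mirror the
/// linked chunk in its own representation. A minimal sketch (the `persist_*`
/// functions are hypothetical, not part of this crate):
///
/// ```ignore
/// fn apply<Item, Gap>(update: Update<Item, Gap>) {
///     match update {
///         // Persist new items at the given position.
///         Update::PushItems { at, items } => persist_items(at, items),
///         // Remove a whole chunk from the store.
///         Update::RemoveChunk(chunk_id) => persist_chunk_removal(chunk_id),
///         // Drop everything.
///         Update::Clear => persist_clear(),
///         // … the other variants are handled similarly.
///         _ => {}
///     }
/// }
/// ```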
#[derive(Debug, Clone, PartialEq)]
pub enum Update<Item, Gap> {
    /// A new chunk of kind Items has been created.
    NewItemsChunk {
        /// The identifier of the previous chunk of this new chunk.
        previous: Option<ChunkIdentifier>,

        /// The identifier of the new chunk.
        new: ChunkIdentifier,

        /// The identifier of the next chunk of this new chunk.
        next: Option<ChunkIdentifier>,
    },

    /// A new chunk of kind Gap has been created.
    NewGapChunk {
        /// The identifier of the previous chunk of this new chunk.
        previous: Option<ChunkIdentifier>,

        /// The identifier of the new chunk.
        new: ChunkIdentifier,

        /// The identifier of the next chunk of this new chunk.
        next: Option<ChunkIdentifier>,

        /// The content of the chunk.
        gap: Gap,
    },

    /// A chunk has been removed.
    RemoveChunk(ChunkIdentifier),

    /// Items are pushed inside a chunk of kind Items.
    PushItems {
        /// The [`Position`] of the items.
        ///
        /// This value is given to prevent the need for position computations by
        /// the update readers. Items are pushed, so the positions should be
        /// incrementally computed from the previous items, which requires the
        /// reading of the last previous item. With `at`, the update readers no
        /// longer need to do so.
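        ///
        /// For example, if `at` points to index 2 of a chunk and two items
        /// are pushed, the first item lands at index 2 and the second at
        /// index 3 of that chunk.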
        at: Position,

        /// The items.
        items: Vec<Item>,
    },

    /// An item has been replaced in the linked chunk.
    ///
    /// The `at` position MUST resolve to the actual position of an existing
    /// *item* (not a gap).
    ReplaceItem {
        /// The position of the item that's being replaced.
        at: Position,

        /// The new value for the item.
        item: Item,
    },

    /// An item has been removed inside a chunk of kind Items.
    RemoveItem {
        /// The [`Position`] of the item.
        at: Position,
    },

    /// The last items of a chunk have been detached, i.e. the chunk has been
    /// truncated.
    DetachLastItems {
        /// The split position. Before this position (`..position`), items are
        /// kept; from this position (`position..`), items are detached.
        at: Position,
    },

    /// Detached items (see [`Self::DetachLastItems`]) start being reattached.
    StartReattachItems,

    /// Reattaching items (see [`Self::StartReattachItems`]) is finished.
    EndReattachItems,

    /// All chunks have been cleared, i.e. all items and all gaps have been
    /// dropped.
    Clear,
}

/// A collection of [`Update`]s that can be observed.
///
/// Get a value for this type with [`LinkedChunk::updates`].
///
/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
#[derive(Debug)]
pub struct ObservableUpdates<Item, Gap> {
    pub(super) inner: Arc<RwLock<UpdatesInner<Item, Gap>>>,
}

impl<Item, Gap> ObservableUpdates<Item, Gap> {
    /// Create a new [`ObservableUpdates`].
    pub(super) fn new() -> Self {
        Self { inner: Arc::new(RwLock::new(UpdatesInner::new())) }
    }

    /// Push a new update.
    pub(super) fn push(&mut self, update: Update<Item, Gap>) {
        self.inner.write().unwrap().push(update);
    }

    /// Clear all pending updates.
    pub(super) fn clear_pending(&mut self) {
        self.inner.write().unwrap().clear_pending();
    }

    /// Take new updates.
    ///
    /// Updates that have been taken will not be read again.
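    ///
    /// A minimal sketch, mirroring the tests in this module:
    ///
    /// ```ignore
    /// let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
    /// linked_chunk.push_items_back(['a']);
    ///
    /// // The first call returns all pending updates…
    /// assert!(!linked_chunk.updates().unwrap().take().is_empty());
    /// // … the second call returns nothing new.
    /// assert!(linked_chunk.updates().unwrap().take().is_empty());
    /// ```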
    pub fn take(&mut self) -> Vec<Update<Item, Gap>>
    where
        Item: Clone,
        Gap: Clone,
    {
        self.inner.write().unwrap().take().to_owned()
    }

    /// Subscribe to updates by using a [`Stream`].
    #[cfg(test)]
    pub(super) fn subscribe(&mut self) -> UpdatesSubscriber<Item, Gap> {
        // A subscriber is a new update reader; it needs its own token.
        let token = self.new_reader_token();

        UpdatesSubscriber::new(Arc::downgrade(&self.inner), token)
    }

    /// Generate a new [`ReaderToken`].
    pub(super) fn new_reader_token(&mut self) -> ReaderToken {
        let mut inner = self.inner.write().unwrap();

        // Add 1 before reading the `last_token`, in this particular order, because the
        // 0 token is reserved for `MAIN_READER_TOKEN`.
        inner.last_token += 1;
        let last_token = inner.last_token;

        inner.last_index_per_reader.insert(last_token, 0);

        last_token
    }
}

/// A token used to represent readers that read the updates in
/// [`UpdatesInner`].
pub(super) type ReaderToken = usize;

/// Inner type for [`ObservableUpdates`].
///
/// The particularity of this type is that multiple readers can read the
/// updates. A reader has a [`ReaderToken`]. The public API (i.e.
/// [`ObservableUpdates`]) is considered to be the _main reader_ (it has the
/// token [`Self::MAIN_READER_TOKEN`]).
///
/// Updates that have been read by all readers are garbage collected, i.e.
/// removed from memory. An update will never be read twice by the same
/// reader.
///
/// Why do we need multiple readers? The public API reads the updates with
/// [`ObservableUpdates::take`], but the private API must also read the
/// updates, for example with [`UpdatesSubscriber`]. Of course, there can be
/// multiple `UpdatesSubscriber`s at the same time, hence the need to support
/// multiple readers.
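///
/// For example, after three updates, the main reader may have taken all of
/// them (its index is 3) while a subscriber has read none (its index is 0):
/// the three updates stay in memory until the subscriber reads them too.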
#[derive(Debug)]
pub(super) struct UpdatesInner<Item, Gap> {
    /// All the updates that have not been read by all readers.
    updates: Vec<Update<Item, Gap>>,

    /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
    /// A reader is identified by a [`ReaderToken`].
    ///
    /// Each reader token is associated with an index pointing just past the
    /// last update it has read. It ensures the same update is never returned
    /// twice to the same reader.
    last_index_per_reader: HashMap<ReaderToken, usize>,

    /// The last generated token. This is useful to generate new tokens.
    last_token: ReaderToken,

    /// Pending wakers for [`UpdatesSubscriber`]s. A waker is removed
    /// every time it is called.
    wakers: Vec<Waker>,
}

impl<Item, Gap> UpdatesInner<Item, Gap> {
    /// The token used by the main reader. See [`Self::take`] to learn more.
    const MAIN_READER_TOKEN: ReaderToken = 0;

    /// Create a new [`Self`].
    fn new() -> Self {
        Self {
            updates: Vec::with_capacity(8),
            last_index_per_reader: {
                let mut map = HashMap::with_capacity(2);
                map.insert(Self::MAIN_READER_TOKEN, 0);

                map
            },
            last_token: Self::MAIN_READER_TOKEN,
            wakers: Vec::with_capacity(2),
        }
    }

    /// Push a new update.
    fn push(&mut self, update: Update<Item, Gap>) {
        self.updates.push(update);

        // Wake them up \o/.
        for waker in self.wakers.drain(..) {
            waker.wake();
        }
    }

    /// Clear all pending updates.
    fn clear_pending(&mut self) {
        self.updates.clear();

        // Reset all the per-reader indices.
        for idx in self.last_index_per_reader.values_mut() {
            *idx = 0;
        }

        // No need to wake the wakers; they're waiting for a new update, and we
        // just made them all disappear.
    }

    /// Take new updates, assuming the caller is the main reader, i.e. using
    /// [`Self::MAIN_READER_TOKEN`].
    ///
    /// Updates that have been read will never be read again by the current
    /// reader.
    ///
    /// Learn more by reading [`Self::take_with_token`].
    fn take(&mut self) -> &[Update<Item, Gap>] {
        self.take_with_token(Self::MAIN_READER_TOKEN)
    }

    /// Take new updates with a particular reader token.
    ///
    /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
    /// A reader is identified by a [`ReaderToken`]. Every reader can
    /// take/read/consume each update only once. An internal index is stored
    /// per reader token to know where to start reading updates next time this
    /// method is called.
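    ///
    /// For example, if a reader's index is 2 and 5 updates are stored, this
    /// call returns `updates[2..5]` and moves that reader's index to 5; the
    /// indices of the other readers are untouched.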
    pub(super) fn take_with_token(&mut self, token: ReaderToken) -> &[Update<Item, Gap>] {
        // Let's garbage collect unused updates.
        self.garbage_collect();

        let index = self
            .last_index_per_reader
            .get_mut(&token)
            .expect("Given `ReaderToken` does not map to any index");

        // Read new updates, and update the index.
        let slice = &self.updates[*index..];
        *index = self.updates.len();

        slice
    }

    /// Returns whether the given reader, identified by its [`ReaderToken`],
    /// has consumed all the pending updates, i.e. whether it is up-to-date.
    pub(super) fn is_reader_up_to_date(&self, token: ReaderToken) -> bool {
        *self.last_index_per_reader.get(&token).expect("unknown reader token") == self.updates.len()
    }

    /// Return the number of updates in the buffer.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.updates.len()
    }

    /// Garbage collect unused updates. An update is considered unused when it's
    /// been read by all readers.
    ///
    /// Basically, it reduces to finding the smallest last index for all
    /// readers, and clearing from 0 to that index.
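    ///
    /// For example, with reader indices `{main: 3, other: 1}`, the minimum is
    /// 1: `updates[0..1]` is drained and the indices become `{main: 2, other: 0}`.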
    fn garbage_collect(&mut self) {
        let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0);

        if min_index > 0 {
            let _ = self.updates.drain(0..min_index);

            // Let's shift the indices to the left by `min_index` to preserve them.
            for index in self.last_index_per_reader.values_mut() {
                *index -= min_index;
            }
        }
    }
}

/// A subscriber to [`ObservableUpdates`]. It is helpful to receive updates via
/// a [`Stream`].
#[cfg(test)]
pub(super) struct UpdatesSubscriber<Item, Gap> {
    /// Weak reference to [`UpdatesInner`].
    ///
    /// Using a weak reference allows [`ObservableUpdates`] to be dropped
    /// freely even if a subscriber exists.
    updates: std::sync::Weak<RwLock<UpdatesInner<Item, Gap>>>,

    /// The token to read the updates.
    token: ReaderToken,
}

#[cfg(test)]
impl<Item, Gap> UpdatesSubscriber<Item, Gap> {
    /// Create a new [`Self`].
    #[cfg(test)]
    fn new(updates: std::sync::Weak<RwLock<UpdatesInner<Item, Gap>>>, token: ReaderToken) -> Self {
        Self { updates, token }
    }
}

#[cfg(test)]
impl<Item, Gap> futures_core::Stream for UpdatesSubscriber<Item, Gap>
where
    Item: Clone,
    Gap: Clone,
{
    type Item = Vec<Update<Item, Gap>>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        context: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let Some(updates) = self.updates.upgrade() else {
            // The `ObservableUpdates` has been dropped. It's time to close this stream.
            return std::task::Poll::Ready(None);
        };

        let mut updates = updates.write().unwrap();
        let the_updates = updates.take_with_token(self.token);

        // No updates.
        if the_updates.is_empty() {
            // Let's register the waker.
            updates.wakers.push(context.waker().clone());

            // The stream is pending.
            return std::task::Poll::Pending;
        }

        // There are updates! Let's forward them in this stream.
        std::task::Poll::Ready(Some(the_updates.to_owned()))
    }
}

#[cfg(test)]
impl<Item, Gap> Drop for UpdatesSubscriber<Item, Gap> {
    fn drop(&mut self) {
        // Remove `Self::token` from `UpdatesInner::last_index_per_reader`.
        // This is important so that the garbage collector can do its job correctly
        // without a dead dangling reader token.
        if let Some(updates) = self.updates.upgrade() {
            let mut updates = updates.write().unwrap();

            // Remove the reader token from `UpdatesInner`.
            // It's safe to ignore the result of `remove` here: `None` means the token was
            // already removed (note: it should be unreachable).
            let _ = updates.last_index_per_reader.remove(&self.token);
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{
        sync::{Arc, Mutex},
        task::{Context, Poll, Wake},
    };

    use assert_matches::assert_matches;
    use futures_core::Stream;
    use futures_util::pin_mut;

    use super::{super::LinkedChunk, ChunkIdentifier, Position, UpdatesInner};

    #[test]
    fn test_updates_take_and_garbage_collector() {
        use super::Update::*;

        let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history();

        // Simulate another updates “reader”; it could be a subscriber, for example.
        let main_token = UpdatesInner::<char, ()>::MAIN_READER_TOKEN;
        let other_token = {
            let updates = linked_chunk.updates().unwrap();
            let mut inner = updates.inner.write().unwrap();
            inner.last_token += 1;

            let other_token = inner.last_token;
            inner.last_index_per_reader.insert(other_token, 0);

            other_token
        };

        // There is an initial update.
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.take(),
                &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
            );
            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
            );
        }

        // No new update.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.take().is_empty());
            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
        }

        linked_chunk.push_items_back(['a']);
        linked_chunk.push_items_back(['b']);
        linked_chunk.push_items_back(['c']);

        // Scenario 1: “main” takes the new updates, “other” doesn't take the new
        // updates.
        //
        // 0   1   2   3
        // +---+---+---+
        // | a | b | c |
        // +---+---+---+
        //
        // “main” will move its index from 0 to 3.
        // “other” won't move its index.
        {
            let updates = linked_chunk.updates().unwrap();

            {
                // Inspect number of updates in memory.
                assert_eq!(updates.inner.read().unwrap().len(), 3);
            }

            assert_eq!(
                updates.take(),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                    PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                    PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // It must be the same number as before, as the garbage collector wasn't
                // able to remove any unused updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&0));
            }
        }

        linked_chunk.push_items_back(['d']);
        linked_chunk.push_items_back(['e']);
        linked_chunk.push_items_back(['f']);

        // Scenario 2: “other” takes the new updates, “main” doesn't take the
        // new updates.
        //
        // 0   1   2   3   4   5   6
        // +---+---+---+---+---+---+
        // | a | b | c | d | e | f |
        // +---+---+---+---+---+---+
        //
        // “main” won't move its index.
        // “other” will move its index from 0 to 6.
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                    PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                    PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                    PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
                    PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
                    PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // All 6 updates are still there: the garbage collector will be able to
                // remove the unused ones, but only at the next call…
                assert_eq!(inner.len(), 6);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&6));
            }
        }

        // Scenario 3: “other” takes new updates, but there are none; “main”
        // doesn't take new updates. The garbage collector will run and collect
        // unused updates.
        //
        // 0   1   2   3
        // +---+---+---+
        // | d | e | f |
        // +---+---+---+
        //
        // “main” will have its index updated from 3 to 0.
        // “other” will have its index updated from 6 to 3.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector has removed unused updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices. They must have been adjusted.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&0));
                assert_eq!(indices.get(&other_token), Some(&3));
            }
        }

        linked_chunk.push_items_back(['g']);
        linked_chunk.push_items_back(['h']);
        linked_chunk.push_items_back(['i']);

        // Scenario 4: both “main” and “other” take the new updates.
        //
        // 0   1   2   3   4   5   6
        // +---+---+---+---+---+---+
        // | d | e | f | g | h | i |
        // +---+---+---+---+---+---+
        //
        // “main” will have its index updated from 0 to 3.
        // “other” will have its index updated from 3 to 3 (the garbage
        // collector shifts it down to 0 before it reads the new updates).
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.take(),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
                    PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
                    PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
                    PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
                    PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
                    PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
                ]
            );
            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
                    PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
                    PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector had a chance to collect the first 3 updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&3));
            }
        }

        // Scenario 5: no more updates but they both try to take new updates.
        // The garbage collector will collect all updates as all of them have
        // been read already.
        //
        // “main” will have its index updated from 3 to 0.
        // “other” will have its index updated from 3 to 0.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.take().is_empty());
            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector had a chance to collect all updates.
                assert_eq!(inner.len(), 0);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&0));
                assert_eq!(indices.get(&other_token), Some(&0));
            }
        }
    }

    struct CounterWaker {
        number_of_wakeup: Mutex<usize>,
    }

    impl Wake for CounterWaker {
        fn wake(self: Arc<Self>) {
            *self.number_of_wakeup.lock().unwrap() += 1;
        }
    }

    #[test]
    fn test_updates_stream() {
        use super::Update::*;

        let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
        let waker = counter_waker.clone().into();
        let mut context = Context::from_waker(&waker);

        let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();

        let updates_subscriber = linked_chunk.updates().unwrap().subscribe();
        pin_mut!(updates_subscriber);

        // Initial update, stream is ready.
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 0);

        // Let's generate an update.
        linked_chunk.push_items_back(['a']);

        // The waker must have been called.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 1);

        // There is an update! Right after that, the stream is pending again.
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);

        // Let's generate two other updates.
        linked_chunk.push_items_back(['b']);
        linked_chunk.push_items_back(['c']);

        // The waker must have been called only once for the two updates.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);

        // We can consume the updates without the stream, but the stream continues to
        // know it has updates.
        assert_eq!(
            linked_chunk.updates().unwrap().take(),
            &[
                NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None },
                PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
            ]
        );
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[
                        PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                        PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                    ]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);

        // Dropping the `LinkedChunk` closes the stream.
        drop(linked_chunk);
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Ready(None));

        // The number of waker calls has not changed.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);
    }

    #[test]
    fn test_updates_multiple_streams() {
        use super::Update::*;

        let counter_waker1 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
        let counter_waker2 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });

        let waker1 = counter_waker1.clone().into();
        let waker2 = counter_waker2.clone().into();

        let mut context1 = Context::from_waker(&waker1);
        let mut context2 = Context::from_waker(&waker2);

        let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();

        let updates_subscriber1 = linked_chunk.updates().unwrap().subscribe();
        pin_mut!(updates_subscriber1);

        // Scope for `updates_subscriber2`.
        let updates_subscriber2_token = {
            let updates_subscriber2 = linked_chunk.updates().unwrap().subscribe();
            pin_mut!(updates_subscriber2);

            // Initial updates, streams are ready.
            assert_matches!(
                updates_subscriber1.as_mut().poll_next(&mut context1),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
                    );
                }
            );
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 0);

            assert_matches!(
                updates_subscriber2.as_mut().poll_next(&mut context2),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
                    );
                }
            );
            assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);
            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 0);

            // Let's generate an update.
            linked_chunk.push_items_back(['a']);

            // The wakers must have been called.
            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 1);
            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 1);

            // There is an update! Right after that, the streams are pending again.
            assert_matches!(
                updates_subscriber1.as_mut().poll_next(&mut context1),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                    );
                }
            );
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
            assert_matches!(
                updates_subscriber2.as_mut().poll_next(&mut context2),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                    );
                }
            );
            assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);

            // Let's generate two other updates.
            linked_chunk.push_items_back(['b']);
            linked_chunk.push_items_back(['c']);

            // A waker is consumed when called. The first call to `push_items_back` will
            // call and consume the wakers. The second call to `push_items_back` will do
            // nothing as the wakers have been consumed. New wakers will be registered on
            // polling.
            //
            // So, each waker must have been called only once for the two updates.
            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 2);
            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 2);

            // Let's poll `updates_subscriber1` only.
            assert_matches!(
                updates_subscriber1.as_mut().poll_next(&mut context1),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[
                            PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                            PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                        ]
                    );
                }
            );
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

            // For the sake of this test, we also need to advance the main reader token.
            let _ = linked_chunk.updates().unwrap().take();
            let _ = linked_chunk.updates().unwrap().take();

            // If we inspect the garbage collector state, `b` and `c` should still be
            // present because they haven't been consumed by `updates_subscriber2` yet.
            {
                let updates = linked_chunk.updates().unwrap();

                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // We get 2 because the garbage collector runs before data are taken, not after:
                // `updates_subscriber2` has read `a` only, so `b` and `c` remain.
                assert_eq!(inner.len(), 2);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
                assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
            }

            // Poll `updates_subscriber1` again: there is no new update so it must be
            // pending.
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

            // The state of the garbage collector is unchanged: `b` and `c` are still
            // in memory.
            {
                let updates = linked_chunk.updates().unwrap();

                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory. Value is unchanged.
                assert_eq!(inner.len(), 2);

                // Inspect the indices. They are unchanged.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
                assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
            }

            updates_subscriber2.token
            // Drop `updates_subscriber2`!
        };

        // `updates_subscriber2` has been dropped. Poll `updates_subscriber1` again:
        // still no new update, but it will run the garbage collector again, and this
        // time `updates_subscriber2` is not “retaining” `b` and `c`. The garbage
        // collector must be empty.
        assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

        // Inspect the garbage collector.
        {
            let updates = linked_chunk.updates().unwrap();

            let inner = updates.inner.read().unwrap();

            // Inspect number of updates in memory.
            assert_eq!(inner.len(), 0);

            // Inspect the indices.
            let indices = &inner.last_index_per_reader;

            assert_eq!(indices.get(&updates_subscriber1.token), Some(&0));
            assert_eq!(indices.get(&updates_subscriber2_token), None); // token is unknown!
        }

        // Dropping the `LinkedChunk` closes the stream.
        drop(linked_chunk);
        assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Ready(None));
    }
}