matrix_sdk_common/linked_chunk/
updates.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    pin::Pin,
18    sync::{Arc, RwLock, Weak},
19    task::{Context, Poll, Waker},
20};
21
22use futures_core::Stream;
23
24use super::{ChunkIdentifier, Position};
25
26/// Represent the updates that have happened inside a [`LinkedChunk`].
27///
28/// To retrieve the updates, use [`LinkedChunk::updates`].
29///
30/// These updates are useful to store a `LinkedChunk` in another form of
31/// storage, like a database or something similar.
32///
33/// [`LinkedChunk`]: super::LinkedChunk
34/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
35#[derive(Debug, Clone, PartialEq)]
36pub enum Update<Item, Gap> {
37    /// A new chunk of kind Items has been created.
38    NewItemsChunk {
39        /// The identifier of the previous chunk of this new chunk.
40        previous: Option<ChunkIdentifier>,
41
42        /// The identifier of the new chunk.
43        new: ChunkIdentifier,
44
45        /// The identifier of the next chunk of this new chunk.
46        next: Option<ChunkIdentifier>,
47    },
48
49    /// A new chunk of kind Gap has been created.
50    NewGapChunk {
51        /// The identifier of the previous chunk of this new chunk.
52        previous: Option<ChunkIdentifier>,
53
54        /// The identifier of the new chunk.
55        new: ChunkIdentifier,
56
57        /// The identifier of the next chunk of this new chunk.
58        next: Option<ChunkIdentifier>,
59
60        /// The content of the chunk.
61        gap: Gap,
62    },
63
64    /// A chunk has been removed.
65    RemoveChunk(ChunkIdentifier),
66
67    /// Items are pushed inside a chunk of kind Items.
68    PushItems {
69        /// The [`Position`] of the items.
70        ///
71        /// This value is given to prevent the need for position computations by
72        /// the update readers. Items are pushed, so the positions should be
73        /// incrementally computed from the previous items, which requires the
74        /// reading of the last previous item. With `at`, the update readers no
75        /// longer need to do so.
76        at: Position,
77
78        /// The items.
79        items: Vec<Item>,
80    },
81
82    /// An item has been replaced in the linked chunk.
83    ///
84    /// The `at` position MUST resolve to the actual position an existing *item*
85    /// (not a gap).
86    ReplaceItem {
87        /// The position of the item that's being replaced.
88        at: Position,
89
90        /// The new value for the item.
91        item: Item,
92    },
93
94    /// An item has been removed inside a chunk of kind Items.
95    RemoveItem {
96        /// The [`Position`] of the item.
97        at: Position,
98    },
99
100    /// The last items of a chunk have been detached, i.e. the chunk has been
101    /// truncated.
102    DetachLastItems {
103        /// The split position. Before this position (`..position`), items are
104        /// kept, from this position (`position..`), items are
105        /// detached.
106        at: Position,
107    },
108
109    /// Detached items (see [`Self::DetachLastItems`]) starts being reattached.
110    StartReattachItems,
111
112    /// Reattaching items (see [`Self::StartReattachItems`]) is finished.
113    EndReattachItems,
114
115    /// All chunks have been cleared, i.e. all items and all gaps have been
116    /// dropped.
117    Clear,
118}
119
120/// A collection of [`Update`]s that can be observed.
121///
122/// Get a value for this type with [`LinkedChunk::updates`].
123///
124/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
125#[derive(Debug)]
126pub struct ObservableUpdates<Item, Gap> {
127    pub(super) inner: Arc<RwLock<UpdatesInner<Item, Gap>>>,
128}
129
130impl<Item, Gap> ObservableUpdates<Item, Gap> {
131    /// Create a new [`ObservableUpdates`].
132    pub(super) fn new() -> Self {
133        Self { inner: Arc::new(RwLock::new(UpdatesInner::new())) }
134    }
135
136    /// Push a new update.
137    pub(super) fn push(&mut self, update: Update<Item, Gap>) {
138        self.inner.write().unwrap().push(update);
139    }
140
141    /// Take new updates.
142    ///
143    /// Updates that have been taken will not be read again.
144    pub fn take(&mut self) -> Vec<Update<Item, Gap>>
145    where
146        Item: Clone,
147        Gap: Clone,
148    {
149        self.inner.write().unwrap().take().to_owned()
150    }
151
152    /// Subscribe to updates by using a [`Stream`].
153    #[cfg(test)]
154    pub(super) fn subscribe(&mut self) -> UpdatesSubscriber<Item, Gap> {
155        // A subscriber is a new update reader, it needs its own token.
156        let token = self.new_reader_token();
157
158        UpdatesSubscriber::new(Arc::downgrade(&self.inner), token)
159    }
160
161    /// Generate a new [`ReaderToken`].
162    pub(super) fn new_reader_token(&mut self) -> ReaderToken {
163        let mut inner = self.inner.write().unwrap();
164
165        // Add 1 before reading the `last_token`, in this particular order, because the
166        // 0 token is reserved by `MAIN_READER_TOKEN`.
167        inner.last_token += 1;
168        let last_token = inner.last_token;
169
170        inner.last_index_per_reader.insert(last_token, 0);
171
172        last_token
173    }
174}
175
/// A token identifying one reader of the updates stored in [`UpdatesInner`].
///
/// Each token is a key in the per-reader index map of [`UpdatesInner`].
pub(super) type ReaderToken = usize;
179
180/// Inner type for [`ObservableUpdates`].
181///
182/// The particularity of this type is that multiple readers can read the
183/// updates. A reader has a [`ReaderToken`]. The public API (i.e.
184/// [`ObservableUpdates`]) is considered to be the _main reader_ (it has the
185/// token [`Self::MAIN_READER_TOKEN`]).
186///
187/// An update that have been read by all readers are garbage collected to be
188/// removed from the memory. An update will never be read twice by the same
189/// reader.
190///
191/// Why do we need multiple readers? The public API reads the updates with
192/// [`ObservableUpdates::take`], but the private API must also read the updates
193/// for example with [`UpdatesSubscriber`]. Of course, they can be multiple
194/// `UpdatesSubscriber`s at the same time. Hence the need of supporting multiple
195/// readers.
196#[derive(Debug)]
197pub(super) struct UpdatesInner<Item, Gap> {
198    /// All the updates that have not been read by all readers.
199    updates: Vec<Update<Item, Gap>>,
200
201    /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
202    /// A reader is identified by a [`ReaderToken`].
203    ///
204    /// To each reader token is associated an index that represents the index of
205    /// the last reading. It is used to never return the same update twice.
206    last_index_per_reader: HashMap<ReaderToken, usize>,
207
208    /// The last generated token. This is useful to generate new token.
209    last_token: ReaderToken,
210
211    /// Pending wakers for [`UpdateSubscriber`]s. A waker is removed
212    /// everytime it is called.
213    wakers: Vec<Waker>,
214}
215
216impl<Item, Gap> UpdatesInner<Item, Gap> {
217    /// The token used by the main reader. See [`Self::take`] to learn more.
218    const MAIN_READER_TOKEN: ReaderToken = 0;
219
220    /// Create a new [`Self`].
221    fn new() -> Self {
222        Self {
223            updates: Vec::with_capacity(8),
224            last_index_per_reader: {
225                let mut map = HashMap::with_capacity(2);
226                map.insert(Self::MAIN_READER_TOKEN, 0);
227
228                map
229            },
230            last_token: Self::MAIN_READER_TOKEN,
231            wakers: Vec::with_capacity(2),
232        }
233    }
234
235    /// Push a new update.
236    fn push(&mut self, update: Update<Item, Gap>) {
237        self.updates.push(update);
238
239        // Wake them up \o/.
240        for waker in self.wakers.drain(..) {
241            waker.wake();
242        }
243    }
244
245    /// Take new updates; it considers the caller is the main reader, i.e. it
246    /// will use the [`Self::MAIN_READER_TOKEN`].
247    ///
248    /// Updates that have been read will never be read again by the current
249    /// reader.
250    ///
251    /// Learn more by reading [`Self::take_with_token`].
252    fn take(&mut self) -> &[Update<Item, Gap>] {
253        self.take_with_token(Self::MAIN_READER_TOKEN)
254    }
255
256    /// Take new updates with a particular reader token.
257    ///
258    /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
259    /// A reader is identified by a [`ReaderToken`]. Every reader can
260    /// take/read/consume each update only once. An internal index is stored
261    /// per reader token to know where to start reading updates next time this
262    /// method is called.
263    pub(super) fn take_with_token(&mut self, token: ReaderToken) -> &[Update<Item, Gap>] {
264        // Let's garbage collect unused updates.
265        self.garbage_collect();
266
267        let index = self
268            .last_index_per_reader
269            .get_mut(&token)
270            .expect("Given `UpdatesToken` does not map to any index");
271
272        // Read new updates, and update the index.
273        let slice = &self.updates[*index..];
274        *index = self.updates.len();
275
276        slice
277    }
278
279    /// Return the number of updates in the buffer.
280    #[cfg(test)]
281    fn len(&self) -> usize {
282        self.updates.len()
283    }
284
285    /// Garbage collect unused updates. An update is considered unused when it's
286    /// been read by all readers.
287    ///
288    /// Basically, it reduces to finding the smallest last index for all
289    /// readers, and clear from 0 to that index.
290    fn garbage_collect(&mut self) {
291        let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0);
292
293        if min_index > 0 {
294            let _ = self.updates.drain(0..min_index);
295
296            // Let's shift the indices to the left by `min_index` to preserve them.
297            for index in self.last_index_per_reader.values_mut() {
298                *index -= min_index;
299            }
300        }
301    }
302}
303
304/// A subscriber to [`ObservableUpdates`]. It is helpful to receive updates via
305/// a [`Stream`].
306pub(super) struct UpdatesSubscriber<Item, Gap> {
307    /// Weak reference to [`UpdatesInner`].
308    ///
309    /// Using a weak reference allows [`ObservableUpdates`] to be dropped
310    /// freely even if a subscriber exists.
311    updates: Weak<RwLock<UpdatesInner<Item, Gap>>>,
312
313    /// The token to read the updates.
314    token: ReaderToken,
315}
316
317impl<Item, Gap> UpdatesSubscriber<Item, Gap> {
318    /// Create a new [`Self`].
319    #[cfg(test)]
320    fn new(updates: Weak<RwLock<UpdatesInner<Item, Gap>>>, token: ReaderToken) -> Self {
321        Self { updates, token }
322    }
323}
324
325impl<Item, Gap> Stream for UpdatesSubscriber<Item, Gap>
326where
327    Item: Clone,
328    Gap: Clone,
329{
330    type Item = Vec<Update<Item, Gap>>;
331
332    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
333        let Some(updates) = self.updates.upgrade() else {
334            // The `ObservableUpdates` has been dropped. It's time to close this stream.
335            return Poll::Ready(None);
336        };
337
338        let mut updates = updates.write().unwrap();
339        let the_updates = updates.take_with_token(self.token);
340
341        // No updates.
342        if the_updates.is_empty() {
343            // Let's register the waker.
344            updates.wakers.push(context.waker().clone());
345
346            // The stream is pending.
347            return Poll::Pending;
348        }
349
350        // There is updates! Let's forward them in this stream.
351        Poll::Ready(Some(the_updates.to_owned()))
352    }
353}
354
355impl<Item, Gap> Drop for UpdatesSubscriber<Item, Gap> {
356    fn drop(&mut self) {
357        // Remove `Self::token` from `UpdatesInner::last_index_per_reader`.
358        // This is important so that the garbage collector can do its jobs correctly
359        // without a dead dangling reader token.
360        if let Some(updates) = self.updates.upgrade() {
361            let mut updates = updates.write().unwrap();
362
363            // Remove the reader token from `UpdatesInner`.
364            // It's safe to ignore the result of `remove` here: `None` means the token was
365            // already removed (note: it should be unreachable).
366            let _ = updates.last_index_per_reader.remove(&self.token);
367        }
368    }
369}
370
371#[cfg(test)]
372mod tests {
373    use std::{
374        sync::{Arc, Mutex},
375        task::{Context, Poll, Wake},
376    };
377
378    use assert_matches::assert_matches;
379    use futures_util::pin_mut;
380
381    use super::{super::LinkedChunk, ChunkIdentifier, Position, Stream, UpdatesInner};
382
383    #[test]
384    fn test_updates_take_and_garbage_collector() {
385        use super::Update::*;
386
387        let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history();
388
389        // Simulate another updates “reader”, it can a subscriber.
390        let main_token = UpdatesInner::<char, ()>::MAIN_READER_TOKEN;
391        let other_token = {
392            let updates = linked_chunk.updates().unwrap();
393            let mut inner = updates.inner.write().unwrap();
394            inner.last_token += 1;
395
396            let other_token = inner.last_token;
397            inner.last_index_per_reader.insert(other_token, 0);
398
399            other_token
400        };
401
402        // There is an initial update.
403        {
404            let updates = linked_chunk.updates().unwrap();
405
406            assert_eq!(
407                updates.take(),
408                &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
409            );
410            assert_eq!(
411                updates.inner.write().unwrap().take_with_token(other_token),
412                &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
413            );
414        }
415
416        // No new update.
417        {
418            let updates = linked_chunk.updates().unwrap();
419
420            assert!(updates.take().is_empty());
421            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
422        }
423
424        linked_chunk.push_items_back(['a']);
425        linked_chunk.push_items_back(['b']);
426        linked_chunk.push_items_back(['c']);
427
428        // Scenario 1: “main” takes the new updates, “other” doesn't take the new
429        // updates.
430        //
431        // 0   1   2   3
432        // +---+---+---+
433        // | a | b | c |
434        // +---+---+---+
435        //
436        // “main” will move its index from 0 to 3.
437        // “other” won't move its index.
438        {
439            let updates = linked_chunk.updates().unwrap();
440
441            {
442                // Inspect number of updates in memory.
443                assert_eq!(updates.inner.read().unwrap().len(), 3);
444            }
445
446            assert_eq!(
447                updates.take(),
448                &[
449                    PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
450                    PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
451                    PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
452                ]
453            );
454
455            {
456                let inner = updates.inner.read().unwrap();
457
458                // Inspect number of updates in memory.
459                // It must be the same number as before as the garbage collector weren't not
460                // able to remove any unused updates.
461                assert_eq!(inner.len(), 3);
462
463                // Inspect the indices.
464                let indices = &inner.last_index_per_reader;
465
466                assert_eq!(indices.get(&main_token), Some(&3));
467                assert_eq!(indices.get(&other_token), Some(&0));
468            }
469        }
470
471        linked_chunk.push_items_back(['d']);
472        linked_chunk.push_items_back(['e']);
473        linked_chunk.push_items_back(['f']);
474
475        // Scenario 2: “other“ takes the new updates, “main” doesn't take the
476        // new updates.
477        //
478        // 0   1   2   3   4   5   6
479        // +---+---+---+---+---+---+
480        // | a | b | c | d | e | f |
481        // +---+---+---+---+---+---+
482        //
483        // “main” won't move its index.
484        // “other” will move its index from 0 to 6.
485        {
486            let updates = linked_chunk.updates().unwrap();
487
488            assert_eq!(
489                updates.inner.write().unwrap().take_with_token(other_token),
490                &[
491                    PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
492                    PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
493                    PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
494                    PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
495                    PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
496                    PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
497                ]
498            );
499
500            {
501                let inner = updates.inner.read().unwrap();
502
503                // Inspect number of updates in memory.
504                // It must be the same number as before as the garbage collector will be able to
505                // remove unused updates but at the next call…
506                assert_eq!(inner.len(), 6);
507
508                // Inspect the indices.
509                let indices = &inner.last_index_per_reader;
510
511                assert_eq!(indices.get(&main_token), Some(&3));
512                assert_eq!(indices.get(&other_token), Some(&6));
513            }
514        }
515
516        // Scenario 3: “other” take new updates, but there is none, “main”
517        // doesn't take new updates. The garbage collector will run and collect
518        // unused updates.
519        //
520        // 0   1   2   3
521        // +---+---+---+
522        // | d | e | f |
523        // +---+---+---+
524        //
525        // “main” will have its index updated from 3 to 0.
526        // “other” will have its index updated from 6 to 3.
527        {
528            let updates = linked_chunk.updates().unwrap();
529
530            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
531
532            {
533                let inner = updates.inner.read().unwrap();
534
535                // Inspect number of updates in memory.
536                // The garbage collector has removed unused updates.
537                assert_eq!(inner.len(), 3);
538
539                // Inspect the indices. They must have been adjusted.
540                let indices = &inner.last_index_per_reader;
541
542                assert_eq!(indices.get(&main_token), Some(&0));
543                assert_eq!(indices.get(&other_token), Some(&3));
544            }
545        }
546
547        linked_chunk.push_items_back(['g']);
548        linked_chunk.push_items_back(['h']);
549        linked_chunk.push_items_back(['i']);
550
551        // Scenario 4: both “main” and “other” take the new updates.
552        //
553        // 0   1   2   3   4   5   6
554        // +---+---+---+---+---+---+
555        // | d | e | f | g | h | i |
556        // +---+---+---+---+---+---+
557        //
558        // “main” will have its index updated from 0 to 3.
559        // “other” will have its index updated from 6 to 3.
560        {
561            let updates = linked_chunk.updates().unwrap();
562
563            assert_eq!(
564                updates.take(),
565                &[
566                    PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
567                    PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
568                    PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
569                    PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
570                    PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
571                    PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
572                ]
573            );
574            assert_eq!(
575                updates.inner.write().unwrap().take_with_token(other_token),
576                &[
577                    PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
578                    PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
579                    PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
580                ]
581            );
582
583            {
584                let inner = updates.inner.read().unwrap();
585
586                // Inspect number of updates in memory.
587                // The garbage collector had a chance to collect the first 3 updates.
588                assert_eq!(inner.len(), 3);
589
590                // Inspect the indices.
591                let indices = &inner.last_index_per_reader;
592
593                assert_eq!(indices.get(&main_token), Some(&3));
594                assert_eq!(indices.get(&other_token), Some(&3));
595            }
596        }
597
598        // Scenario 5: no more updates but they both try to take new updates.
599        // The garbage collector will collect all updates as all of them as
600        // been read already.
601        //
602        // “main” will have its index updated from 0 to 0.
603        // “other” will have its index updated from 3 to 0.
604        {
605            let updates = linked_chunk.updates().unwrap();
606
607            assert!(updates.take().is_empty());
608            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
609
610            {
611                let inner = updates.inner.read().unwrap();
612
613                // Inspect number of updates in memory.
614                // The garbage collector had a chance to collect all updates.
615                assert_eq!(inner.len(), 0);
616
617                // Inspect the indices.
618                let indices = &inner.last_index_per_reader;
619
620                assert_eq!(indices.get(&main_token), Some(&0));
621                assert_eq!(indices.get(&other_token), Some(&0));
622            }
623        }
624    }
625
626    struct CounterWaker {
627        number_of_wakeup: Mutex<usize>,
628    }
629
630    impl Wake for CounterWaker {
631        fn wake(self: Arc<Self>) {
632            *self.number_of_wakeup.lock().unwrap() += 1;
633        }
634    }
635
636    #[test]
637    fn test_updates_stream() {
638        use super::Update::*;
639
640        let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
641        let waker = counter_waker.clone().into();
642        let mut context = Context::from_waker(&waker);
643
644        let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
645
646        let updates_subscriber = linked_chunk.updates().unwrap().subscribe();
647        pin_mut!(updates_subscriber);
648
649        // Initial update, stream is ready.
650        assert_matches!(
651            updates_subscriber.as_mut().poll_next(&mut context),
652            Poll::Ready(Some(items)) => {
653                assert_eq!(
654                    items,
655                    &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
656                );
657            }
658        );
659        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
660        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 0);
661
662        // Let's generate an update.
663        linked_chunk.push_items_back(['a']);
664
665        // The waker must have been called.
666        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 1);
667
668        // There is an update! Right after that, the stream is pending again.
669        assert_matches!(
670            updates_subscriber.as_mut().poll_next(&mut context),
671            Poll::Ready(Some(items)) => {
672                assert_eq!(
673                    items,
674                    &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
675                );
676            }
677        );
678        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
679
680        // Let's generate two other updates.
681        linked_chunk.push_items_back(['b']);
682        linked_chunk.push_items_back(['c']);
683
684        // The waker must have been called only once for the two updates.
685        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);
686
687        // We can consume the updates without the stream, but the stream continues to
688        // know it has updates.
689        assert_eq!(
690            linked_chunk.updates().unwrap().take(),
691            &[
692                NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None },
693                PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
694                PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
695                PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
696            ]
697        );
698        assert_matches!(
699            updates_subscriber.as_mut().poll_next(&mut context),
700            Poll::Ready(Some(items)) => {
701                assert_eq!(
702                    items,
703                    &[
704                        PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
705                        PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
706                    ]
707                );
708            }
709        );
710        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
711
712        // When dropping the `LinkedChunk`, it closes the stream.
713        drop(linked_chunk);
714        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Ready(None));
715
716        // Wakers calls have not changed.
717        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);
718    }
719
720    #[test]
721    fn test_updates_multiple_streams() {
722        use super::Update::*;
723
724        let counter_waker1 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
725        let counter_waker2 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
726
727        let waker1 = counter_waker1.clone().into();
728        let waker2 = counter_waker2.clone().into();
729
730        let mut context1 = Context::from_waker(&waker1);
731        let mut context2 = Context::from_waker(&waker2);
732
733        let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
734
735        let updates_subscriber1 = linked_chunk.updates().unwrap().subscribe();
736        pin_mut!(updates_subscriber1);
737
738        // Scope for `updates_subscriber2`.
739        let updates_subscriber2_token = {
740            let updates_subscriber2 = linked_chunk.updates().unwrap().subscribe();
741            pin_mut!(updates_subscriber2);
742
743            // Initial updates, streams are ready.
744            assert_matches!(
745                updates_subscriber1.as_mut().poll_next(&mut context1),
746                Poll::Ready(Some(items)) => {
747                    assert_eq!(
748                        items,
749                        &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
750                    );
751                }
752            );
753            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
754            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 0);
755
756            assert_matches!(
757                updates_subscriber2.as_mut().poll_next(&mut context2),
758                Poll::Ready(Some(items)) => {
759                    assert_eq!(
760                        items,
761                        &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
762                    );
763                }
764            );
765            assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);
766            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 0);
767
768            // Let's generate an update.
769            linked_chunk.push_items_back(['a']);
770
771            // The wakers must have been called.
772            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 1);
773            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 1);
774
775            // There is an update! Right after that, the streams are pending again.
776            assert_matches!(
777                updates_subscriber1.as_mut().poll_next(&mut context1),
778                Poll::Ready(Some(items)) => {
779                    assert_eq!(
780                        items,
781                        &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
782                    );
783                }
784            );
785            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
786            assert_matches!(
787                updates_subscriber2.as_mut().poll_next(&mut context2),
788                Poll::Ready(Some(items)) => {
789                    assert_eq!(
790                        items,
791                        &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
792                    );
793                }
794            );
795            assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);
796
797            // Let's generate two other updates.
798            linked_chunk.push_items_back(['b']);
799            linked_chunk.push_items_back(['c']);
800
801            // A waker is consumed when called. The first call to `push_items_back` will
802            // call and consume the wakers. The second call to `push_items_back` will do
803            // nothing as the wakers have been consumed. New wakers will be registered on
804            // polling.
805            //
806            // So, the waker must have been called only once for the two updates.
807            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 2);
808            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 2);
809
810            // Let's poll `updates_subscriber1` only.
811            assert_matches!(
812                updates_subscriber1.as_mut().poll_next(&mut context1),
813                Poll::Ready(Some(items)) => {
814                    assert_eq!(
815                        items,
816                        &[
817                            PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
818                            PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
819                        ]
820                    );
821                }
822            );
823            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
824
825            // For the sake of this test, we also need to advance the main reader token.
826            let _ = linked_chunk.updates().unwrap().take();
827            let _ = linked_chunk.updates().unwrap().take();
828
829            // If we inspect the garbage collector state, `a`, `b` and `c` should still be
830            // present because not all of them have been consumed by `updates_subscriber2`
831            // yet.
832            {
833                let updates = linked_chunk.updates().unwrap();
834
835                let inner = updates.inner.read().unwrap();
836
837                // Inspect number of updates in memory.
838                // We get 2 because the garbage collector runs before data are taken, not after:
839                // `updates_subscriber2` has read `a` only, so `b` and `c` remain.
840                assert_eq!(inner.len(), 2);
841
842                // Inspect the indices.
843                let indices = &inner.last_index_per_reader;
844
845                assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
846                assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
847            }
848
849            // Poll `updates_subscriber1` again: there is no new update so it must be
850            // pending.
851            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
852
853            // The state of the garbage collector is unchanged: `a`, `b` and `c` are still
854            // in memory.
855            {
856                let updates = linked_chunk.updates().unwrap();
857
858                let inner = updates.inner.read().unwrap();
859
860                // Inspect number of updates in memory. Value is unchanged.
861                assert_eq!(inner.len(), 2);
862
863                // Inspect the indices. They are unchanged.
864                let indices = &inner.last_index_per_reader;
865
866                assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
867                assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
868            }
869
870            updates_subscriber2.token
871            // Drop `updates_subscriber2`!
872        };
873
874        // `updates_subscriber2` has been dropped. Poll `updates_subscriber1` again:
875        // still no new update, but it will run the garbage collector again, and this
876        // time `updates_subscriber2` is not “retaining” `b` and `c`. The garbage
877        // collector must be empty.
878        assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
879
880        // Inspect the garbage collector.
881        {
882            let updates = linked_chunk.updates().unwrap();
883
884            let inner = updates.inner.read().unwrap();
885
886            // Inspect number of updates in memory.
887            assert_eq!(inner.len(), 0);
888
889            // Inspect the indices.
890            let indices = &inner.last_index_per_reader;
891
892            assert_eq!(indices.get(&updates_subscriber1.token), Some(&0));
893            assert_eq!(indices.get(&updates_subscriber2_token), None); // token is unknown!
894        }
895
896        // When dropping the `LinkedChunk`, it closes the stream.
897        drop(linked_chunk);
898        assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Ready(None));
899    }
900}