matrix_sdk_common/linked_chunk/updates.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
    task::Waker,
};

use super::{ChunkIdentifier, Position};

/// Represent the updates that have happened inside a [`LinkedChunk`].
///
/// To retrieve the updates, use [`LinkedChunk::updates`].
///
/// These updates are useful to store a `LinkedChunk` in another form of
/// storage, like a database or something similar.
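///
/// A hedged sketch of how a store might consume these updates, assuming a
/// `linked_chunk` with update history enabled (the `persist_items` and
/// `persist_clear` helpers are hypothetical, not part of this module):
///
/// ```ignore
/// for update in linked_chunk.updates().unwrap().take() {
///     match update {
///         // Write the new items at the given position.
///         Update::PushItems { at, items } => persist_items(at, items),
///         // Wipe the stored linked chunk entirely.
///         Update::Clear => persist_clear(),
///         // The other variants would be handled similarly.
///         _ => {}
///     }
/// }
/// ```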
///
/// [`LinkedChunk`]: super::LinkedChunk
/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
#[derive(Debug, Clone, PartialEq)]
pub enum Update<Item, Gap> {
    /// A new chunk of kind Items has been created.
    NewItemsChunk {
        /// The identifier of the previous chunk of this new chunk.
        previous: Option<ChunkIdentifier>,

        /// The identifier of the new chunk.
        new: ChunkIdentifier,

        /// The identifier of the next chunk of this new chunk.
        next: Option<ChunkIdentifier>,
    },

    /// A new chunk of kind Gap has been created.
    NewGapChunk {
        /// The identifier of the previous chunk of this new chunk.
        previous: Option<ChunkIdentifier>,

        /// The identifier of the new chunk.
        new: ChunkIdentifier,

        /// The identifier of the next chunk of this new chunk.
        next: Option<ChunkIdentifier>,

        /// The content of the chunk.
        gap: Gap,
    },

    /// A chunk has been removed.
    RemoveChunk(ChunkIdentifier),

    /// Items are pushed inside a chunk of kind Items.
    PushItems {
        /// The [`Position`] of the items.
        ///
        /// This value is given to spare the update readers from computing
        /// positions themselves. Since items are pushed, the positions would
        /// otherwise have to be computed incrementally from the previous
        /// items, which requires reading the last previous item. With `at`,
        /// the update readers no longer need to do so.
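        ///
        /// For example (values for illustration only), pushing `['d', 'e']`
        /// with `at: Position(ChunkIdentifier(0), 3)` means the items land at
        /// offsets 3 and 4 of chunk 0.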
        at: Position,

        /// The items.
        items: Vec<Item>,
    },

    /// An item has been replaced in the linked chunk.
    ///
    /// The `at` position MUST resolve to the actual position of an existing
    /// *item* (not a gap).
    ReplaceItem {
        /// The position of the item that's being replaced.
        at: Position,

        /// The new value for the item.
        item: Item,
    },

    /// An item has been removed inside a chunk of kind Items.
    RemoveItem {
        /// The [`Position`] of the item.
        at: Position,
    },

    /// The last items of a chunk have been detached, i.e. the chunk has been
    /// truncated.
    DetachLastItems {
        /// The split position. Before this position (`..position`), items are
        /// kept; from this position (`position..`), items are detached.
        at: Position,
    },

    /// Detached items (see [`Self::DetachLastItems`]) start being reattached.
    StartReattachItems,

    /// Reattaching items (see [`Self::StartReattachItems`]) is finished.
    EndReattachItems,

    /// All chunks have been cleared, i.e. all items and all gaps have been
    /// dropped.
    Clear,
}

impl<Item, Gap> Update<Item, Gap> {
    /// Get the items from the [`Update`] if any.
    ///
    /// This function is useful if you only care about the items from the
    /// [`Update`] and not what kind of update it was or where the items
    /// should be placed.
    ///
    /// [`Update`] variants which don't contain any items will return an empty
    /// [`Vec`].
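    ///
    /// A small sketch of the idea (mirroring the unit tests below):
    ///
    /// ```ignore
    /// let update: Update<u32, ()> =
    ///     Update::PushItems { at: Position::new(ChunkIdentifier(0), 0), items: vec![1, 2, 3] };
    ///
    /// // Only the items are kept; the position is discarded.
    /// assert_eq!(update.into_items(), vec![1, 2, 3]);
    /// ```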
    pub fn into_items(self) -> Vec<Item> {
        match self {
            Update::NewItemsChunk { .. }
            | Update::NewGapChunk { .. }
            | Update::RemoveChunk(_)
            | Update::RemoveItem { .. }
            | Update::DetachLastItems { .. }
            | Update::StartReattachItems
            | Update::EndReattachItems
            | Update::Clear => vec![],
            Update::PushItems { items, .. } => items,
            Update::ReplaceItem { item, .. } => vec![item],
        }
    }
}

/// A collection of [`Update`]s that can be observed.
///
/// Get a value for this type with [`LinkedChunk::updates`].
///
/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
#[derive(Debug)]
pub struct ObservableUpdates<Item, Gap> {
    pub(super) inner: Arc<RwLock<UpdatesInner<Item, Gap>>>,
}

impl<Item, Gap> ObservableUpdates<Item, Gap> {
    /// Create a new [`ObservableUpdates`].
    pub(super) fn new() -> Self {
        Self { inner: Arc::new(RwLock::new(UpdatesInner::new())) }
    }

    /// Push a new update.
    pub(super) fn push(&mut self, update: Update<Item, Gap>) {
        self.inner.write().unwrap().push(update);
    }

    /// Clear all pending updates.
    pub(super) fn clear_pending(&mut self) {
        self.inner.write().unwrap().clear_pending();
    }

    /// Take new updates.
    ///
    /// Updates that have been taken will not be read again.
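    ///
    /// A minimal sketch of the intended flow, assuming a `LinkedChunk` built
    /// with `new_with_update_history` (not compiled as a doctest):
    ///
    /// ```ignore
    /// let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
    /// linked_chunk.push_items_back(['a']);
    ///
    /// let updates = linked_chunk.updates().unwrap();
    /// // The first call returns all pending updates…
    /// assert!(!updates.take().is_empty());
    /// // … and a second call immediately after returns nothing new.
    /// assert!(updates.take().is_empty());
    /// ```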
    pub fn take(&mut self) -> Vec<Update<Item, Gap>>
    where
        Item: Clone,
        Gap: Clone,
    {
        self.inner.write().unwrap().take().to_owned()
    }

    /// Subscribe to updates by using a [`Stream`].
    #[cfg(test)]
    pub(super) fn subscribe(&mut self) -> UpdatesSubscriber<Item, Gap> {
        // A subscriber is a new update reader; it needs its own token.
        let token = self.new_reader_token();

        UpdatesSubscriber::new(Arc::downgrade(&self.inner), token)
    }

    /// Generate a new [`ReaderToken`].
    pub(super) fn new_reader_token(&mut self) -> ReaderToken {
        let mut inner = self.inner.write().unwrap();

        // Add 1 before reading `last_token`, in this particular order, because
        // the token 0 is reserved for `MAIN_READER_TOKEN`.
        inner.last_token += 1;
        let last_token = inner.last_token;

        inner.last_index_per_reader.insert(last_token, 0);

        last_token
    }
}

/// A token used to represent readers that read the updates in
/// [`UpdatesInner`].
pub(super) type ReaderToken = usize;

/// Inner type for [`ObservableUpdates`].
///
/// The particularity of this type is that multiple readers can read the
/// updates. A reader has a [`ReaderToken`]. The public API (i.e.
/// [`ObservableUpdates`]) is considered to be the _main reader_ (it has the
/// token [`Self::MAIN_READER_TOKEN`]).
///
/// An update that has been read by all readers is garbage collected, i.e.
/// removed from memory. An update will never be read twice by the same
/// reader.
///
/// Why do we need multiple readers? The public API reads the updates with
/// [`ObservableUpdates::take`], but the private API must also read the
/// updates, for example with [`UpdatesSubscriber`]. Of course, there can be
/// multiple `UpdatesSubscriber`s at the same time, hence the need to support
/// multiple readers.
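///
/// A hedged sketch of the two-reader bookkeeping (for illustration only; the
/// unit tests below exercise the real flow):
///
/// ```ignore
/// let mut inner = UpdatesInner::<char, ()>::new();
/// let main = UpdatesInner::<char, ()>::MAIN_READER_TOKEN;
///
/// // Register a second reader, as `subscribe` does.
/// inner.last_token += 1;
/// let other = inner.last_token;
/// inner.last_index_per_reader.insert(other, 0);
///
/// inner.push(Update::Clear);
///
/// // Each reader sees the update exactly once.
/// assert_eq!(inner.take_with_token(main).len(), 1);
/// assert_eq!(inner.take_with_token(other).len(), 1);
/// assert!(inner.take_with_token(main).is_empty());
/// ```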
#[derive(Debug)]
pub(super) struct UpdatesInner<Item, Gap> {
    /// All the updates that have not been read by all readers.
    updates: Vec<Update<Item, Gap>>,

    /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
    /// A reader is identified by a [`ReaderToken`].
    ///
    /// Each reader token is associated with an index that represents where the
    /// reader stopped reading. It is used to never return the same update
    /// twice.
    last_index_per_reader: HashMap<ReaderToken, usize>,

    /// The last generated token. This is useful to generate new tokens.
    last_token: ReaderToken,

    /// Pending wakers for [`UpdatesSubscriber`]s. A waker is removed
    /// every time it is called.
    wakers: Vec<Waker>,
}

impl<Item, Gap> UpdatesInner<Item, Gap> {
    /// The token used by the main reader. See [`Self::take`] to learn more.
    const MAIN_READER_TOKEN: ReaderToken = 0;

    /// Create a new [`Self`].
    fn new() -> Self {
        Self {
            updates: Vec::with_capacity(8),
            last_index_per_reader: {
                let mut map = HashMap::with_capacity(2);
                map.insert(Self::MAIN_READER_TOKEN, 0);

                map
            },
            last_token: Self::MAIN_READER_TOKEN,
            wakers: Vec::with_capacity(2),
        }
    }

    /// Push a new update.
    fn push(&mut self, update: Update<Item, Gap>) {
        self.updates.push(update);

        // Wake them up \o/.
        for waker in self.wakers.drain(..) {
            waker.wake();
        }
    }

    /// Clear all pending updates.
    fn clear_pending(&mut self) {
        self.updates.clear();

        // Reset all the per-reader indices.
        for idx in self.last_index_per_reader.values_mut() {
            *idx = 0;
        }

        // No need to wake the wakers; they're waiting for a new update, and we
        // just made them all disappear.
    }

    /// Take new updates. The caller is assumed to be the main reader, i.e.
    /// this uses [`Self::MAIN_READER_TOKEN`].
    ///
    /// Updates that have been read will never be read again by the current
    /// reader.
    ///
    /// Learn more by reading [`Self::take_with_token`].
    fn take(&mut self) -> &[Update<Item, Gap>] {
        self.take_with_token(Self::MAIN_READER_TOKEN)
    }

    /// Take new updates with a particular reader token.
    ///
    /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
    /// A reader is identified by a [`ReaderToken`]. Every reader can
    /// take/read/consume each update only once. An internal index is stored
    /// per reader token to know where to start reading updates next time this
    /// method is called.
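    ///
    /// A small worked example over a hypothetical state (for illustration
    /// only): with `updates = [U0, U1, U2]`, and assuming garbage collection
    /// removes nothing, a reader whose index is 1 gets `[U1, U2]` back and its
    /// index moves to 3; an immediate second call returns an empty slice.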
    pub(super) fn take_with_token(&mut self, token: ReaderToken) -> &[Update<Item, Gap>] {
        // Let's garbage collect unused updates.
        self.garbage_collect();

        let index = self
            .last_index_per_reader
            .get_mut(&token)
            .expect("Given `ReaderToken` does not map to any index");

        // Read new updates, and update the index.
        let slice = &self.updates[*index..];
        *index = self.updates.len();

        slice
    }

    /// Return whether the given reader, identified by its [`ReaderToken`], is
    /// up to date, i.e. whether it has consumed all the pending updates.
    pub(super) fn is_reader_up_to_date(&self, token: ReaderToken) -> bool {
        *self.last_index_per_reader.get(&token).expect("unknown reader token")
            == self.updates.len()
    }

    /// Return the number of updates in the buffer.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.updates.len()
    }

    /// Garbage collect unused updates. An update is considered unused when it
    /// has been read by all readers.
    ///
    /// Basically, it reduces to finding the smallest last index across all
    /// readers, and clearing from 0 to that index.
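    ///
    /// A small worked example over a hypothetical state (for illustration
    /// only): with reader indices `{main: 3, other: 1}`, the minimum is 1, so
    /// the first update is dropped and the indices become `{main: 2, other: 0}`.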
    fn garbage_collect(&mut self) {
        let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0);

        if min_index > 0 {
            let _ = self.updates.drain(0..min_index);

            // Let's shift the indices to the left by `min_index` so they keep
            // pointing at the same updates.
            for index in self.last_index_per_reader.values_mut() {
                *index -= min_index;
            }
        }
    }
}

/// A subscriber to [`ObservableUpdates`]. It allows receiving updates via
/// a [`Stream`].
#[cfg(test)]
pub(super) struct UpdatesSubscriber<Item, Gap> {
    /// Weak reference to [`UpdatesInner`].
    ///
    /// Using a weak reference allows [`ObservableUpdates`] to be dropped
    /// freely even if a subscriber exists.
    updates: std::sync::Weak<RwLock<UpdatesInner<Item, Gap>>>,

    /// The token to read the updates.
    token: ReaderToken,
}

#[cfg(test)]
impl<Item, Gap> UpdatesSubscriber<Item, Gap> {
    /// Create a new [`Self`].
    #[cfg(test)]
    fn new(updates: std::sync::Weak<RwLock<UpdatesInner<Item, Gap>>>, token: ReaderToken) -> Self {
        Self { updates, token }
    }
}

#[cfg(test)]
impl<Item, Gap> futures_core::Stream for UpdatesSubscriber<Item, Gap>
where
    Item: Clone,
    Gap: Clone,
{
    type Item = Vec<Update<Item, Gap>>;

    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        context: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let Some(updates) = self.updates.upgrade() else {
            // The `ObservableUpdates` has been dropped. It's time to close this stream.
            return std::task::Poll::Ready(None);
        };

        let mut updates = updates.write().unwrap();
        let the_updates = updates.take_with_token(self.token);

        // No updates.
        if the_updates.is_empty() {
            // Let's register the waker.
            updates.wakers.push(context.waker().clone());

            // The stream is pending.
            return std::task::Poll::Pending;
        }

        // There are updates! Let's forward them in this stream.
        std::task::Poll::Ready(Some(the_updates.to_owned()))
    }
}

#[cfg(test)]
impl<Item, Gap> Drop for UpdatesSubscriber<Item, Gap> {
    fn drop(&mut self) {
        // Remove `Self::token` from `UpdatesInner::last_index_per_reader`.
        // This is important so that the garbage collector can do its job correctly
        // without a dead, dangling reader token.
        if let Some(updates) = self.updates.upgrade() {
            let mut updates = updates.write().unwrap();

            // Remove the reader token from `UpdatesInner`.
            // It's safe to ignore the result of `remove` here: `None` means the token was
            // already removed (note: it should be unreachable).
            let _ = updates.last_index_per_reader.remove(&self.token);
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{
        sync::{Arc, Mutex},
        task::{Context, Poll, Wake},
    };

    use assert_matches::assert_matches;
    use futures_core::Stream;
    use futures_util::pin_mut;

    use super::{super::LinkedChunk, ChunkIdentifier, Position, UpdatesInner};
    use crate::linked_chunk::Update;

    #[test]
    fn test_updates_take_and_garbage_collector() {
        use super::Update::*;

        let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history();

        // Simulate another updates “reader”; it could be a subscriber, for instance.
        let main_token = UpdatesInner::<char, ()>::MAIN_READER_TOKEN;
        let other_token = {
            let updates = linked_chunk.updates().unwrap();
            let mut inner = updates.inner.write().unwrap();
            inner.last_token += 1;

            let other_token = inner.last_token;
            inner.last_index_per_reader.insert(other_token, 0);

            other_token
        };

        // There is an initial update.
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.take(),
                &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
            );
            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
            );
        }

        // No new update.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.take().is_empty());
            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
        }

        linked_chunk.push_items_back(['a']);
        linked_chunk.push_items_back(['b']);
        linked_chunk.push_items_back(['c']);

        // Scenario 1: “main” takes the new updates, “other” doesn't take the new
        // updates.
        //
        // 0   1   2   3
        // +---+---+---+
        // | a | b | c |
        // +---+---+---+
        //
        // “main” will move its index from 0 to 3.
        // “other” won't move its index.
        {
            let updates = linked_chunk.updates().unwrap();

            {
                // Inspect number of updates in memory.
                assert_eq!(updates.inner.read().unwrap().len(), 3);
            }

            assert_eq!(
                updates.take(),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                    PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                    PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // It must be the same number as before, as the garbage collector wasn't
                // able to remove any unused updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&0));
            }
        }

        linked_chunk.push_items_back(['d']);
        linked_chunk.push_items_back(['e']);
        linked_chunk.push_items_back(['f']);

        // Scenario 2: “other” takes the new updates, “main” doesn't take the
        // new updates.
        //
        // 0   1   2   3   4   5   6
        // +---+---+---+---+---+---+
        // | a | b | c | d | e | f |
        // +---+---+---+---+---+---+
        //
        // “main” won't move its index.
        // “other” will move its index from 0 to 6.
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                    PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                    PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                    PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
                    PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
                    PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // It must be the same number as before: the garbage collector will only
                // be able to remove unused updates at the next call…
                assert_eq!(inner.len(), 6);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&6));
            }
        }

        // Scenario 3: “other” takes new updates, but there are none; “main”
        // doesn't take new updates. The garbage collector will run and collect
        // unused updates.
        //
        // 0   1   2   3
        // +---+---+---+
        // | d | e | f |
        // +---+---+---+
        //
        // “main” will have its index updated from 3 to 0.
        // “other” will have its index updated from 6 to 3.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector has removed unused updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices. They must have been adjusted.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&0));
                assert_eq!(indices.get(&other_token), Some(&3));
            }
        }

        linked_chunk.push_items_back(['g']);
        linked_chunk.push_items_back(['h']);
        linked_chunk.push_items_back(['i']);

        // Scenario 4: both “main” and “other” take the new updates.
        //
        // 0   1   2   3   4   5   6
        // +---+---+---+---+---+---+
        // | d | e | f | g | h | i |
        // +---+---+---+---+---+---+
        //
        // “main” will have its index updated from 0 to 6. When “other” then takes
        // its updates, the garbage collector removes the first 3 updates and
        // shifts the indices; both readers end up with an index of 3.
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.take(),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
                    PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
                    PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
                    PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
                    PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
                    PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
                ]
            );
            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
                    PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
                    PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector had a chance to collect the first 3 updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&3));
            }
        }

        // Scenario 5: no more updates, but both try to take new updates.
        // The garbage collector will collect all updates as all of them have
        // been read already.
        //
        // “main” will have its index updated from 3 to 0.
        // “other” will have its index updated from 3 to 0.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.take().is_empty());
            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector had a chance to collect all updates.
                assert_eq!(inner.len(), 0);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&0));
                assert_eq!(indices.get(&other_token), Some(&0));
            }
        }
    }

    struct CounterWaker {
        number_of_wakeup: Mutex<usize>,
    }

    impl Wake for CounterWaker {
        fn wake(self: Arc<Self>) {
            *self.number_of_wakeup.lock().unwrap() += 1;
        }
    }

    #[test]
    fn test_updates_stream() {
        use super::Update::*;

        let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
        let waker = counter_waker.clone().into();
        let mut context = Context::from_waker(&waker);

        let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();

        let updates_subscriber = linked_chunk.updates().unwrap().subscribe();
        pin_mut!(updates_subscriber);

        // Initial update, stream is ready.
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 0);

        // Let's generate an update.
        linked_chunk.push_items_back(['a']);

        // The waker must have been called.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 1);

        // There is an update! Right after that, the stream is pending again.
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);

        // Let's generate two other updates.
        linked_chunk.push_items_back(['b']);
        linked_chunk.push_items_back(['c']);

        // The waker must have been called only once for the two updates.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);

        // We can consume the updates without the stream, but the stream continues to
        // know it has updates.
        assert_eq!(
            linked_chunk.updates().unwrap().take(),
            &[
                NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None },
                PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
            ]
        );
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[
                        PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                        PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                    ]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);

        // When dropping the `LinkedChunk`, it closes the stream.
        drop(linked_chunk);
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Ready(None));

        // Waker calls have not changed.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);
    }

    #[test]
    fn test_updates_multiple_streams() {
        use super::Update::*;

        let counter_waker1 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
        let counter_waker2 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });

        let waker1 = counter_waker1.clone().into();
        let waker2 = counter_waker2.clone().into();

        let mut context1 = Context::from_waker(&waker1);
        let mut context2 = Context::from_waker(&waker2);

        let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();

        let updates_subscriber1 = linked_chunk.updates().unwrap().subscribe();
        pin_mut!(updates_subscriber1);

        // Scope for `updates_subscriber2`.
        let updates_subscriber2_token = {
            let updates_subscriber2 = linked_chunk.updates().unwrap().subscribe();
            pin_mut!(updates_subscriber2);

            // Initial updates, streams are ready.
            assert_matches!(
                updates_subscriber1.as_mut().poll_next(&mut context1),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
                    );
                }
            );
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 0);

            assert_matches!(
                updates_subscriber2.as_mut().poll_next(&mut context2),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
                    );
                }
            );
            assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);
            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 0);

            // Let's generate an update.
            linked_chunk.push_items_back(['a']);

            // The wakers must have been called.
            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 1);
            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 1);

            // There is an update! Right after that, the streams are pending again.
            assert_matches!(
                updates_subscriber1.as_mut().poll_next(&mut context1),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                    );
                }
            );
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
            assert_matches!(
                updates_subscriber2.as_mut().poll_next(&mut context2),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                    );
                }
            );
            assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);

            // Let's generate two other updates.
            linked_chunk.push_items_back(['b']);
            linked_chunk.push_items_back(['c']);

            // A waker is consumed when called. The first call to `push_items_back` will
            // call and consume the wakers. The second call to `push_items_back` will do
            // nothing as the wakers have been consumed. New wakers will be registered on
            // polling.
            //
            // So, the waker must have been called only once for the two updates.
            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 2);
            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 2);

            // Let's poll `updates_subscriber1` only.
            assert_matches!(
                updates_subscriber1.as_mut().poll_next(&mut context1),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[
                            PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                            PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                        ]
                    );
                }
            );
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

            // For the sake of this test, we also need to advance the main reader token.
            let _ = linked_chunk.updates().unwrap().take();
            let _ = linked_chunk.updates().unwrap().take();

            // If we inspect the garbage collector state, `b` and `c` should still be
            // present because they have not been consumed by `updates_subscriber2`
            // yet.
            {
                let updates = linked_chunk.updates().unwrap();

                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // We get 2 because the garbage collector runs before data are taken, not after:
                // `updates_subscriber2` has read `a` only, so `b` and `c` remain.
                assert_eq!(inner.len(), 2);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
                assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
            }

            // Poll `updates_subscriber1` again: there is no new update so it must be
            // pending.
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

            // The state of the garbage collector is unchanged: `b` and `c` are still
            // in memory.
            {
                let updates = linked_chunk.updates().unwrap();

                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory. Value is unchanged.
                assert_eq!(inner.len(), 2);

                // Inspect the indices. They are unchanged.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
                assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
            }

            updates_subscriber2.token
            // Drop `updates_subscriber2`!
        };

        // `updates_subscriber2` has been dropped. Poll `updates_subscriber1` again:
        // still no new update, but it will run the garbage collector again, and this
        // time `updates_subscriber2` is no longer “retaining” `b` and `c`. The update
        // buffer must now be empty.
        assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

        // Inspect the garbage collector.
        {
            let updates = linked_chunk.updates().unwrap();

            let inner = updates.inner.read().unwrap();

            // Inspect number of updates in memory.
            assert_eq!(inner.len(), 0);

            // Inspect the indices.
            let indices = &inner.last_index_per_reader;

            assert_eq!(indices.get(&updates_subscriber1.token), Some(&0));
            assert_eq!(indices.get(&updates_subscriber2_token), None); // token is unknown!
        }

        // When dropping the `LinkedChunk`, it closes the stream.
        drop(linked_chunk);
        assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Ready(None));
    }

    #[test]
    fn test_update_into_items() {
        let updates: Update<_, u32> =
            Update::PushItems { at: Position::new(ChunkIdentifier(0), 0), items: vec![1, 2, 3] };

        assert_eq!(updates.into_items(), vec![1, 2, 3]);

        let updates: Update<u32, u32> = Update::Clear;
        assert!(updates.into_items().is_empty());

        let updates: Update<u32, u32> =
            Update::RemoveItem { at: Position::new(ChunkIdentifier(0), 0) };
        assert!(updates.into_items().is_empty());

        let updates: Update<u32, u32> =
            Update::ReplaceItem { at: Position::new(ChunkIdentifier(0), 0), item: 42 };
        assert_eq!(updates.into_items(), vec![42]);
    }
}