matrix_sdk_common/linked_chunk/updates.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16 collections::HashMap,
17 pin::Pin,
18 sync::{Arc, RwLock, Weak},
19 task::{Context, Poll, Waker},
20};
21
22use futures_core::Stream;
23
24use super::{ChunkIdentifier, Position};
25
26/// Represent the updates that have happened inside a [`LinkedChunk`].
27///
28/// To retrieve the updates, use [`LinkedChunk::updates`].
29///
30/// These updates are useful to store a `LinkedChunk` in another form of
31/// storage, like a database or something similar.
32///
33/// [`LinkedChunk`]: super::LinkedChunk
34/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
35#[derive(Debug, Clone, PartialEq)]
36pub enum Update<Item, Gap> {
37 /// A new chunk of kind Items has been created.
38 NewItemsChunk {
39 /// The identifier of the previous chunk of this new chunk.
40 previous: Option<ChunkIdentifier>,
41
42 /// The identifier of the new chunk.
43 new: ChunkIdentifier,
44
45 /// The identifier of the next chunk of this new chunk.
46 next: Option<ChunkIdentifier>,
47 },
48
49 /// A new chunk of kind Gap has been created.
50 NewGapChunk {
51 /// The identifier of the previous chunk of this new chunk.
52 previous: Option<ChunkIdentifier>,
53
54 /// The identifier of the new chunk.
55 new: ChunkIdentifier,
56
57 /// The identifier of the next chunk of this new chunk.
58 next: Option<ChunkIdentifier>,
59
60 /// The content of the chunk.
61 gap: Gap,
62 },
63
64 /// A chunk has been removed.
65 RemoveChunk(ChunkIdentifier),
66
67 /// Items are pushed inside a chunk of kind Items.
68 PushItems {
69 /// The [`Position`] of the items.
70 ///
71 /// This value is given to prevent the need for position computations by
72 /// the update readers. Items are pushed, so the positions should be
73 /// incrementally computed from the previous items, which requires the
74 /// reading of the last previous item. With `at`, the update readers no
75 /// longer need to do so.
76 at: Position,
77
78 /// The items.
79 items: Vec<Item>,
80 },
81
82 /// An item has been replaced in the linked chunk.
83 ///
84 /// The `at` position MUST resolve to the actual position an existing *item*
85 /// (not a gap).
86 ReplaceItem {
87 /// The position of the item that's being replaced.
88 at: Position,
89
90 /// The new value for the item.
91 item: Item,
92 },
93
94 /// An item has been removed inside a chunk of kind Items.
95 RemoveItem {
96 /// The [`Position`] of the item.
97 at: Position,
98 },
99
100 /// The last items of a chunk have been detached, i.e. the chunk has been
101 /// truncated.
102 DetachLastItems {
103 /// The split position. Before this position (`..position`), items are
104 /// kept, from this position (`position..`), items are
105 /// detached.
106 at: Position,
107 },
108
109 /// Detached items (see [`Self::DetachLastItems`]) starts being reattached.
110 StartReattachItems,
111
112 /// Reattaching items (see [`Self::StartReattachItems`]) is finished.
113 EndReattachItems,
114
115 /// All chunks have been cleared, i.e. all items and all gaps have been
116 /// dropped.
117 Clear,
118}
119
120/// A collection of [`Update`]s that can be observed.
121///
122/// Get a value for this type with [`LinkedChunk::updates`].
123///
124/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
125#[derive(Debug)]
126pub struct ObservableUpdates<Item, Gap> {
127 pub(super) inner: Arc<RwLock<UpdatesInner<Item, Gap>>>,
128}
129
130impl<Item, Gap> ObservableUpdates<Item, Gap> {
131 /// Create a new [`ObservableUpdates`].
132 pub(super) fn new() -> Self {
133 Self { inner: Arc::new(RwLock::new(UpdatesInner::new())) }
134 }
135
136 /// Push a new update.
137 pub(super) fn push(&mut self, update: Update<Item, Gap>) {
138 self.inner.write().unwrap().push(update);
139 }
140
141 /// Clear all pending updates.
142 pub(super) fn clear_pending(&mut self) {
143 self.inner.write().unwrap().clear_pending();
144 }
145
146 /// Take new updates.
147 ///
148 /// Updates that have been taken will not be read again.
149 pub fn take(&mut self) -> Vec<Update<Item, Gap>>
150 where
151 Item: Clone,
152 Gap: Clone,
153 {
154 self.inner.write().unwrap().take().to_owned()
155 }
156
157 /// Subscribe to updates by using a [`Stream`].
158 #[cfg(test)]
159 pub(super) fn subscribe(&mut self) -> UpdatesSubscriber<Item, Gap> {
160 // A subscriber is a new update reader, it needs its own token.
161 let token = self.new_reader_token();
162
163 UpdatesSubscriber::new(Arc::downgrade(&self.inner), token)
164 }
165
166 /// Generate a new [`ReaderToken`].
167 pub(super) fn new_reader_token(&mut self) -> ReaderToken {
168 let mut inner = self.inner.write().unwrap();
169
170 // Add 1 before reading the `last_token`, in this particular order, because the
171 // 0 token is reserved by `MAIN_READER_TOKEN`.
172 inner.last_token += 1;
173 let last_token = inner.last_token;
174
175 inner.last_index_per_reader.insert(last_token, 0);
176
177 last_token
178 }
179}
180
181/// A token used to represent readers that read the updates in
182/// [`UpdatesInner`].
183pub(super) type ReaderToken = usize;
184
185/// Inner type for [`ObservableUpdates`].
186///
187/// The particularity of this type is that multiple readers can read the
188/// updates. A reader has a [`ReaderToken`]. The public API (i.e.
189/// [`ObservableUpdates`]) is considered to be the _main reader_ (it has the
190/// token [`Self::MAIN_READER_TOKEN`]).
191///
192/// An update that have been read by all readers are garbage collected to be
193/// removed from the memory. An update will never be read twice by the same
194/// reader.
195///
196/// Why do we need multiple readers? The public API reads the updates with
197/// [`ObservableUpdates::take`], but the private API must also read the updates
198/// for example with [`UpdatesSubscriber`]. Of course, they can be multiple
199/// `UpdatesSubscriber`s at the same time. Hence the need of supporting multiple
200/// readers.
201#[derive(Debug)]
202pub(super) struct UpdatesInner<Item, Gap> {
203 /// All the updates that have not been read by all readers.
204 updates: Vec<Update<Item, Gap>>,
205
206 /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
207 /// A reader is identified by a [`ReaderToken`].
208 ///
209 /// To each reader token is associated an index that represents the index of
210 /// the last reading. It is used to never return the same update twice.
211 last_index_per_reader: HashMap<ReaderToken, usize>,
212
213 /// The last generated token. This is useful to generate new token.
214 last_token: ReaderToken,
215
216 /// Pending wakers for [`UpdateSubscriber`]s. A waker is removed
217 /// everytime it is called.
218 wakers: Vec<Waker>,
219}
220
221impl<Item, Gap> UpdatesInner<Item, Gap> {
222 /// The token used by the main reader. See [`Self::take`] to learn more.
223 const MAIN_READER_TOKEN: ReaderToken = 0;
224
225 /// Create a new [`Self`].
226 fn new() -> Self {
227 Self {
228 updates: Vec::with_capacity(8),
229 last_index_per_reader: {
230 let mut map = HashMap::with_capacity(2);
231 map.insert(Self::MAIN_READER_TOKEN, 0);
232
233 map
234 },
235 last_token: Self::MAIN_READER_TOKEN,
236 wakers: Vec::with_capacity(2),
237 }
238 }
239
240 /// Push a new update.
241 fn push(&mut self, update: Update<Item, Gap>) {
242 self.updates.push(update);
243
244 // Wake them up \o/.
245 for waker in self.wakers.drain(..) {
246 waker.wake();
247 }
248 }
249
250 /// Clear all pending updates.
251 fn clear_pending(&mut self) {
252 self.updates.clear();
253
254 // Reset all the per-reader indices.
255 for idx in self.last_index_per_reader.values_mut() {
256 *idx = 0;
257 }
258
259 // No need to wake the wakers; they're waiting for a new update, and we
260 // just made them all disappear.
261 }
262
263 /// Take new updates; it considers the caller is the main reader, i.e. it
264 /// will use the [`Self::MAIN_READER_TOKEN`].
265 ///
266 /// Updates that have been read will never be read again by the current
267 /// reader.
268 ///
269 /// Learn more by reading [`Self::take_with_token`].
270 fn take(&mut self) -> &[Update<Item, Gap>] {
271 self.take_with_token(Self::MAIN_READER_TOKEN)
272 }
273
274 /// Take new updates with a particular reader token.
275 ///
276 /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
277 /// A reader is identified by a [`ReaderToken`]. Every reader can
278 /// take/read/consume each update only once. An internal index is stored
279 /// per reader token to know where to start reading updates next time this
280 /// method is called.
281 pub(super) fn take_with_token(&mut self, token: ReaderToken) -> &[Update<Item, Gap>] {
282 // Let's garbage collect unused updates.
283 self.garbage_collect();
284
285 let index = self
286 .last_index_per_reader
287 .get_mut(&token)
288 .expect("Given `UpdatesToken` does not map to any index");
289
290 // Read new updates, and update the index.
291 let slice = &self.updates[*index..];
292 *index = self.updates.len();
293
294 slice
295 }
296
297 /// Return the number of updates in the buffer.
298 #[cfg(test)]
299 fn len(&self) -> usize {
300 self.updates.len()
301 }
302
303 /// Garbage collect unused updates. An update is considered unused when it's
304 /// been read by all readers.
305 ///
306 /// Basically, it reduces to finding the smallest last index for all
307 /// readers, and clear from 0 to that index.
308 fn garbage_collect(&mut self) {
309 let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0);
310
311 if min_index > 0 {
312 let _ = self.updates.drain(0..min_index);
313
314 // Let's shift the indices to the left by `min_index` to preserve them.
315 for index in self.last_index_per_reader.values_mut() {
316 *index -= min_index;
317 }
318 }
319 }
320}
321
322/// A subscriber to [`ObservableUpdates`]. It is helpful to receive updates via
323/// a [`Stream`].
324pub(super) struct UpdatesSubscriber<Item, Gap> {
325 /// Weak reference to [`UpdatesInner`].
326 ///
327 /// Using a weak reference allows [`ObservableUpdates`] to be dropped
328 /// freely even if a subscriber exists.
329 updates: Weak<RwLock<UpdatesInner<Item, Gap>>>,
330
331 /// The token to read the updates.
332 token: ReaderToken,
333}
334
335impl<Item, Gap> UpdatesSubscriber<Item, Gap> {
336 /// Create a new [`Self`].
337 #[cfg(test)]
338 fn new(updates: Weak<RwLock<UpdatesInner<Item, Gap>>>, token: ReaderToken) -> Self {
339 Self { updates, token }
340 }
341}
342
343impl<Item, Gap> Stream for UpdatesSubscriber<Item, Gap>
344where
345 Item: Clone,
346 Gap: Clone,
347{
348 type Item = Vec<Update<Item, Gap>>;
349
350 fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
351 let Some(updates) = self.updates.upgrade() else {
352 // The `ObservableUpdates` has been dropped. It's time to close this stream.
353 return Poll::Ready(None);
354 };
355
356 let mut updates = updates.write().unwrap();
357 let the_updates = updates.take_with_token(self.token);
358
359 // No updates.
360 if the_updates.is_empty() {
361 // Let's register the waker.
362 updates.wakers.push(context.waker().clone());
363
364 // The stream is pending.
365 return Poll::Pending;
366 }
367
368 // There is updates! Let's forward them in this stream.
369 Poll::Ready(Some(the_updates.to_owned()))
370 }
371}
372
373impl<Item, Gap> Drop for UpdatesSubscriber<Item, Gap> {
374 fn drop(&mut self) {
375 // Remove `Self::token` from `UpdatesInner::last_index_per_reader`.
376 // This is important so that the garbage collector can do its jobs correctly
377 // without a dead dangling reader token.
378 if let Some(updates) = self.updates.upgrade() {
379 let mut updates = updates.write().unwrap();
380
381 // Remove the reader token from `UpdatesInner`.
382 // It's safe to ignore the result of `remove` here: `None` means the token was
383 // already removed (note: it should be unreachable).
384 let _ = updates.last_index_per_reader.remove(&self.token);
385 }
386 }
387}
388
389#[cfg(test)]
390mod tests {
391 use std::{
392 sync::{Arc, Mutex},
393 task::{Context, Poll, Wake},
394 };
395
396 use assert_matches::assert_matches;
397 use futures_util::pin_mut;
398
399 use super::{super::LinkedChunk, ChunkIdentifier, Position, Stream, UpdatesInner};
400
401 #[test]
402 fn test_updates_take_and_garbage_collector() {
403 use super::Update::*;
404
405 let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history();
406
407 // Simulate another updates “reader”, it can a subscriber.
408 let main_token = UpdatesInner::<char, ()>::MAIN_READER_TOKEN;
409 let other_token = {
410 let updates = linked_chunk.updates().unwrap();
411 let mut inner = updates.inner.write().unwrap();
412 inner.last_token += 1;
413
414 let other_token = inner.last_token;
415 inner.last_index_per_reader.insert(other_token, 0);
416
417 other_token
418 };
419
420 // There is an initial update.
421 {
422 let updates = linked_chunk.updates().unwrap();
423
424 assert_eq!(
425 updates.take(),
426 &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
427 );
428 assert_eq!(
429 updates.inner.write().unwrap().take_with_token(other_token),
430 &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
431 );
432 }
433
434 // No new update.
435 {
436 let updates = linked_chunk.updates().unwrap();
437
438 assert!(updates.take().is_empty());
439 assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
440 }
441
442 linked_chunk.push_items_back(['a']);
443 linked_chunk.push_items_back(['b']);
444 linked_chunk.push_items_back(['c']);
445
446 // Scenario 1: “main” takes the new updates, “other” doesn't take the new
447 // updates.
448 //
449 // 0 1 2 3
450 // +---+---+---+
451 // | a | b | c |
452 // +---+---+---+
453 //
454 // “main” will move its index from 0 to 3.
455 // “other” won't move its index.
456 {
457 let updates = linked_chunk.updates().unwrap();
458
459 {
460 // Inspect number of updates in memory.
461 assert_eq!(updates.inner.read().unwrap().len(), 3);
462 }
463
464 assert_eq!(
465 updates.take(),
466 &[
467 PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
468 PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
469 PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
470 ]
471 );
472
473 {
474 let inner = updates.inner.read().unwrap();
475
476 // Inspect number of updates in memory.
477 // It must be the same number as before as the garbage collector weren't not
478 // able to remove any unused updates.
479 assert_eq!(inner.len(), 3);
480
481 // Inspect the indices.
482 let indices = &inner.last_index_per_reader;
483
484 assert_eq!(indices.get(&main_token), Some(&3));
485 assert_eq!(indices.get(&other_token), Some(&0));
486 }
487 }
488
489 linked_chunk.push_items_back(['d']);
490 linked_chunk.push_items_back(['e']);
491 linked_chunk.push_items_back(['f']);
492
493 // Scenario 2: “other“ takes the new updates, “main” doesn't take the
494 // new updates.
495 //
496 // 0 1 2 3 4 5 6
497 // +---+---+---+---+---+---+
498 // | a | b | c | d | e | f |
499 // +---+---+---+---+---+---+
500 //
501 // “main” won't move its index.
502 // “other” will move its index from 0 to 6.
503 {
504 let updates = linked_chunk.updates().unwrap();
505
506 assert_eq!(
507 updates.inner.write().unwrap().take_with_token(other_token),
508 &[
509 PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
510 PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
511 PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
512 PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
513 PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
514 PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
515 ]
516 );
517
518 {
519 let inner = updates.inner.read().unwrap();
520
521 // Inspect number of updates in memory.
522 // It must be the same number as before as the garbage collector will be able to
523 // remove unused updates but at the next call…
524 assert_eq!(inner.len(), 6);
525
526 // Inspect the indices.
527 let indices = &inner.last_index_per_reader;
528
529 assert_eq!(indices.get(&main_token), Some(&3));
530 assert_eq!(indices.get(&other_token), Some(&6));
531 }
532 }
533
534 // Scenario 3: “other” take new updates, but there is none, “main”
535 // doesn't take new updates. The garbage collector will run and collect
536 // unused updates.
537 //
538 // 0 1 2 3
539 // +---+---+---+
540 // | d | e | f |
541 // +---+---+---+
542 //
543 // “main” will have its index updated from 3 to 0.
544 // “other” will have its index updated from 6 to 3.
545 {
546 let updates = linked_chunk.updates().unwrap();
547
548 assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
549
550 {
551 let inner = updates.inner.read().unwrap();
552
553 // Inspect number of updates in memory.
554 // The garbage collector has removed unused updates.
555 assert_eq!(inner.len(), 3);
556
557 // Inspect the indices. They must have been adjusted.
558 let indices = &inner.last_index_per_reader;
559
560 assert_eq!(indices.get(&main_token), Some(&0));
561 assert_eq!(indices.get(&other_token), Some(&3));
562 }
563 }
564
565 linked_chunk.push_items_back(['g']);
566 linked_chunk.push_items_back(['h']);
567 linked_chunk.push_items_back(['i']);
568
569 // Scenario 4: both “main” and “other” take the new updates.
570 //
571 // 0 1 2 3 4 5 6
572 // +---+---+---+---+---+---+
573 // | d | e | f | g | h | i |
574 // +---+---+---+---+---+---+
575 //
576 // “main” will have its index updated from 0 to 3.
577 // “other” will have its index updated from 6 to 3.
578 {
579 let updates = linked_chunk.updates().unwrap();
580
581 assert_eq!(
582 updates.take(),
583 &[
584 PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
585 PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
586 PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
587 PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
588 PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
589 PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
590 ]
591 );
592 assert_eq!(
593 updates.inner.write().unwrap().take_with_token(other_token),
594 &[
595 PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
596 PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
597 PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
598 ]
599 );
600
601 {
602 let inner = updates.inner.read().unwrap();
603
604 // Inspect number of updates in memory.
605 // The garbage collector had a chance to collect the first 3 updates.
606 assert_eq!(inner.len(), 3);
607
608 // Inspect the indices.
609 let indices = &inner.last_index_per_reader;
610
611 assert_eq!(indices.get(&main_token), Some(&3));
612 assert_eq!(indices.get(&other_token), Some(&3));
613 }
614 }
615
616 // Scenario 5: no more updates but they both try to take new updates.
617 // The garbage collector will collect all updates as all of them as
618 // been read already.
619 //
620 // “main” will have its index updated from 0 to 0.
621 // “other” will have its index updated from 3 to 0.
622 {
623 let updates = linked_chunk.updates().unwrap();
624
625 assert!(updates.take().is_empty());
626 assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
627
628 {
629 let inner = updates.inner.read().unwrap();
630
631 // Inspect number of updates in memory.
632 // The garbage collector had a chance to collect all updates.
633 assert_eq!(inner.len(), 0);
634
635 // Inspect the indices.
636 let indices = &inner.last_index_per_reader;
637
638 assert_eq!(indices.get(&main_token), Some(&0));
639 assert_eq!(indices.get(&other_token), Some(&0));
640 }
641 }
642 }
643
644 struct CounterWaker {
645 number_of_wakeup: Mutex<usize>,
646 }
647
648 impl Wake for CounterWaker {
649 fn wake(self: Arc<Self>) {
650 *self.number_of_wakeup.lock().unwrap() += 1;
651 }
652 }
653
654 #[test]
655 fn test_updates_stream() {
656 use super::Update::*;
657
658 let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
659 let waker = counter_waker.clone().into();
660 let mut context = Context::from_waker(&waker);
661
662 let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
663
664 let updates_subscriber = linked_chunk.updates().unwrap().subscribe();
665 pin_mut!(updates_subscriber);
666
667 // Initial update, stream is ready.
668 assert_matches!(
669 updates_subscriber.as_mut().poll_next(&mut context),
670 Poll::Ready(Some(items)) => {
671 assert_eq!(
672 items,
673 &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
674 );
675 }
676 );
677 assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
678 assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 0);
679
680 // Let's generate an update.
681 linked_chunk.push_items_back(['a']);
682
683 // The waker must have been called.
684 assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 1);
685
686 // There is an update! Right after that, the stream is pending again.
687 assert_matches!(
688 updates_subscriber.as_mut().poll_next(&mut context),
689 Poll::Ready(Some(items)) => {
690 assert_eq!(
691 items,
692 &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
693 );
694 }
695 );
696 assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
697
698 // Let's generate two other updates.
699 linked_chunk.push_items_back(['b']);
700 linked_chunk.push_items_back(['c']);
701
702 // The waker must have been called only once for the two updates.
703 assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);
704
705 // We can consume the updates without the stream, but the stream continues to
706 // know it has updates.
707 assert_eq!(
708 linked_chunk.updates().unwrap().take(),
709 &[
710 NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None },
711 PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
712 PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
713 PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
714 ]
715 );
716 assert_matches!(
717 updates_subscriber.as_mut().poll_next(&mut context),
718 Poll::Ready(Some(items)) => {
719 assert_eq!(
720 items,
721 &[
722 PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
723 PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
724 ]
725 );
726 }
727 );
728 assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
729
730 // When dropping the `LinkedChunk`, it closes the stream.
731 drop(linked_chunk);
732 assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Ready(None));
733
734 // Wakers calls have not changed.
735 assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);
736 }
737
738 #[test]
739 fn test_updates_multiple_streams() {
740 use super::Update::*;
741
742 let counter_waker1 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
743 let counter_waker2 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
744
745 let waker1 = counter_waker1.clone().into();
746 let waker2 = counter_waker2.clone().into();
747
748 let mut context1 = Context::from_waker(&waker1);
749 let mut context2 = Context::from_waker(&waker2);
750
751 let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
752
753 let updates_subscriber1 = linked_chunk.updates().unwrap().subscribe();
754 pin_mut!(updates_subscriber1);
755
756 // Scope for `updates_subscriber2`.
757 let updates_subscriber2_token = {
758 let updates_subscriber2 = linked_chunk.updates().unwrap().subscribe();
759 pin_mut!(updates_subscriber2);
760
761 // Initial updates, streams are ready.
762 assert_matches!(
763 updates_subscriber1.as_mut().poll_next(&mut context1),
764 Poll::Ready(Some(items)) => {
765 assert_eq!(
766 items,
767 &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
768 );
769 }
770 );
771 assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
772 assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 0);
773
774 assert_matches!(
775 updates_subscriber2.as_mut().poll_next(&mut context2),
776 Poll::Ready(Some(items)) => {
777 assert_eq!(
778 items,
779 &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
780 );
781 }
782 );
783 assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);
784 assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 0);
785
786 // Let's generate an update.
787 linked_chunk.push_items_back(['a']);
788
789 // The wakers must have been called.
790 assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 1);
791 assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 1);
792
793 // There is an update! Right after that, the streams are pending again.
794 assert_matches!(
795 updates_subscriber1.as_mut().poll_next(&mut context1),
796 Poll::Ready(Some(items)) => {
797 assert_eq!(
798 items,
799 &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
800 );
801 }
802 );
803 assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
804 assert_matches!(
805 updates_subscriber2.as_mut().poll_next(&mut context2),
806 Poll::Ready(Some(items)) => {
807 assert_eq!(
808 items,
809 &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
810 );
811 }
812 );
813 assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);
814
815 // Let's generate two other updates.
816 linked_chunk.push_items_back(['b']);
817 linked_chunk.push_items_back(['c']);
818
819 // A waker is consumed when called. The first call to `push_items_back` will
820 // call and consume the wakers. The second call to `push_items_back` will do
821 // nothing as the wakers have been consumed. New wakers will be registered on
822 // polling.
823 //
824 // So, the waker must have been called only once for the two updates.
825 assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 2);
826 assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 2);
827
828 // Let's poll `updates_subscriber1` only.
829 assert_matches!(
830 updates_subscriber1.as_mut().poll_next(&mut context1),
831 Poll::Ready(Some(items)) => {
832 assert_eq!(
833 items,
834 &[
835 PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
836 PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
837 ]
838 );
839 }
840 );
841 assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
842
843 // For the sake of this test, we also need to advance the main reader token.
844 let _ = linked_chunk.updates().unwrap().take();
845 let _ = linked_chunk.updates().unwrap().take();
846
847 // If we inspect the garbage collector state, `a`, `b` and `c` should still be
848 // present because not all of them have been consumed by `updates_subscriber2`
849 // yet.
850 {
851 let updates = linked_chunk.updates().unwrap();
852
853 let inner = updates.inner.read().unwrap();
854
855 // Inspect number of updates in memory.
856 // We get 2 because the garbage collector runs before data are taken, not after:
857 // `updates_subscriber2` has read `a` only, so `b` and `c` remain.
858 assert_eq!(inner.len(), 2);
859
860 // Inspect the indices.
861 let indices = &inner.last_index_per_reader;
862
863 assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
864 assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
865 }
866
867 // Poll `updates_subscriber1` again: there is no new update so it must be
868 // pending.
869 assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
870
871 // The state of the garbage collector is unchanged: `a`, `b` and `c` are still
872 // in memory.
873 {
874 let updates = linked_chunk.updates().unwrap();
875
876 let inner = updates.inner.read().unwrap();
877
878 // Inspect number of updates in memory. Value is unchanged.
879 assert_eq!(inner.len(), 2);
880
881 // Inspect the indices. They are unchanged.
882 let indices = &inner.last_index_per_reader;
883
884 assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
885 assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
886 }
887
888 updates_subscriber2.token
889 // Drop `updates_subscriber2`!
890 };
891
892 // `updates_subscriber2` has been dropped. Poll `updates_subscriber1` again:
893 // still no new update, but it will run the garbage collector again, and this
894 // time `updates_subscriber2` is not “retaining” `b` and `c`. The garbage
895 // collector must be empty.
896 assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
897
898 // Inspect the garbage collector.
899 {
900 let updates = linked_chunk.updates().unwrap();
901
902 let inner = updates.inner.read().unwrap();
903
904 // Inspect number of updates in memory.
905 assert_eq!(inner.len(), 0);
906
907 // Inspect the indices.
908 let indices = &inner.last_index_per_reader;
909
910 assert_eq!(indices.get(&updates_subscriber1.token), Some(&0));
911 assert_eq!(indices.get(&updates_subscriber2_token), None); // token is unknown!
912 }
913
914 // When dropping the `LinkedChunk`, it closes the stream.
915 drop(linked_chunk);
916 assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Ready(None));
917 }
918}