matrix_sdk_common/linked_chunk/updates.rs
// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
    collections::HashMap,
    pin::Pin,
    sync::{Arc, RwLock, Weak},
    task::{Context, Poll, Waker},
};

use futures_core::Stream;

use super::{ChunkIdentifier, Position};

/// Represent the updates that have happened inside a [`LinkedChunk`].
///
/// To retrieve the updates, use [`LinkedChunk::updates`].
///
/// These updates are useful to store a `LinkedChunk` in another form of
/// storage, like a database or something similar.
///
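/// As a rough sketch of how these updates are meant to be consumed (the exact
/// API lives on [`LinkedChunk`]; the `store` handle below is purely
/// hypothetical), a reader could persist them one by one:
///
/// ```rust,ignore
/// // Drain the pending updates and mirror them into some storage.
/// for update in linked_chunk.updates().unwrap().take() {
///     match update {
///         Update::PushItems { at, items } => store.insert_items(at, items),
///         Update::RemoveChunk(chunk_id) => store.remove_chunk(chunk_id),
///         // … and so on for the other variants.
///         _ => {}
///     }
/// }
/// ```
///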
/// [`LinkedChunk`]: super::LinkedChunk
/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
#[derive(Debug, Clone, PartialEq)]
pub enum Update<Item, Gap> {
    /// A new chunk of kind Items has been created.
    NewItemsChunk {
        /// The identifier of the previous chunk of this new chunk.
        previous: Option<ChunkIdentifier>,

        /// The identifier of the new chunk.
        new: ChunkIdentifier,

        /// The identifier of the next chunk of this new chunk.
        next: Option<ChunkIdentifier>,
    },

    /// A new chunk of kind Gap has been created.
    NewGapChunk {
        /// The identifier of the previous chunk of this new chunk.
        previous: Option<ChunkIdentifier>,

        /// The identifier of the new chunk.
        new: ChunkIdentifier,

        /// The identifier of the next chunk of this new chunk.
        next: Option<ChunkIdentifier>,

        /// The content of the chunk.
        gap: Gap,
    },

    /// A chunk has been removed.
    RemoveChunk(ChunkIdentifier),

    /// Items are pushed inside a chunk of kind Items.
    PushItems {
        /// The [`Position`] of the items.
        ///
        /// This value is given to prevent the need for position computations by
        /// the update readers. Items are pushed, so the positions should be
        /// incrementally computed from the previous items, which requires reading
        /// the last previous item. With `at`, the update readers no longer need
        /// to do so.
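        ///
        /// For example (illustrative values only): pushing `['a', 'b', 'c']`
        /// with `at = Position(ChunkIdentifier(0), 0)` means the items land at
        /// indices 0, 1 and 2 of chunk 0.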
        at: Position,

        /// The items.
        items: Vec<Item>,
    },

    /// An item has been replaced in the linked chunk.
    ///
    /// The `at` position MUST resolve to the actual position of an existing
    /// *item* (not a gap).
    ReplaceItem {
        /// The position of the item that's being replaced.
        at: Position,

        /// The new value for the item.
        item: Item,
    },

    /// An item has been removed inside a chunk of kind Items.
    RemoveItem {
        /// The [`Position`] of the item.
        at: Position,
    },

    /// The last items of a chunk have been detached, i.e. the chunk has been
    /// truncated.
    DetachLastItems {
        /// The split position. Before this position (`..at`), items are kept;
        /// from this position (`at..`), items are detached.
        at: Position,
    },

    /// Detached items (see [`Self::DetachLastItems`]) start being reattached.
    StartReattachItems,

    /// Reattaching items (see [`Self::StartReattachItems`]) is finished.
    EndReattachItems,

    /// All chunks have been cleared, i.e. all items and all gaps have been
    /// dropped.
    Clear,
}

/// A collection of [`Update`]s that can be observed.
///
/// Get a value for this type with [`LinkedChunk::updates`].
///
/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
#[derive(Debug)]
pub struct ObservableUpdates<Item, Gap> {
    pub(super) inner: Arc<RwLock<UpdatesInner<Item, Gap>>>,
}

impl<Item, Gap> ObservableUpdates<Item, Gap> {
    /// Create a new [`ObservableUpdates`].
    pub(super) fn new() -> Self {
        Self { inner: Arc::new(RwLock::new(UpdatesInner::new())) }
    }

    /// Push a new update.
    pub(super) fn push(&mut self, update: Update<Item, Gap>) {
        self.inner.write().unwrap().push(update);
    }

    /// Take new updates.
    ///
    /// Updates that have been taken will not be read again.
    pub fn take(&mut self) -> Vec<Update<Item, Gap>>
    where
        Item: Clone,
        Gap: Clone,
    {
        self.inner.write().unwrap().take().to_owned()
    }

    /// Subscribe to updates by using a [`Stream`].
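    ///
    /// A minimal usage sketch, mirroring the tests below (`StreamExt::next`
    /// from `futures_util` is assumed for the `.next().await` call):
    ///
    /// ```rust,ignore
    /// let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
    ///
    /// let subscriber = linked_chunk.updates().unwrap().subscribe();
    /// pin_mut!(subscriber);
    ///
    /// linked_chunk.push_items_back(['a']);
    ///
    /// // The subscriber yields all updates that happened since its last poll.
    /// let updates = subscriber.next().await;
    /// ```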
    #[cfg(test)]
    pub(super) fn subscribe(&mut self) -> UpdatesSubscriber<Item, Gap> {
        // A subscriber is a new update reader, it needs its own token.
        let token = self.new_reader_token();

        UpdatesSubscriber::new(Arc::downgrade(&self.inner), token)
    }

    /// Generate a new [`ReaderToken`].
    pub(super) fn new_reader_token(&mut self) -> ReaderToken {
        let mut inner = self.inner.write().unwrap();

        // Add 1 before reading the `last_token`, in this particular order, because the
        // 0 token is reserved for `MAIN_READER_TOKEN`.
        inner.last_token += 1;
        let last_token = inner.last_token;

        inner.last_index_per_reader.insert(last_token, 0);

        last_token
    }
}

/// A token used to represent readers that read the updates in
/// [`UpdatesInner`].
pub(super) type ReaderToken = usize;

/// Inner type for [`ObservableUpdates`].
///
/// The particularity of this type is that multiple readers can read the
/// updates. A reader has a [`ReaderToken`]. The public API (i.e.
/// [`ObservableUpdates`]) is considered to be the _main reader_ (it has the
/// token [`Self::MAIN_READER_TOKEN`]).
///
/// An update that has been read by all readers is garbage collected, i.e.
/// removed from memory. An update will never be read twice by the same
/// reader.
///
/// Why do we need multiple readers? The public API reads the updates with
/// [`ObservableUpdates::take`], but the private API must also read the
/// updates, for example with [`UpdatesSubscriber`]. Of course, there can be
/// multiple `UpdatesSubscriber`s at the same time. Hence the need to support
/// multiple readers.
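///
/// A small illustrative sketch of the shared buffer (values are made up):
///
/// ```text
/// updates:                  [U0, U1, U2, U3, U4]
/// main reader, last index:  3   (has read U0, U1 and U2)
/// subscriber, last index:   1   (has read U0 only)
/// garbage collectable:      U0  (read by every reader)
/// ```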
#[derive(Debug)]
pub(super) struct UpdatesInner<Item, Gap> {
    /// All the updates that have not been read by all readers.
    updates: Vec<Update<Item, Gap>>,

    /// Updates are stored in [`Self::updates`]. Multiple readers can read
    /// them. A reader is identified by a [`ReaderToken`].
    ///
    /// Each reader token is associated with an index marking where its next
    /// read starts. It is used to never return the same update twice.
    last_index_per_reader: HashMap<ReaderToken, usize>,

    /// The last generated token. This is useful to generate new tokens.
    last_token: ReaderToken,

    /// Pending wakers for [`UpdatesSubscriber`]s. A waker is removed
    /// every time it is called.
    wakers: Vec<Waker>,
}

impl<Item, Gap> UpdatesInner<Item, Gap> {
    /// The token used by the main reader. See [`Self::take`] to learn more.
    const MAIN_READER_TOKEN: ReaderToken = 0;

    /// Create a new [`Self`].
    fn new() -> Self {
        Self {
            updates: Vec::with_capacity(8),
            last_index_per_reader: {
                let mut map = HashMap::with_capacity(2);
                map.insert(Self::MAIN_READER_TOKEN, 0);

                map
            },
            last_token: Self::MAIN_READER_TOKEN,
            wakers: Vec::with_capacity(2),
        }
    }

    /// Push a new update.
    fn push(&mut self, update: Update<Item, Gap>) {
        self.updates.push(update);

        // Wake them up \o/.
        for waker in self.wakers.drain(..) {
            waker.wake();
        }
    }

    /// Take new updates; this assumes the caller is the main reader, i.e. it
    /// will use [`Self::MAIN_READER_TOKEN`].
    ///
    /// Updates that have been read will never be read again by the current
    /// reader.
    ///
    /// Learn more by reading [`Self::take_with_token`].
    fn take(&mut self) -> &[Update<Item, Gap>] {
        self.take_with_token(Self::MAIN_READER_TOKEN)
    }

    /// Take new updates with a particular reader token.
    ///
    /// Updates are stored in [`Self::updates`]. Multiple readers can read
    /// them. A reader is identified by a [`ReaderToken`]. Every reader can
    /// take/read/consume each update only once. An internal index is stored
    /// per reader token to know where to start reading updates next time this
    /// method is called.
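    ///
    /// A small sketch of the bookkeeping (values are illustrative only):
    ///
    /// ```text
    /// updates           = [U0, U1, U2, U3]
    /// index for `token` = 1
    /// returned slice    = [U1, U2, U3]
    /// index for `token` = 4   // the next call starts after U3
    /// ```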
    pub(super) fn take_with_token(&mut self, token: ReaderToken) -> &[Update<Item, Gap>] {
        // Let's garbage collect unused updates.
        self.garbage_collect();

        let index = self
            .last_index_per_reader
            .get_mut(&token)
            .expect("Given `ReaderToken` does not map to any index");

        // Read new updates, and update the index.
        let slice = &self.updates[*index..];
        *index = self.updates.len();

        slice
    }

    /// Return the number of updates in the buffer.
    #[cfg(test)]
    fn len(&self) -> usize {
        self.updates.len()
    }

    /// Garbage collect unused updates. An update is considered unused when
    /// it's been read by all readers.
    ///
    /// Basically, it reduces to finding the smallest last index for all
    /// readers, and clearing from 0 up to that index.
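    ///
    /// A worked sketch of the arithmetic (values are illustrative only):
    ///
    /// ```text
    /// last_index_per_reader = { main: 2, subscriber: 5 }
    /// min_index             = 2          // the smallest last index
    /// updates.drain(0..2)                // drop the 2 fully-read updates
    /// last_index_per_reader = { main: 0, subscriber: 3 }
    /// ```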
    fn garbage_collect(&mut self) {
        let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0);

        if min_index > 0 {
            let _ = self.updates.drain(0..min_index);

            // Let's shift the indices to the left by `min_index` to preserve them.
            for index in self.last_index_per_reader.values_mut() {
                *index -= min_index;
            }
        }
    }
}

/// A subscriber to [`ObservableUpdates`]. It is helpful to receive updates via
/// a [`Stream`].
pub(super) struct UpdatesSubscriber<Item, Gap> {
    /// Weak reference to [`UpdatesInner`].
    ///
    /// Using a weak reference allows [`ObservableUpdates`] to be dropped
    /// freely even if a subscriber exists.
    updates: Weak<RwLock<UpdatesInner<Item, Gap>>>,

    /// The token to read the updates.
    token: ReaderToken,
}

impl<Item, Gap> UpdatesSubscriber<Item, Gap> {
    /// Create a new [`Self`].
    #[cfg(test)]
    fn new(updates: Weak<RwLock<UpdatesInner<Item, Gap>>>, token: ReaderToken) -> Self {
        Self { updates, token }
    }
}

impl<Item, Gap> Stream for UpdatesSubscriber<Item, Gap>
where
    Item: Clone,
    Gap: Clone,
{
    type Item = Vec<Update<Item, Gap>>;

    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let Some(updates) = self.updates.upgrade() else {
            // The `ObservableUpdates` has been dropped. It's time to close this stream.
            return Poll::Ready(None);
        };

        let mut updates = updates.write().unwrap();
        let the_updates = updates.take_with_token(self.token);

        // No updates.
        if the_updates.is_empty() {
            // Let's register the waker.
            updates.wakers.push(context.waker().clone());

            // The stream is pending.
            return Poll::Pending;
        }

        // There are updates! Let's forward them in this stream.
        Poll::Ready(Some(the_updates.to_owned()))
    }
}

impl<Item, Gap> Drop for UpdatesSubscriber<Item, Gap> {
    fn drop(&mut self) {
        // Remove `Self::token` from `UpdatesInner::last_index_per_reader`.
        // This is important so that the garbage collector can do its job correctly
        // without a dead, dangling reader token.
        if let Some(updates) = self.updates.upgrade() {
            let mut updates = updates.write().unwrap();

            // Remove the reader token from `UpdatesInner`.
            // It's safe to ignore the result of `remove` here: `None` means the token was
            // already removed (note: it should be unreachable).
            let _ = updates.last_index_per_reader.remove(&self.token);
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{
        sync::{Arc, Mutex},
        task::{Context, Poll, Wake},
    };

    use assert_matches::assert_matches;
    use futures_util::pin_mut;

    use super::{super::LinkedChunk, ChunkIdentifier, Position, Stream, UpdatesInner};

    #[test]
    fn test_updates_take_and_garbage_collector() {
        use super::Update::*;

        let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history();

        // Simulate another updates “reader”; it could be a subscriber, for example.
        let main_token = UpdatesInner::<char, ()>::MAIN_READER_TOKEN;
        let other_token = {
            let updates = linked_chunk.updates().unwrap();
            let mut inner = updates.inner.write().unwrap();
            inner.last_token += 1;

            let other_token = inner.last_token;
            inner.last_index_per_reader.insert(other_token, 0);

            other_token
        };

        // There is an initial update.
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.take(),
                &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
            );
            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
            );
        }

        // No new update.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.take().is_empty());
            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
        }

        linked_chunk.push_items_back(['a']);
        linked_chunk.push_items_back(['b']);
        linked_chunk.push_items_back(['c']);

        // Scenario 1: “main” takes the new updates, “other” doesn't take the new
        // updates.
        //
        // 0   1   2   3
        // +---+---+---+
        // | a | b | c |
        // +---+---+---+
        //
        // “main” will move its index from 0 to 3.
        // “other” won't move its index.
        {
            let updates = linked_chunk.updates().unwrap();

            {
                // Inspect number of updates in memory.
                assert_eq!(updates.inner.read().unwrap().len(), 3);
            }

            assert_eq!(
                updates.take(),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                    PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                    PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // It must be the same number as before, as the garbage collector wasn't
                // able to remove any unused updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&0));
            }
        }

        linked_chunk.push_items_back(['d']);
        linked_chunk.push_items_back(['e']);
        linked_chunk.push_items_back(['f']);

        // Scenario 2: “other” takes the new updates, “main” doesn't take the
        // new updates.
        //
        // 0   1   2   3   4   5   6
        // +---+---+---+---+---+---+
        // | a | b | c | d | e | f |
        // +---+---+---+---+---+---+
        //
        // “main” won't move its index.
        // “other” will move its index from 0 to 6.
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                    PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                    PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                    PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
                    PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
                    PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // It must be the same number as before: the garbage collector will only be
                // able to remove the now-unused updates at the next call…
                assert_eq!(inner.len(), 6);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&6));
            }
        }

        // Scenario 3: “other” takes new updates, but there is none, “main”
        // doesn't take new updates. The garbage collector will run and collect
        // unused updates.
        //
        // 0   1   2   3
        // +---+---+---+
        // | d | e | f |
        // +---+---+---+
        //
        // “main” will have its index updated from 3 to 0.
        // “other” will have its index updated from 6 to 3.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector has removed unused updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices. They must have been adjusted.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&0));
                assert_eq!(indices.get(&other_token), Some(&3));
            }
        }

        linked_chunk.push_items_back(['g']);
        linked_chunk.push_items_back(['h']);
        linked_chunk.push_items_back(['i']);

        // Scenario 4: both “main” and “other” take the new updates.
        //
        // 0   1   2   3   4   5   6
        // +---+---+---+---+---+---+
        // | d | e | f | g | h | i |
        // +---+---+---+---+---+---+
        //
        // “main” will move its index from 0 to 6; the garbage collector then
        // removes `d`, `e` and `f`, shifting it down to 3.
        // “other” will move its index (after the same shift) from 0 to 3.
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.take(),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
                    PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
                    PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
                    PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
                    PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
                    PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
                ]
            );
            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
                    PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
                    PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector had a chance to collect the first 3 updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&3));
            }
        }

        // Scenario 5: no more updates, but both try to take new updates.
        // The garbage collector will collect all updates, as all of them have
        // been read already.
        //
        // “main” will have its index updated from 3 to 0.
        // “other” will have its index updated from 3 to 0.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.take().is_empty());
            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector had a chance to collect all updates.
                assert_eq!(inner.len(), 0);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&0));
                assert_eq!(indices.get(&other_token), Some(&0));
            }
        }
    }

    struct CounterWaker {
        number_of_wakeup: Mutex<usize>,
    }

    impl Wake for CounterWaker {
        fn wake(self: Arc<Self>) {
            *self.number_of_wakeup.lock().unwrap() += 1;
        }
    }

    #[test]
    fn test_updates_stream() {
        use super::Update::*;

        let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
        let waker = counter_waker.clone().into();
        let mut context = Context::from_waker(&waker);

        let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();

        let updates_subscriber = linked_chunk.updates().unwrap().subscribe();
        pin_mut!(updates_subscriber);

        // Initial update, stream is ready.
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 0);

        // Let's generate an update.
        linked_chunk.push_items_back(['a']);

        // The waker must have been called.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 1);

        // There is an update! Right after that, the stream is pending again.
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);

        // Let's generate two other updates.
        linked_chunk.push_items_back(['b']);
        linked_chunk.push_items_back(['c']);

        // The waker must have been called only once for the two updates.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);

        // We can consume the updates without the stream, but the stream continues to
        // know it has updates.
        assert_eq!(
            linked_chunk.updates().unwrap().take(),
            &[
                NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None },
                PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
            ]
        );
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[
                        PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                        PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                    ]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);

        // When dropping the `LinkedChunk`, it closes the stream.
        drop(linked_chunk);
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Ready(None));

        // Waker calls have not changed.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);
    }

    #[test]
    fn test_updates_multiple_streams() {
        use super::Update::*;

        let counter_waker1 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
        let counter_waker2 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });

        let waker1 = counter_waker1.clone().into();
        let waker2 = counter_waker2.clone().into();

        let mut context1 = Context::from_waker(&waker1);
        let mut context2 = Context::from_waker(&waker2);

        let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();

        let updates_subscriber1 = linked_chunk.updates().unwrap().subscribe();
        pin_mut!(updates_subscriber1);

        // Scope for `updates_subscriber2`.
        let updates_subscriber2_token = {
            let updates_subscriber2 = linked_chunk.updates().unwrap().subscribe();
            pin_mut!(updates_subscriber2);

            // Initial updates, streams are ready.
            assert_matches!(
                updates_subscriber1.as_mut().poll_next(&mut context1),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
                    );
                }
            );
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 0);

            assert_matches!(
                updates_subscriber2.as_mut().poll_next(&mut context2),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
                    );
                }
            );
            assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);
            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 0);

            // Let's generate an update.
            linked_chunk.push_items_back(['a']);

            // The wakers must have been called.
            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 1);
            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 1);

            // There is an update! Right after that, the streams are pending again.
            assert_matches!(
                updates_subscriber1.as_mut().poll_next(&mut context1),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                    );
                }
            );
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
            assert_matches!(
                updates_subscriber2.as_mut().poll_next(&mut context2),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                    );
                }
            );
            assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);

            // Let's generate two other updates.
            linked_chunk.push_items_back(['b']);
            linked_chunk.push_items_back(['c']);

            // A waker is consumed when called. The first call to `push_items_back` will
            // call and consume the wakers. The second call to `push_items_back` will do
            // nothing as the wakers have been consumed. New wakers will be registered on
            // polling.
            //
            // So, each waker must have been called only once for the two updates.
            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 2);
            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 2);

            // Let's poll `updates_subscriber1` only.
            assert_matches!(
                updates_subscriber1.as_mut().poll_next(&mut context1),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[
                            PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                            PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                        ]
                    );
                }
            );
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

            // For the sake of this test, we also need to advance the main reader's index.
            let _ = linked_chunk.updates().unwrap().take();
            let _ = linked_chunk.updates().unwrap().take();

            // If we inspect the garbage collector state, `b` and `c` should still be
            // present because they haven't been consumed by `updates_subscriber2` yet.
            {
                let updates = linked_chunk.updates().unwrap();

                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // We get 2 because the garbage collector runs before data are taken, not
                // after: `updates_subscriber2` has read up to `a` only, so `b` and `c`
                // remain.
                assert_eq!(inner.len(), 2);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
                assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
            }

            // Poll `updates_subscriber1` again: there is no new update so it must be
            // pending.
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

            // The state of the garbage collector is unchanged: `b` and `c` are still
            // in memory.
            {
                let updates = linked_chunk.updates().unwrap();

                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory. Value is unchanged.
                assert_eq!(inner.len(), 2);

                // Inspect the indices. They are unchanged.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
                assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
            }

            updates_subscriber2.token
            // Drop `updates_subscriber2`!
        };

        // `updates_subscriber2` has been dropped. Poll `updates_subscriber1` again:
        // still no new update, but it will run the garbage collector again, and this
        // time `updates_subscriber2` is not “retaining” `b` and `c`. The updates
        // buffer must now be empty.
        assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

        // Inspect the garbage collector.
        {
            let updates = linked_chunk.updates().unwrap();

            let inner = updates.inner.read().unwrap();

            // Inspect number of updates in memory.
            assert_eq!(inner.len(), 0);

            // Inspect the indices.
            let indices = &inner.last_index_per_reader;

            assert_eq!(indices.get(&updates_subscriber1.token), Some(&0));
            assert_eq!(indices.get(&updates_subscriber2_token), None); // token is unknown!
        }

        // When dropping the `LinkedChunk`, it closes the stream.
        drop(linked_chunk);
        assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Ready(None));
    }
}