matrix_sdk_common/linked_chunk/updates.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16 collections::HashMap,
17 sync::{Arc, RwLock},
18 task::Waker,
19};
20
21use super::{ChunkIdentifier, Position};
22
23/// Represent the updates that have happened inside a [`LinkedChunk`].
24///
25/// To retrieve the updates, use [`LinkedChunk::updates`].
26///
27/// These updates are useful to store a `LinkedChunk` in another form of
28/// storage, like a database or something similar.
29///
30/// [`LinkedChunk`]: super::LinkedChunk
31/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
32#[derive(Debug, Clone, PartialEq)]
33pub enum Update<Item, Gap> {
34 /// A new chunk of kind Items has been created.
35 NewItemsChunk {
36 /// The identifier of the previous chunk of this new chunk.
37 previous: Option<ChunkIdentifier>,
38
39 /// The identifier of the new chunk.
40 new: ChunkIdentifier,
41
42 /// The identifier of the next chunk of this new chunk.
43 next: Option<ChunkIdentifier>,
44 },
45
46 /// A new chunk of kind Gap has been created.
47 NewGapChunk {
48 /// The identifier of the previous chunk of this new chunk.
49 previous: Option<ChunkIdentifier>,
50
51 /// The identifier of the new chunk.
52 new: ChunkIdentifier,
53
54 /// The identifier of the next chunk of this new chunk.
55 next: Option<ChunkIdentifier>,
56
57 /// The content of the chunk.
58 gap: Gap,
59 },
60
61 /// A chunk has been removed.
62 RemoveChunk(ChunkIdentifier),
63
64 /// Items are pushed inside a chunk of kind Items.
65 PushItems {
66 /// The [`Position`] of the items.
67 ///
68 /// This value is given to prevent the need for position computations by
69 /// the update readers. Items are pushed, so the positions should be
70 /// incrementally computed from the previous items, which requires the
71 /// reading of the last previous item. With `at`, the update readers no
72 /// longer need to do so.
73 at: Position,
74
75 /// The items.
76 items: Vec<Item>,
77 },
78
79 /// An item has been replaced in the linked chunk.
80 ///
81 /// The `at` position MUST resolve to the actual position an existing *item*
82 /// (not a gap).
83 ReplaceItem {
84 /// The position of the item that's being replaced.
85 at: Position,
86
87 /// The new value for the item.
88 item: Item,
89 },
90
91 /// An item has been removed inside a chunk of kind Items.
92 RemoveItem {
93 /// The [`Position`] of the item.
94 at: Position,
95 },
96
97 /// The last items of a chunk have been detached, i.e. the chunk has been
98 /// truncated.
99 DetachLastItems {
100 /// The split position. Before this position (`..position`), items are
101 /// kept, from this position (`position..`), items are
102 /// detached.
103 at: Position,
104 },
105
106 /// Detached items (see [`Self::DetachLastItems`]) starts being reattached.
107 StartReattachItems,
108
109 /// Reattaching items (see [`Self::StartReattachItems`]) is finished.
110 EndReattachItems,
111
112 /// All chunks have been cleared, i.e. all items and all gaps have been
113 /// dropped.
114 Clear,
115}
116
117/// A collection of [`Update`]s that can be observed.
118///
119/// Get a value for this type with [`LinkedChunk::updates`].
120///
121/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
122#[derive(Debug)]
123pub struct ObservableUpdates<Item, Gap> {
124 pub(super) inner: Arc<RwLock<UpdatesInner<Item, Gap>>>,
125}
126
127impl<Item, Gap> ObservableUpdates<Item, Gap> {
128 /// Create a new [`ObservableUpdates`].
129 pub(super) fn new() -> Self {
130 Self { inner: Arc::new(RwLock::new(UpdatesInner::new())) }
131 }
132
133 /// Push a new update.
134 pub(super) fn push(&mut self, update: Update<Item, Gap>) {
135 self.inner.write().unwrap().push(update);
136 }
137
138 /// Clear all pending updates.
139 pub(super) fn clear_pending(&mut self) {
140 self.inner.write().unwrap().clear_pending();
141 }
142
143 /// Take new updates.
144 ///
145 /// Updates that have been taken will not be read again.
146 pub fn take(&mut self) -> Vec<Update<Item, Gap>>
147 where
148 Item: Clone,
149 Gap: Clone,
150 {
151 self.inner.write().unwrap().take().to_owned()
152 }
153
154 /// Subscribe to updates by using a [`Stream`].
155 #[cfg(test)]
156 pub(super) fn subscribe(&mut self) -> UpdatesSubscriber<Item, Gap> {
157 // A subscriber is a new update reader, it needs its own token.
158 let token = self.new_reader_token();
159
160 UpdatesSubscriber::new(Arc::downgrade(&self.inner), token)
161 }
162
163 /// Generate a new [`ReaderToken`].
164 pub(super) fn new_reader_token(&mut self) -> ReaderToken {
165 let mut inner = self.inner.write().unwrap();
166
167 // Add 1 before reading the `last_token`, in this particular order, because the
168 // 0 token is reserved by `MAIN_READER_TOKEN`.
169 inner.last_token += 1;
170 let last_token = inner.last_token;
171
172 inner.last_index_per_reader.insert(last_token, 0);
173
174 last_token
175 }
176}
177
178/// A token used to represent readers that read the updates in
179/// [`UpdatesInner`].
180pub(super) type ReaderToken = usize;
181
182/// Inner type for [`ObservableUpdates`].
183///
184/// The particularity of this type is that multiple readers can read the
185/// updates. A reader has a [`ReaderToken`]. The public API (i.e.
186/// [`ObservableUpdates`]) is considered to be the _main reader_ (it has the
187/// token [`Self::MAIN_READER_TOKEN`]).
188///
189/// An update that have been read by all readers are garbage collected to be
190/// removed from the memory. An update will never be read twice by the same
191/// reader.
192///
193/// Why do we need multiple readers? The public API reads the updates with
194/// [`ObservableUpdates::take`], but the private API must also read the updates
195/// for example with [`UpdatesSubscriber`]. Of course, they can be multiple
196/// `UpdatesSubscriber`s at the same time. Hence the need of supporting multiple
197/// readers.
198#[derive(Debug)]
199pub(super) struct UpdatesInner<Item, Gap> {
200 /// All the updates that have not been read by all readers.
201 updates: Vec<Update<Item, Gap>>,
202
203 /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
204 /// A reader is identified by a [`ReaderToken`].
205 ///
206 /// To each reader token is associated an index that represents the index of
207 /// the last reading. It is used to never return the same update twice.
208 last_index_per_reader: HashMap<ReaderToken, usize>,
209
210 /// The last generated token. This is useful to generate new token.
211 last_token: ReaderToken,
212
213 /// Pending wakers for [`UpdateSubscriber`]s. A waker is removed
214 /// everytime it is called.
215 wakers: Vec<Waker>,
216}
217
218impl<Item, Gap> UpdatesInner<Item, Gap> {
219 /// The token used by the main reader. See [`Self::take`] to learn more.
220 const MAIN_READER_TOKEN: ReaderToken = 0;
221
222 /// Create a new [`Self`].
223 fn new() -> Self {
224 Self {
225 updates: Vec::with_capacity(8),
226 last_index_per_reader: {
227 let mut map = HashMap::with_capacity(2);
228 map.insert(Self::MAIN_READER_TOKEN, 0);
229
230 map
231 },
232 last_token: Self::MAIN_READER_TOKEN,
233 wakers: Vec::with_capacity(2),
234 }
235 }
236
237 /// Push a new update.
238 fn push(&mut self, update: Update<Item, Gap>) {
239 self.updates.push(update);
240
241 // Wake them up \o/.
242 for waker in self.wakers.drain(..) {
243 waker.wake();
244 }
245 }
246
247 /// Clear all pending updates.
248 fn clear_pending(&mut self) {
249 self.updates.clear();
250
251 // Reset all the per-reader indices.
252 for idx in self.last_index_per_reader.values_mut() {
253 *idx = 0;
254 }
255
256 // No need to wake the wakers; they're waiting for a new update, and we
257 // just made them all disappear.
258 }
259
260 /// Take new updates; it considers the caller is the main reader, i.e. it
261 /// will use the [`Self::MAIN_READER_TOKEN`].
262 ///
263 /// Updates that have been read will never be read again by the current
264 /// reader.
265 ///
266 /// Learn more by reading [`Self::take_with_token`].
267 fn take(&mut self) -> &[Update<Item, Gap>] {
268 self.take_with_token(Self::MAIN_READER_TOKEN)
269 }
270
271 /// Take new updates with a particular reader token.
272 ///
273 /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
274 /// A reader is identified by a [`ReaderToken`]. Every reader can
275 /// take/read/consume each update only once. An internal index is stored
276 /// per reader token to know where to start reading updates next time this
277 /// method is called.
278 pub(super) fn take_with_token(&mut self, token: ReaderToken) -> &[Update<Item, Gap>] {
279 // Let's garbage collect unused updates.
280 self.garbage_collect();
281
282 let index = self
283 .last_index_per_reader
284 .get_mut(&token)
285 .expect("Given `UpdatesToken` does not map to any index");
286
287 // Read new updates, and update the index.
288 let slice = &self.updates[*index..];
289 *index = self.updates.len();
290
291 slice
292 }
293
294 /// Has the given reader, identified by its [`ReaderToken`], some pending
295 /// updates, or has it consumed all the pending updates?
296 pub(super) fn is_reader_up_to_date(&self, token: ReaderToken) -> bool {
297 *self.last_index_per_reader.get(&token).expect("unknown reader token") == self.updates.len()
298 }
299
300 /// Return the number of updates in the buffer.
301 #[cfg(test)]
302 fn len(&self) -> usize {
303 self.updates.len()
304 }
305
306 /// Garbage collect unused updates. An update is considered unused when it's
307 /// been read by all readers.
308 ///
309 /// Basically, it reduces to finding the smallest last index for all
310 /// readers, and clear from 0 to that index.
311 fn garbage_collect(&mut self) {
312 let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0);
313
314 if min_index > 0 {
315 let _ = self.updates.drain(0..min_index);
316
317 // Let's shift the indices to the left by `min_index` to preserve them.
318 for index in self.last_index_per_reader.values_mut() {
319 *index -= min_index;
320 }
321 }
322 }
323}
324
325/// A subscriber to [`ObservableUpdates`]. It is helpful to receive updates via
326/// a [`Stream`].
327#[cfg(test)]
328pub(super) struct UpdatesSubscriber<Item, Gap> {
329 /// Weak reference to [`UpdatesInner`].
330 ///
331 /// Using a weak reference allows [`ObservableUpdates`] to be dropped
332 /// freely even if a subscriber exists.
333 updates: std::sync::Weak<RwLock<UpdatesInner<Item, Gap>>>,
334
335 /// The token to read the updates.
336 token: ReaderToken,
337}
338
339#[cfg(test)]
340impl<Item, Gap> UpdatesSubscriber<Item, Gap> {
341 /// Create a new [`Self`].
342 #[cfg(test)]
343 fn new(updates: std::sync::Weak<RwLock<UpdatesInner<Item, Gap>>>, token: ReaderToken) -> Self {
344 Self { updates, token }
345 }
346}
347
348#[cfg(test)]
349impl<Item, Gap> futures_core::Stream for UpdatesSubscriber<Item, Gap>
350where
351 Item: Clone,
352 Gap: Clone,
353{
354 type Item = Vec<Update<Item, Gap>>;
355
356 fn poll_next(
357 self: std::pin::Pin<&mut Self>,
358 context: &mut std::task::Context<'_>,
359 ) -> std::task::Poll<Option<Self::Item>> {
360 let Some(updates) = self.updates.upgrade() else {
361 // The `ObservableUpdates` has been dropped. It's time to close this stream.
362 return std::task::Poll::Ready(None);
363 };
364
365 let mut updates = updates.write().unwrap();
366 let the_updates = updates.take_with_token(self.token);
367
368 // No updates.
369 if the_updates.is_empty() {
370 // Let's register the waker.
371 updates.wakers.push(context.waker().clone());
372
373 // The stream is pending.
374 return std::task::Poll::Pending;
375 }
376
377 // There is updates! Let's forward them in this stream.
378 std::task::Poll::Ready(Some(the_updates.to_owned()))
379 }
380}
381
382#[cfg(test)]
383impl<Item, Gap> Drop for UpdatesSubscriber<Item, Gap> {
384 fn drop(&mut self) {
385 // Remove `Self::token` from `UpdatesInner::last_index_per_reader`.
386 // This is important so that the garbage collector can do its jobs correctly
387 // without a dead dangling reader token.
388 if let Some(updates) = self.updates.upgrade() {
389 let mut updates = updates.write().unwrap();
390
391 // Remove the reader token from `UpdatesInner`.
392 // It's safe to ignore the result of `remove` here: `None` means the token was
393 // already removed (note: it should be unreachable).
394 let _ = updates.last_index_per_reader.remove(&self.token);
395 }
396 }
397}
398
399#[cfg(test)]
400mod tests {
401 use std::{
402 sync::{Arc, Mutex},
403 task::{Context, Poll, Wake},
404 };
405
406 use assert_matches::assert_matches;
407 use futures_core::Stream;
408 use futures_util::pin_mut;
409
410 use super::{super::LinkedChunk, ChunkIdentifier, Position, UpdatesInner};
411
412 #[test]
413 fn test_updates_take_and_garbage_collector() {
414 use super::Update::*;
415
416 let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history();
417
418 // Simulate another updates “reader”, it can a subscriber.
419 let main_token = UpdatesInner::<char, ()>::MAIN_READER_TOKEN;
420 let other_token = {
421 let updates = linked_chunk.updates().unwrap();
422 let mut inner = updates.inner.write().unwrap();
423 inner.last_token += 1;
424
425 let other_token = inner.last_token;
426 inner.last_index_per_reader.insert(other_token, 0);
427
428 other_token
429 };
430
431 // There is an initial update.
432 {
433 let updates = linked_chunk.updates().unwrap();
434
435 assert_eq!(
436 updates.take(),
437 &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
438 );
439 assert_eq!(
440 updates.inner.write().unwrap().take_with_token(other_token),
441 &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
442 );
443 }
444
445 // No new update.
446 {
447 let updates = linked_chunk.updates().unwrap();
448
449 assert!(updates.take().is_empty());
450 assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
451 }
452
453 linked_chunk.push_items_back(['a']);
454 linked_chunk.push_items_back(['b']);
455 linked_chunk.push_items_back(['c']);
456
457 // Scenario 1: “main” takes the new updates, “other” doesn't take the new
458 // updates.
459 //
460 // 0 1 2 3
461 // +---+---+---+
462 // | a | b | c |
463 // +---+---+---+
464 //
465 // “main” will move its index from 0 to 3.
466 // “other” won't move its index.
467 {
468 let updates = linked_chunk.updates().unwrap();
469
470 {
471 // Inspect number of updates in memory.
472 assert_eq!(updates.inner.read().unwrap().len(), 3);
473 }
474
475 assert_eq!(
476 updates.take(),
477 &[
478 PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
479 PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
480 PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
481 ]
482 );
483
484 {
485 let inner = updates.inner.read().unwrap();
486
487 // Inspect number of updates in memory.
488 // It must be the same number as before as the garbage collector weren't not
489 // able to remove any unused updates.
490 assert_eq!(inner.len(), 3);
491
492 // Inspect the indices.
493 let indices = &inner.last_index_per_reader;
494
495 assert_eq!(indices.get(&main_token), Some(&3));
496 assert_eq!(indices.get(&other_token), Some(&0));
497 }
498 }
499
500 linked_chunk.push_items_back(['d']);
501 linked_chunk.push_items_back(['e']);
502 linked_chunk.push_items_back(['f']);
503
504 // Scenario 2: “other“ takes the new updates, “main” doesn't take the
505 // new updates.
506 //
507 // 0 1 2 3 4 5 6
508 // +---+---+---+---+---+---+
509 // | a | b | c | d | e | f |
510 // +---+---+---+---+---+---+
511 //
512 // “main” won't move its index.
513 // “other” will move its index from 0 to 6.
514 {
515 let updates = linked_chunk.updates().unwrap();
516
517 assert_eq!(
518 updates.inner.write().unwrap().take_with_token(other_token),
519 &[
520 PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
521 PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
522 PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
523 PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
524 PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
525 PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
526 ]
527 );
528
529 {
530 let inner = updates.inner.read().unwrap();
531
532 // Inspect number of updates in memory.
533 // It must be the same number as before as the garbage collector will be able to
534 // remove unused updates but at the next call…
535 assert_eq!(inner.len(), 6);
536
537 // Inspect the indices.
538 let indices = &inner.last_index_per_reader;
539
540 assert_eq!(indices.get(&main_token), Some(&3));
541 assert_eq!(indices.get(&other_token), Some(&6));
542 }
543 }
544
545 // Scenario 3: “other” take new updates, but there is none, “main”
546 // doesn't take new updates. The garbage collector will run and collect
547 // unused updates.
548 //
549 // 0 1 2 3
550 // +---+---+---+
551 // | d | e | f |
552 // +---+---+---+
553 //
554 // “main” will have its index updated from 3 to 0.
555 // “other” will have its index updated from 6 to 3.
556 {
557 let updates = linked_chunk.updates().unwrap();
558
559 assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
560
561 {
562 let inner = updates.inner.read().unwrap();
563
564 // Inspect number of updates in memory.
565 // The garbage collector has removed unused updates.
566 assert_eq!(inner.len(), 3);
567
568 // Inspect the indices. They must have been adjusted.
569 let indices = &inner.last_index_per_reader;
570
571 assert_eq!(indices.get(&main_token), Some(&0));
572 assert_eq!(indices.get(&other_token), Some(&3));
573 }
574 }
575
576 linked_chunk.push_items_back(['g']);
577 linked_chunk.push_items_back(['h']);
578 linked_chunk.push_items_back(['i']);
579
580 // Scenario 4: both “main” and “other” take the new updates.
581 //
582 // 0 1 2 3 4 5 6
583 // +---+---+---+---+---+---+
584 // | d | e | f | g | h | i |
585 // +---+---+---+---+---+---+
586 //
587 // “main” will have its index updated from 0 to 3.
588 // “other” will have its index updated from 6 to 3.
589 {
590 let updates = linked_chunk.updates().unwrap();
591
592 assert_eq!(
593 updates.take(),
594 &[
595 PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
596 PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
597 PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
598 PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
599 PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
600 PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
601 ]
602 );
603 assert_eq!(
604 updates.inner.write().unwrap().take_with_token(other_token),
605 &[
606 PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
607 PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
608 PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
609 ]
610 );
611
612 {
613 let inner = updates.inner.read().unwrap();
614
615 // Inspect number of updates in memory.
616 // The garbage collector had a chance to collect the first 3 updates.
617 assert_eq!(inner.len(), 3);
618
619 // Inspect the indices.
620 let indices = &inner.last_index_per_reader;
621
622 assert_eq!(indices.get(&main_token), Some(&3));
623 assert_eq!(indices.get(&other_token), Some(&3));
624 }
625 }
626
627 // Scenario 5: no more updates but they both try to take new updates.
628 // The garbage collector will collect all updates as all of them as
629 // been read already.
630 //
631 // “main” will have its index updated from 0 to 0.
632 // “other” will have its index updated from 3 to 0.
633 {
634 let updates = linked_chunk.updates().unwrap();
635
636 assert!(updates.take().is_empty());
637 assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
638
639 {
640 let inner = updates.inner.read().unwrap();
641
642 // Inspect number of updates in memory.
643 // The garbage collector had a chance to collect all updates.
644 assert_eq!(inner.len(), 0);
645
646 // Inspect the indices.
647 let indices = &inner.last_index_per_reader;
648
649 assert_eq!(indices.get(&main_token), Some(&0));
650 assert_eq!(indices.get(&other_token), Some(&0));
651 }
652 }
653 }
654
655 struct CounterWaker {
656 number_of_wakeup: Mutex<usize>,
657 }
658
659 impl Wake for CounterWaker {
660 fn wake(self: Arc<Self>) {
661 *self.number_of_wakeup.lock().unwrap() += 1;
662 }
663 }
664
665 #[test]
666 fn test_updates_stream() {
667 use super::Update::*;
668
669 let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
670 let waker = counter_waker.clone().into();
671 let mut context = Context::from_waker(&waker);
672
673 let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
674
675 let updates_subscriber = linked_chunk.updates().unwrap().subscribe();
676 pin_mut!(updates_subscriber);
677
678 // Initial update, stream is ready.
679 assert_matches!(
680 updates_subscriber.as_mut().poll_next(&mut context),
681 Poll::Ready(Some(items)) => {
682 assert_eq!(
683 items,
684 &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
685 );
686 }
687 );
688 assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
689 assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 0);
690
691 // Let's generate an update.
692 linked_chunk.push_items_back(['a']);
693
694 // The waker must have been called.
695 assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 1);
696
697 // There is an update! Right after that, the stream is pending again.
698 assert_matches!(
699 updates_subscriber.as_mut().poll_next(&mut context),
700 Poll::Ready(Some(items)) => {
701 assert_eq!(
702 items,
703 &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
704 );
705 }
706 );
707 assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
708
709 // Let's generate two other updates.
710 linked_chunk.push_items_back(['b']);
711 linked_chunk.push_items_back(['c']);
712
713 // The waker must have been called only once for the two updates.
714 assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);
715
716 // We can consume the updates without the stream, but the stream continues to
717 // know it has updates.
718 assert_eq!(
719 linked_chunk.updates().unwrap().take(),
720 &[
721 NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None },
722 PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
723 PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
724 PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
725 ]
726 );
727 assert_matches!(
728 updates_subscriber.as_mut().poll_next(&mut context),
729 Poll::Ready(Some(items)) => {
730 assert_eq!(
731 items,
732 &[
733 PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
734 PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
735 ]
736 );
737 }
738 );
739 assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
740
741 // When dropping the `LinkedChunk`, it closes the stream.
742 drop(linked_chunk);
743 assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Ready(None));
744
745 // Wakers calls have not changed.
746 assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);
747 }
748
749 #[test]
750 fn test_updates_multiple_streams() {
751 use super::Update::*;
752
753 let counter_waker1 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
754 let counter_waker2 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
755
756 let waker1 = counter_waker1.clone().into();
757 let waker2 = counter_waker2.clone().into();
758
759 let mut context1 = Context::from_waker(&waker1);
760 let mut context2 = Context::from_waker(&waker2);
761
762 let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
763
764 let updates_subscriber1 = linked_chunk.updates().unwrap().subscribe();
765 pin_mut!(updates_subscriber1);
766
767 // Scope for `updates_subscriber2`.
768 let updates_subscriber2_token = {
769 let updates_subscriber2 = linked_chunk.updates().unwrap().subscribe();
770 pin_mut!(updates_subscriber2);
771
772 // Initial updates, streams are ready.
773 assert_matches!(
774 updates_subscriber1.as_mut().poll_next(&mut context1),
775 Poll::Ready(Some(items)) => {
776 assert_eq!(
777 items,
778 &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
779 );
780 }
781 );
782 assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
783 assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 0);
784
785 assert_matches!(
786 updates_subscriber2.as_mut().poll_next(&mut context2),
787 Poll::Ready(Some(items)) => {
788 assert_eq!(
789 items,
790 &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
791 );
792 }
793 );
794 assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);
795 assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 0);
796
797 // Let's generate an update.
798 linked_chunk.push_items_back(['a']);
799
800 // The wakers must have been called.
801 assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 1);
802 assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 1);
803
804 // There is an update! Right after that, the streams are pending again.
805 assert_matches!(
806 updates_subscriber1.as_mut().poll_next(&mut context1),
807 Poll::Ready(Some(items)) => {
808 assert_eq!(
809 items,
810 &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
811 );
812 }
813 );
814 assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
815 assert_matches!(
816 updates_subscriber2.as_mut().poll_next(&mut context2),
817 Poll::Ready(Some(items)) => {
818 assert_eq!(
819 items,
820 &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
821 );
822 }
823 );
824 assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);
825
826 // Let's generate two other updates.
827 linked_chunk.push_items_back(['b']);
828 linked_chunk.push_items_back(['c']);
829
830 // A waker is consumed when called. The first call to `push_items_back` will
831 // call and consume the wakers. The second call to `push_items_back` will do
832 // nothing as the wakers have been consumed. New wakers will be registered on
833 // polling.
834 //
835 // So, the waker must have been called only once for the two updates.
836 assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 2);
837 assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 2);
838
839 // Let's poll `updates_subscriber1` only.
840 assert_matches!(
841 updates_subscriber1.as_mut().poll_next(&mut context1),
842 Poll::Ready(Some(items)) => {
843 assert_eq!(
844 items,
845 &[
846 PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
847 PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
848 ]
849 );
850 }
851 );
852 assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
853
854 // For the sake of this test, we also need to advance the main reader token.
855 let _ = linked_chunk.updates().unwrap().take();
856 let _ = linked_chunk.updates().unwrap().take();
857
858 // If we inspect the garbage collector state, `a`, `b` and `c` should still be
859 // present because not all of them have been consumed by `updates_subscriber2`
860 // yet.
861 {
862 let updates = linked_chunk.updates().unwrap();
863
864 let inner = updates.inner.read().unwrap();
865
866 // Inspect number of updates in memory.
867 // We get 2 because the garbage collector runs before data are taken, not after:
868 // `updates_subscriber2` has read `a` only, so `b` and `c` remain.
869 assert_eq!(inner.len(), 2);
870
871 // Inspect the indices.
872 let indices = &inner.last_index_per_reader;
873
874 assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
875 assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
876 }
877
878 // Poll `updates_subscriber1` again: there is no new update so it must be
879 // pending.
880 assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
881
882 // The state of the garbage collector is unchanged: `a`, `b` and `c` are still
883 // in memory.
884 {
885 let updates = linked_chunk.updates().unwrap();
886
887 let inner = updates.inner.read().unwrap();
888
889 // Inspect number of updates in memory. Value is unchanged.
890 assert_eq!(inner.len(), 2);
891
892 // Inspect the indices. They are unchanged.
893 let indices = &inner.last_index_per_reader;
894
895 assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
896 assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
897 }
898
899 updates_subscriber2.token
900 // Drop `updates_subscriber2`!
901 };
902
903 // `updates_subscriber2` has been dropped. Poll `updates_subscriber1` again:
904 // still no new update, but it will run the garbage collector again, and this
905 // time `updates_subscriber2` is not “retaining” `b` and `c`. The garbage
906 // collector must be empty.
907 assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
908
909 // Inspect the garbage collector.
910 {
911 let updates = linked_chunk.updates().unwrap();
912
913 let inner = updates.inner.read().unwrap();
914
915 // Inspect number of updates in memory.
916 assert_eq!(inner.len(), 0);
917
918 // Inspect the indices.
919 let indices = &inner.last_index_per_reader;
920
921 assert_eq!(indices.get(&updates_subscriber1.token), Some(&0));
922 assert_eq!(indices.get(&updates_subscriber2_token), None); // token is unknown!
923 }
924
925 // When dropping the `LinkedChunk`, it closes the stream.
926 drop(linked_chunk);
927 assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Ready(None));
928 }
929}