matrix_sdk_common/linked_chunk/updates.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16 collections::HashMap,
17 sync::{Arc, RwLock},
18 task::Waker,
19};
20
21use super::{ChunkIdentifier, Position};
22
/// Represent the updates that have happened inside a [`LinkedChunk`].
///
/// To retrieve the updates, use [`LinkedChunk::updates`].
///
/// These updates are useful to store a `LinkedChunk` in another form of
/// storage, like a database or something similar.
///
/// `Item` is the type of the values carried by [`Update::PushItems`] and
/// [`Update::ReplaceItem`]; `Gap` is the type of the content carried by
/// [`Update::NewGapChunk`].
///
/// [`LinkedChunk`]: super::LinkedChunk
/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
#[derive(Debug, Clone, PartialEq)]
pub enum Update<Item, Gap> {
    /// A new chunk of kind Items has been created.
    NewItemsChunk {
        /// The identifier of the previous chunk of this new chunk, if any.
        previous: Option<ChunkIdentifier>,

        /// The identifier of the new chunk.
        new: ChunkIdentifier,

        /// The identifier of the next chunk of this new chunk, if any.
        next: Option<ChunkIdentifier>,
    },

    /// A new chunk of kind Gap has been created.
    NewGapChunk {
        /// The identifier of the previous chunk of this new chunk, if any.
        previous: Option<ChunkIdentifier>,

        /// The identifier of the new chunk.
        new: ChunkIdentifier,

        /// The identifier of the next chunk of this new chunk, if any.
        next: Option<ChunkIdentifier>,

        /// The content of the chunk.
        gap: Gap,
    },

    /// A chunk has been removed.
    RemoveChunk(ChunkIdentifier),

    /// Items are pushed inside a chunk of kind Items.
    PushItems {
        /// The [`Position`] of the items.
        ///
        /// This value is given to prevent the need for position computations by
        /// the update readers. Items are pushed, so the positions should be
        /// incrementally computed from the previous items, which requires the
        /// reading of the last previous item. With `at`, the update readers no
        /// longer need to do so.
        at: Position,

        /// The items.
        items: Vec<Item>,
    },

    /// An item has been replaced in the linked chunk.
    ///
    /// The `at` position MUST resolve to the actual position of an existing
    /// *item* (not a gap).
    ReplaceItem {
        /// The position of the item that's being replaced.
        at: Position,

        /// The new value for the item.
        item: Item,
    },

    /// An item has been removed inside a chunk of kind Items.
    RemoveItem {
        /// The [`Position`] of the item.
        at: Position,
    },

    /// The last items of a chunk have been detached, i.e. the chunk has been
    /// truncated.
    DetachLastItems {
        /// The split position. Before this position (`..position`), items are
        /// kept; from this position (`position..`), items are detached.
        at: Position,
    },

    /// Detached items (see [`Self::DetachLastItems`]) start being reattached.
    StartReattachItems,

    /// Reattaching items (see [`Self::StartReattachItems`]) is finished.
    EndReattachItems,

    /// All chunks have been cleared, i.e. all items and all gaps have been
    /// dropped.
    Clear,
}
116
117impl<Item, Gap> Update<Item, Gap> {
118 /// Get the items from the [`Update`] if any.
119 ///
120 /// This function is useful if you only care about the items from the
121 /// [`Update`] and not what kind of update it was and where the items
122 /// should be placed.
123 ///
124 /// [`Update`] variants which don't contain any items will return an empty
125 /// [`Vec`].
126 pub fn into_items(self) -> Vec<Item> {
127 match self {
128 Update::NewItemsChunk { .. }
129 | Update::NewGapChunk { .. }
130 | Update::RemoveChunk(_)
131 | Update::RemoveItem { .. }
132 | Update::DetachLastItems { .. }
133 | Update::StartReattachItems
134 | Update::EndReattachItems
135 | Update::Clear => vec![],
136 Update::PushItems { items, .. } => items,
137 Update::ReplaceItem { item, .. } => vec![item],
138 }
139 }
140}
141
/// A collection of [`Update`]s that can be observed.
///
/// Get a value for this type with [`LinkedChunk::updates`].
///
/// [`LinkedChunk::updates`]: super::LinkedChunk::updates
#[derive(Debug)]
pub struct ObservableUpdates<Item, Gap> {
    /// The shared state holding the buffered updates and the per-reader
    /// bookkeeping. It sits behind an `Arc<RwLock<_>>` so that subscribers can
    /// keep a weak reference to it.
    pub(super) inner: Arc<RwLock<UpdatesInner<Item, Gap>>>,
}
151
152impl<Item, Gap> ObservableUpdates<Item, Gap> {
153 /// Create a new [`ObservableUpdates`].
154 pub(super) fn new() -> Self {
155 Self { inner: Arc::new(RwLock::new(UpdatesInner::new())) }
156 }
157
158 /// Push a new update.
159 pub(super) fn push(&mut self, update: Update<Item, Gap>) {
160 self.inner.write().unwrap().push(update);
161 }
162
163 /// Clear all pending updates.
164 pub(super) fn clear_pending(&mut self) {
165 self.inner.write().unwrap().clear_pending();
166 }
167
168 /// Take new updates.
169 ///
170 /// Updates that have been taken will not be read again.
171 pub fn take(&mut self) -> Vec<Update<Item, Gap>>
172 where
173 Item: Clone,
174 Gap: Clone,
175 {
176 self.inner.write().unwrap().take().to_owned()
177 }
178
179 /// Subscribe to updates by using a [`Stream`].
180 #[cfg(test)]
181 pub(super) fn subscribe(&mut self) -> UpdatesSubscriber<Item, Gap> {
182 // A subscriber is a new update reader, it needs its own token.
183 let token = self.new_reader_token();
184
185 UpdatesSubscriber::new(Arc::downgrade(&self.inner), token)
186 }
187
188 /// Generate a new [`ReaderToken`].
189 pub(super) fn new_reader_token(&mut self) -> ReaderToken {
190 let mut inner = self.inner.write().unwrap();
191
192 // Add 1 before reading the `last_token`, in this particular order, because the
193 // 0 token is reserved by `MAIN_READER_TOKEN`.
194 inner.last_token += 1;
195 let last_token = inner.last_token;
196
197 inner.last_index_per_reader.insert(last_token, 0);
198
199 last_token
200 }
201}
202
/// A token used to represent readers that read the updates in
/// [`UpdatesInner`].
///
/// The value `0` is reserved for the main reader (see
/// `UpdatesInner::MAIN_READER_TOKEN`); other tokens are generated by
/// [`ObservableUpdates::new_reader_token`].
pub(super) type ReaderToken = usize;
206
/// Inner type for [`ObservableUpdates`].
///
/// The particularity of this type is that multiple readers can read the
/// updates. A reader has a [`ReaderToken`]. The public API (i.e.
/// [`ObservableUpdates`]) is considered to be the _main reader_ (it has the
/// token [`Self::MAIN_READER_TOKEN`]).
///
/// Updates that have been read by all readers are garbage collected, i.e.
/// removed from memory. An update will never be read twice by the same
/// reader.
///
/// Why do we need multiple readers? The public API reads the updates with
/// [`ObservableUpdates::take`], but the private API must also read the updates
/// for example with [`UpdatesSubscriber`]. Of course, there can be multiple
/// `UpdatesSubscriber`s at the same time. Hence the need to support multiple
/// readers.
#[derive(Debug)]
pub(super) struct UpdatesInner<Item, Gap> {
    /// All the updates that have not been read by all readers.
    updates: Vec<Update<Item, Gap>>,

    /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
    /// A reader is identified by a [`ReaderToken`].
    ///
    /// To each reader token is associated an index into [`Self::updates`]: the
    /// index at which the next reading of this reader starts. It is used to
    /// never return the same update twice.
    last_index_per_reader: HashMap<ReaderToken, usize>,

    /// The last generated token. This is useful to generate a new token.
    last_token: ReaderToken,

    /// Pending wakers for `UpdatesSubscriber`s. All the wakers are removed
    /// (and called) every time an update is pushed.
    wakers: Vec<Waker>,
}
242
243impl<Item, Gap> UpdatesInner<Item, Gap> {
244 /// The token used by the main reader. See [`Self::take`] to learn more.
245 const MAIN_READER_TOKEN: ReaderToken = 0;
246
247 /// Create a new [`Self`].
248 fn new() -> Self {
249 Self {
250 updates: Vec::with_capacity(8),
251 last_index_per_reader: {
252 let mut map = HashMap::with_capacity(2);
253 map.insert(Self::MAIN_READER_TOKEN, 0);
254
255 map
256 },
257 last_token: Self::MAIN_READER_TOKEN,
258 wakers: Vec::with_capacity(2),
259 }
260 }
261
262 /// Push a new update.
263 fn push(&mut self, update: Update<Item, Gap>) {
264 self.updates.push(update);
265
266 // Wake them up \o/.
267 for waker in self.wakers.drain(..) {
268 waker.wake();
269 }
270 }
271
272 /// Clear all pending updates.
273 fn clear_pending(&mut self) {
274 self.updates.clear();
275
276 // Reset all the per-reader indices.
277 for idx in self.last_index_per_reader.values_mut() {
278 *idx = 0;
279 }
280
281 // No need to wake the wakers; they're waiting for a new update, and we
282 // just made them all disappear.
283 }
284
285 /// Take new updates; it considers the caller is the main reader, i.e. it
286 /// will use the [`Self::MAIN_READER_TOKEN`].
287 ///
288 /// Updates that have been read will never be read again by the current
289 /// reader.
290 ///
291 /// Learn more by reading [`Self::take_with_token`].
292 fn take(&mut self) -> &[Update<Item, Gap>] {
293 self.take_with_token(Self::MAIN_READER_TOKEN)
294 }
295
296 /// Take new updates with a particular reader token.
297 ///
298 /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
299 /// A reader is identified by a [`ReaderToken`]. Every reader can
300 /// take/read/consume each update only once. An internal index is stored
301 /// per reader token to know where to start reading updates next time this
302 /// method is called.
303 pub(super) fn take_with_token(&mut self, token: ReaderToken) -> &[Update<Item, Gap>] {
304 // Let's garbage collect unused updates.
305 self.garbage_collect();
306
307 let index = self
308 .last_index_per_reader
309 .get_mut(&token)
310 .expect("Given `UpdatesToken` does not map to any index");
311
312 // Read new updates, and update the index.
313 let slice = &self.updates[*index..];
314 *index = self.updates.len();
315
316 slice
317 }
318
319 /// Has the given reader, identified by its [`ReaderToken`], some pending
320 /// updates, or has it consumed all the pending updates?
321 pub(super) fn is_reader_up_to_date(&self, token: ReaderToken) -> bool {
322 *self.last_index_per_reader.get(&token).expect("unknown reader token") == self.updates.len()
323 }
324
325 /// Return the number of updates in the buffer.
326 #[cfg(test)]
327 fn len(&self) -> usize {
328 self.updates.len()
329 }
330
331 /// Garbage collect unused updates. An update is considered unused when it's
332 /// been read by all readers.
333 ///
334 /// Basically, it reduces to finding the smallest last index for all
335 /// readers, and clear from 0 to that index.
336 fn garbage_collect(&mut self) {
337 let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0);
338
339 if min_index > 0 {
340 let _ = self.updates.drain(0..min_index);
341
342 // Let's shift the indices to the left by `min_index` to preserve them.
343 for index in self.last_index_per_reader.values_mut() {
344 *index -= min_index;
345 }
346 }
347 }
348}
349
/// A subscriber to [`ObservableUpdates`]. It is helpful to receive updates via
/// a `Stream`.
///
/// Each subscriber is a reader of its own: it owns a [`ReaderToken`], which is
/// unregistered when the subscriber is dropped.
#[cfg(test)]
pub(super) struct UpdatesSubscriber<Item, Gap> {
    /// Weak reference to [`UpdatesInner`].
    ///
    /// Using a weak reference allows [`ObservableUpdates`] to be dropped
    /// freely even if a subscriber exists.
    updates: std::sync::Weak<RwLock<UpdatesInner<Item, Gap>>>,

    /// The token to read the updates.
    token: ReaderToken,
}
363
#[cfg(test)]
impl<Item, Gap> UpdatesSubscriber<Item, Gap> {
    /// Create a new [`Self`].
    ///
    /// `updates` is a weak reference to the shared updates, and `token` is the
    /// reader token this subscriber reads the updates with. The token is
    /// expected to be already registered (see
    /// [`ObservableUpdates::new_reader_token`]).
    // No `#[cfg(test)]` here: the whole `impl` block is already test-only.
    fn new(updates: std::sync::Weak<RwLock<UpdatesInner<Item, Gap>>>, token: ReaderToken) -> Self {
        Self { updates, token }
    }
}
372
#[cfg(test)]
impl<Item, Gap> futures_core::Stream for UpdatesSubscriber<Item, Gap>
where
    Item: Clone,
    Gap: Clone,
{
    /// Each item of the stream is the batch of updates that this subscriber
    /// has not read yet.
    type Item = Vec<Update<Item, Gap>>;

    /// Poll the new updates for this subscriber.
    ///
    /// - Returns `Poll::Ready(None)` once the [`ObservableUpdates`] has been
    ///   dropped.
    /// - Returns `Poll::Ready(Some(updates))` with a clone of the unread
    ///   updates when there are some.
    /// - Returns `Poll::Pending` otherwise, after having registered the waker
    ///   of `context` so that it is called when the next update is pushed.
    ///
    /// Panics if the lock around the updates is poisoned.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        context: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let Some(updates) = self.updates.upgrade() else {
            // The `ObservableUpdates` has been dropped. It's time to close this stream.
            return std::task::Poll::Ready(None);
        };

        let mut updates = updates.write().unwrap();
        let the_updates = updates.take_with_token(self.token);

        // No updates.
        if the_updates.is_empty() {
            // Let's register the waker, unless an equivalent one is already
            // registered. Wakers are only removed when an update is pushed, so
            // registering one per poll would make the list grow without bound
            // when the stream is polled repeatedly while nothing is pushed.
            let waker = context.waker();

            if !updates.wakers.iter().any(|registered| registered.will_wake(waker)) {
                updates.wakers.push(waker.clone());
            }

            // The stream is pending.
            return std::task::Poll::Pending;
        }

        // There are updates! Let's forward them in this stream.
        std::task::Poll::Ready(Some(the_updates.to_owned()))
    }
}
406
#[cfg(test)]
impl<Item, Gap> Drop for UpdatesSubscriber<Item, Gap> {
    /// Unregister the reader token of this subscriber.
    ///
    /// This is important so that the garbage collector can do its job
    /// correctly, without a dead dangling reader token retaining updates
    /// forever.
    fn drop(&mut self) {
        let Some(updates) = self.updates.upgrade() else {
            // The `ObservableUpdates` is gone already, and the readers with it.
            return;
        };

        // `drop` must not panic: if it runs while the thread is already
        // unwinding, a second panic aborts the process. A poisoned lock is
        // therefore recovered rather than unwrapped; removing a key from the
        // map is fine whatever state the panicking writer left behind.
        let mut updates = updates.write().unwrap_or_else(std::sync::PoisonError::into_inner);

        // Remove the reader token from `UpdatesInner`.
        // It's safe to ignore the result of `remove` here: `None` means the token was
        // already removed (note: it should be unreachable).
        let _ = updates.last_index_per_reader.remove(&self.token);
    }
}
423
424#[cfg(test)]
425mod tests {
426 use std::{
427 sync::{Arc, Mutex},
428 task::{Context, Poll, Wake},
429 };
430
431 use assert_matches::assert_matches;
432 use futures_core::Stream;
433 use futures_util::pin_mut;
434
435 use super::{super::LinkedChunk, ChunkIdentifier, Position, UpdatesInner};
436 use crate::linked_chunk::Update;
437
    /// Check that two readers (the main one and a simulated one) each read
    /// every update exactly once, and that the garbage collector only removes
    /// the updates that both of them have read.
    #[test]
    fn test_updates_take_and_garbage_collector() {
        use super::Update::*;

        let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history();

        // Simulate another updates “reader”, it can be a subscriber.
        let main_token = UpdatesInner::<char, ()>::MAIN_READER_TOKEN;
        let other_token = {
            let updates = linked_chunk.updates().unwrap();
            let mut inner = updates.inner.write().unwrap();
            inner.last_token += 1;

            let other_token = inner.last_token;
            inner.last_index_per_reader.insert(other_token, 0);

            other_token
        };

        // There is an initial update.
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.take(),
                &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
            );
            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }],
            );
        }

        // No new update.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.take().is_empty());
            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
        }

        linked_chunk.push_items_back(['a']);
        linked_chunk.push_items_back(['b']);
        linked_chunk.push_items_back(['c']);

        // Scenario 1: “main” takes the new updates, “other” doesn't take the new
        // updates.
        //
        // 0   1   2   3
        // +---+---+---+
        // | a | b | c |
        // +---+---+---+
        //
        // “main” will move its index from 0 to 3.
        // “other” won't move its index.
        {
            let updates = linked_chunk.updates().unwrap();

            {
                // Inspect number of updates in memory.
                assert_eq!(updates.inner.read().unwrap().len(), 3);
            }

            assert_eq!(
                updates.take(),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                    PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                    PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // It must be the same number as before as the garbage collector wasn't
                // able to remove any unused updates: “other” has not read them yet.
                assert_eq!(inner.len(), 3);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&0));
            }
        }

        linked_chunk.push_items_back(['d']);
        linked_chunk.push_items_back(['e']);
        linked_chunk.push_items_back(['f']);

        // Scenario 2: “other” takes the new updates, “main” doesn't take the
        // new updates.
        //
        // 0   1   2   3   4   5   6
        // +---+---+---+---+---+---+
        // | a | b | c | d | e | f |
        // +---+---+---+---+---+---+
        //
        // “main” won't move its index.
        // “other” will move its index from 0 to 6.
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                    PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                    PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                    PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
                    PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
                    PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // All the 6 updates are still in memory: the garbage collector runs before
                // the updates are taken, so it will be able to remove the unused updates
                // (`a`, `b` and `c`) but only at the next call…
                assert_eq!(inner.len(), 6);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&6));
            }
        }

        // Scenario 3: “other” takes new updates, but there is none, “main”
        // doesn't take new updates. The garbage collector will run and collect
        // unused updates.
        //
        // 0   1   2   3
        // +---+---+---+
        // | d | e | f |
        // +---+---+---+
        //
        // “main” will have its index updated from 3 to 0.
        // “other” will have its index updated from 6 to 3.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector has removed unused updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices. They must have been adjusted.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&0));
                assert_eq!(indices.get(&other_token), Some(&3));
            }
        }

        linked_chunk.push_items_back(['g']);
        linked_chunk.push_items_back(['h']);
        linked_chunk.push_items_back(['i']);

        // Scenario 4: both “main” and “other” take the new updates.
        //
        // 0   1   2   3   4   5   6
        // +---+---+---+---+---+---+
        // | d | e | f | g | h | i |
        // +---+---+---+---+---+---+
        //
        // “main” reads first: its index moves from 0 to 6. Then “other” reads:
        // the garbage collector runs first and removes `d`, `e` and `f` (read
        // by both), which shifts “main” from 6 to 3 and “other” from 3 to 0;
        // “other” then reads `g`, `h` and `i`, and its index moves from 0 to 3.
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.take(),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
                    PushItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
                    PushItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
                    PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
                    PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
                    PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
                ]
            );
            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[
                    PushItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
                    PushItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
                    PushItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector had a chance to collect the first 3 updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&3));
            }
        }

        // Scenario 5: no more updates but they both try to take new updates.
        // The garbage collector will collect all updates as all of them have
        // been read already.
        //
        // “main” will have its index updated from 3 to 0.
        // “other” will have its index updated from 3 to 0.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.take().is_empty());
            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector had a chance to collect all updates.
                assert_eq!(inner.len(), 0);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&0));
                assert_eq!(indices.get(&other_token), Some(&0));
            }
        }
    }
680
681 struct CounterWaker {
682 number_of_wakeup: Mutex<usize>,
683 }
684
685 impl Wake for CounterWaker {
686 fn wake(self: Arc<Self>) {
687 *self.number_of_wakeup.lock().unwrap() += 1;
688 }
689 }
690
    /// Check that a single [`UpdatesSubscriber`](super::UpdatesSubscriber)
    /// yields every update once, registers its waker when pending, and is
    /// closed when the `LinkedChunk` is dropped.
    #[test]
    fn test_updates_stream() {
        use super::Update::*;

        let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
        let waker = counter_waker.clone().into();
        let mut context = Context::from_waker(&waker);

        let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();

        let updates_subscriber = linked_chunk.updates().unwrap().subscribe();
        pin_mut!(updates_subscriber);

        // Initial update, stream is ready.
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
                );
            }
        );
        // Nothing more to read: the stream is pending, and the waker is registered
        // but has not been called yet.
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 0);

        // Let's generate an update.
        linked_chunk.push_items_back(['a']);

        // The waker must have been called.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 1);

        // There is an update! Right after that, the stream is pending again.
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);

        // Let's generate two other updates.
        linked_chunk.push_items_back(['b']);
        linked_chunk.push_items_back(['c']);

        // The waker must have been called only once for the two updates (the first
        // push consumed it), hence 2 wake-ups in total so far.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);

        // We can consume the updates without the stream (the main reader has read
        // nothing so far, so it gets everything), but the stream continues to
        // know it has updates.
        assert_eq!(
            linked_chunk.updates().unwrap().take(),
            &[
                NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None },
                PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
            ]
        );
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[
                        PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                        PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                    ]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);

        // When dropping the `LinkedChunk`, it closes the stream.
        drop(linked_chunk);
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Ready(None));

        // The number of waker calls has not changed.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);
    }
774
    /// Check that two subscribers read the updates independently, that a slow
    /// subscriber retains the updates it has not read yet, and that dropping
    /// a subscriber unregisters its token so that the garbage collector is no
    /// longer held back by it.
    #[test]
    fn test_updates_multiple_streams() {
        use super::Update::*;

        let counter_waker1 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
        let counter_waker2 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });

        let waker1 = counter_waker1.clone().into();
        let waker2 = counter_waker2.clone().into();

        let mut context1 = Context::from_waker(&waker1);
        let mut context2 = Context::from_waker(&waker2);

        let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();

        let updates_subscriber1 = linked_chunk.updates().unwrap().subscribe();
        pin_mut!(updates_subscriber1);

        // Scope for `updates_subscriber2`.
        let updates_subscriber2_token = {
            let updates_subscriber2 = linked_chunk.updates().unwrap().subscribe();
            pin_mut!(updates_subscriber2);

            // Initial updates, streams are ready.
            assert_matches!(
                updates_subscriber1.as_mut().poll_next(&mut context1),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
                    );
                }
            );
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 0);

            assert_matches!(
                updates_subscriber2.as_mut().poll_next(&mut context2),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[NewItemsChunk { previous: None, new: ChunkIdentifier(0), next: None }]
                    );
                }
            );
            assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);
            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 0);

            // Let's generate an update.
            linked_chunk.push_items_back(['a']);

            // The wakers must have been called.
            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 1);
            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 1);

            // There is an update! Right after that, the streams are pending again.
            assert_matches!(
                updates_subscriber1.as_mut().poll_next(&mut context1),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                    );
                }
            );
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);
            assert_matches!(
                updates_subscriber2.as_mut().poll_next(&mut context2),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                    );
                }
            );
            assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending);

            // Let's generate two other updates.
            linked_chunk.push_items_back(['b']);
            linked_chunk.push_items_back(['c']);

            // A waker is consumed when called. The first call to `push_items_back` will
            // call and consume the wakers. The second call to `push_items_back` will do
            // nothing as the wakers have been consumed. New wakers will be registered on
            // polling.
            //
            // So, the waker must have been called only once for the two updates.
            assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 2);
            assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 2);

            // Let's poll `updates_subscriber1` only.
            assert_matches!(
                updates_subscriber1.as_mut().poll_next(&mut context1),
                Poll::Ready(Some(items)) => {
                    assert_eq!(
                        items,
                        &[
                            PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                            PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                        ]
                    );
                }
            );
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

            // For the sake of this test, we also need to advance the main reader token.
            // The first call reads everything; the second call reads nothing but lets
            // the garbage collector run with the main reader being up to date.
            let _ = linked_chunk.updates().unwrap().take();
            let _ = linked_chunk.updates().unwrap().take();

            // If we inspect the garbage collector state, `b` and `c` should still be
            // present because they have not been consumed by `updates_subscriber2`
            // yet.
            {
                let updates = linked_chunk.updates().unwrap();

                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // We get 2 because the garbage collector runs before data are taken, not after:
                // `updates_subscriber2` has read up to `a` only, so `b` and `c` remain.
                assert_eq!(inner.len(), 2);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
                assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
            }

            // Poll `updates_subscriber1` again: there is no new update so it must be
            // pending.
            assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

            // The state of the garbage collector is unchanged: `b` and `c` are still
            // in memory.
            {
                let updates = linked_chunk.updates().unwrap();

                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory. Value is unchanged.
                assert_eq!(inner.len(), 2);

                // Inspect the indices. They are unchanged.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&updates_subscriber1.token), Some(&2));
                assert_eq!(indices.get(&updates_subscriber2.token), Some(&0));
            }

            updates_subscriber2.token
            // Drop `updates_subscriber2`!
        };

        // `updates_subscriber2` has been dropped. Poll `updates_subscriber1` again:
        // still no new update, but it will run the garbage collector again, and this
        // time `updates_subscriber2` is not “retaining” `b` and `c`. The buffer of
        // updates must be empty.
        assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending);

        // Inspect the garbage collector.
        {
            let updates = linked_chunk.updates().unwrap();

            let inner = updates.inner.read().unwrap();

            // Inspect number of updates in memory.
            assert_eq!(inner.len(), 0);

            // Inspect the indices.
            let indices = &inner.last_index_per_reader;

            assert_eq!(indices.get(&updates_subscriber1.token), Some(&0));
            assert_eq!(indices.get(&updates_subscriber2_token), None); // token is unknown!
        }

        // When dropping the `LinkedChunk`, it closes the stream.
        drop(linked_chunk);
        assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Ready(None));
    }
955
956 #[test]
957 fn test_update_into_items() {
958 let updates: Update<_, u32> =
959 Update::PushItems { at: Position::new(ChunkIdentifier(0), 0), items: vec![1, 2, 3] };
960
961 assert_eq!(updates.into_items(), vec![1, 2, 3]);
962
963 let updates: Update<u32, u32> = Update::Clear;
964 assert!(updates.into_items().is_empty());
965
966 let updates: Update<u32, u32> =
967 Update::RemoveItem { at: Position::new(ChunkIdentifier(0), 0) };
968 assert!(updates.into_items().is_empty());
969
970 let updates: Update<u32, u32> =
971 Update::ReplaceItem { at: Position::new(ChunkIdentifier(0), 0), item: 42 };
972 assert_eq!(updates.into_items(), vec![42]);
973 }
974}