matrix_sdk_ui/timeline/subscriber.rs
1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16 pin::Pin,
17 sync::Arc,
18 task::{Context, Poll},
19};
20
21use eyeball::Subscriber;
22use eyeball_im::{VectorDiff, VectorSubscriberBatchedStream};
23use eyeball_im_util::vector::{Skip, VectorObserverExt};
24use futures_core::Stream;
25use imbl::Vector;
26use pin_project_lite::pin_project;
27
28use super::{controller::ObservableItems, item::TimelineItem, TimelineDropHandle};
29
30pin_project! {
31 /// A stream that wraps a [`TimelineDropHandle`] so that the `Timeline`
32 /// isn't dropped until the `Stream` is dropped.
33 pub(super) struct TimelineWithDropHandle<S> {
34 #[pin]
35 inner: S,
36 drop_handle: Arc<TimelineDropHandle>,
37 }
38}
39
40impl<S> TimelineWithDropHandle<S> {
41 /// Create a new [`WithTimelineDropHandle`].
42 pub(super) fn new(inner: S, drop_handle: Arc<TimelineDropHandle>) -> Self {
43 Self { inner, drop_handle }
44 }
45}
46
47impl<S> Stream for TimelineWithDropHandle<S>
48where
49 S: Stream,
50{
51 type Item = S::Item;
52
53 fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
54 self.project().inner.poll_next(context)
55 }
56}
57
58pin_project! {
59 /// A type that creates a proper `Timeline` subscriber.
60 ///
61 /// This type implements [`Stream`], so that it's entirely transparent for
62 /// all consumers expecting an `impl Stream`.
63 ///
64 /// This `Stream` pipes `VectorDiff`s from [`ObservableItems`] into a batched
65 /// stream ([`VectorSubscriberBatchedStream`]), and then applies a skip
66 /// higher-order stream ([`Skip`]).
67 ///
68 /// `Skip` works by skipping the first _n_ values, where _n_ is referred
69 /// as `count`. Here, this `count` value is defined by a `Stream<Item =
70 /// usize>` (see [`Skip::dynamic_skip_with_initial_count`]). Everytime
71 /// the `count` stream produces a value, `Skip` adjusts its output.
72 /// `count` is managed by [`SkipCount`][skip::SkipCount], and is hold in
73 /// `TimelineMetadata::subscriber_skip_count`.
74 pub(super) struct TimelineSubscriber {
75 #[pin]
76 inner: Skip<VectorSubscriberBatchedStream<Arc<TimelineItem>>, Subscriber<usize>>,
77 }
78}
79
80impl TimelineSubscriber {
81 /// Creates a [`TimelineSubscriber`], in addition to the initial values of
82 /// the subscriber.
83 pub(super) fn new(
84 observable_items: &ObservableItems,
85 observable_skip_count: &skip::SkipCount,
86 ) -> (Vector<Arc<TimelineItem>>, Self) {
87 let (initial_values, stream) = observable_items
88 .subscribe()
89 .into_values_and_batched_stream()
90 .dynamic_skip_with_initial_count(
91 // The `SkipCount` value may have been modified before the subscriber is
92 // created. Let's use the current value instead of hardcoding it to 0.
93 observable_skip_count.get(),
94 observable_skip_count.subscribe(),
95 );
96
97 (initial_values, Self { inner: stream })
98 }
99}
100
101impl Stream for TimelineSubscriber {
102 type Item = Vec<VectorDiff<Arc<TimelineItem>>>;
103
104 fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
105 self.project().inner.poll_next(context)
106 }
107}
108
109pub mod skip {
110 use eyeball::{SharedObservable, Subscriber};
111
112 use super::super::controller::TimelineFocusKind;
113
114 const MAXIMUM_NUMBER_OF_INITIAL_ITEMS: usize = 20;
115
116 /// `SkipCount` helps to manage the `count` value used by the [`Skip`]
117 /// higher-order stream used by the [`TimelineSubscriber`]. See its
118 /// documentation to learn more.
119 ///
120 /// [`Skip`]: eyeball_im_util::vector::Skip
121 /// [`TimelineSubscriber`]: super::TimelineSubscriber
122 #[derive(Clone, Debug)]
123 pub struct SkipCount {
124 count: SharedObservable<usize>,
125 }
126
127 impl SkipCount {
128 /// Create a [`SkipCount`] with a default `count` value set to 0.
129 pub fn new() -> Self {
130 Self { count: SharedObservable::new(0) }
131 }
132
133 /// Compute the `count` value for [the `Skip` higher-order
134 /// stream][`Skip`].
135 ///
136 /// This is useful when new items are inserted, removed and so on.
137 ///
138 /// [`Skip`]: eyeball_im_util::vector::Skip
139 pub fn compute_next(
140 &self,
141 previous_number_of_items: usize,
142 next_number_of_items: usize,
143 ) -> usize {
144 let current_count = self.count.get();
145
146 // Initial states: no items are present.
147 if previous_number_of_items == 0 {
148 // Adjust the count to provide a maximum number of initial items. We want to
149 // skip the first items until we get a certain number of items to display.
150 //
151 // | `next_number_of_items` | `MAX…` | output | will display |
152 // |------------------------|--------|--------|--------------|
153 // | 60 | 20 | 40 | 20 items |
154 // | 10 | 20 | 0 | 10 items |
155 // | 0 | 20 | 0 | 0 item |
156 //
157 next_number_of_items.saturating_sub(MAXIMUM_NUMBER_OF_INITIAL_ITEMS)
158 }
159 // Not the initial state: there are items.
160 else {
161 // There are less items than before. Shift to the left `count` by the difference
162 // between `previous_number_of_items` and `next_number_of_items` to keep the
163 // same number of items in the stream as much as possible.
164 //
165 // This is not a backwards pagination, it cannot “go below 0”, however this is
166 // necessary to handle the case where the timeline is cleared and
167 // the number of items becomes 0 for example.
168 if next_number_of_items < previous_number_of_items {
169 current_count.saturating_sub(previous_number_of_items - next_number_of_items)
170 }
171 // Return `current_count` with no modification, we don't want to adjust the
172 // count, we want to see all initial items and new items.
173 else {
174 current_count
175 }
176 }
177 }
178
179 /// Compute the `count` value for [the `Skip` higher-order
180 /// stream][`Skip`] when a backwards pagination is happening.
181 ///
182 /// It returns the new value for `count` in addition to
183 /// `Some(number_of_items)` to fulfill the page up to `page_size`,
184 /// `None` otherwise. For example, assuming a `page_size` of 15,
185 /// if the `count` moves from 10 to 0, then 10 new items will
186 /// appear in the stream, but 5 are missing because they aren't
187 /// present in the stream: the stream has reached its beginning:
188 /// `Some(5)` will be returned. This is useful
189 /// for the pagination mechanism to fill the timeline with more items,
190 /// either from a storage, or from the network.
191 ///
192 /// [`Skip`]: eyeball_im_util::vector::Skip
193 pub fn compute_next_when_paginating_backwards(
194 &self,
195 page_size: usize,
196 ) -> (usize, Option<usize>) {
197 let current_count = self.count.get();
198
199 // We skip the values from the start of the timeline; paginating backwards means
200 // we have to reduce the count until reaching 0.
201 //
202 // | `current_count` | `page_size` | output |
203 // |-----------------|-------------|----------------|
204 // | 50 | 20 | (30, None) |
205 // | 30 | 20 | (10, None) |
206 // | 10 | 20 | (0, Some(10)) |
207 // | 0 | 20 | (0, Some(20)) |
208 // ^ ^^^^^^^^
209 // | |
210 // | it needs 20 items to fulfill the
211 // | page size
212 // count becomes 0
213 //
214 if current_count >= page_size {
215 (current_count - page_size, None)
216 } else {
217 (0, Some(page_size - current_count))
218 }
219 }
220
221 /// Compute the `count` value for [the `Skip` higher-order
222 /// stream][`Skip`] when a forwards pagination is happening.
223 ///
224 /// The `page_size` is present to mimic the
225 /// [`compute_count_when_paginating_backwards`] function but it is
226 /// actually useless for the current implementation.
227 ///
228 /// [`Skip`]: eyeball_im_util::vector::Skip
229 #[allow(unused)] // this is not used yet because only a live timeline is using it, but as soon as
230 // other kind of timelines will use it, we would need it, it's better to have
231 // this in case of; everything is tested, the logic is made more robust.
232 pub fn compute_next_when_paginating_forwards(&self, _page_size: usize) -> usize {
233 // Nothing to do, the count remains unchanged as we skip the first values, not
234 // the last values; paginating forwards will add items at the end, not at the
235 // start of the timeline.
236 self.count.get()
237 }
238
239 /// Get the current count value.
240 pub fn get(&self) -> usize {
241 self.count.get()
242 }
243
244 /// Subscribe to updates of the count value.
245 pub fn subscribe(&self) -> Subscriber<usize> {
246 self.count.subscribe()
247 }
248
249 /// Update the skip count if and only if the timeline has a live focus
250 /// ([`TimelineFocusKind::Live`]).
251 pub fn update(&self, count: usize, focus_kind: &TimelineFocusKind) {
252 if matches!(focus_kind, TimelineFocusKind::Live) {
253 self.count.set_if_not_eq(count);
254 }
255 }
256 }
257
258 #[cfg(test)]
259 mod tests {
260 use super::SkipCount;
261
262 #[test]
263 fn test_compute_count_from_underflowing_initial_states() {
264 let skip_count = SkipCount::new();
265
266 // Initial state with too few new items. None is skipped.
267 let previous_number_of_items = 0;
268 let next_number_of_items = previous_number_of_items + 10;
269 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
270 assert_eq!(count, 0);
271 skip_count.count.set(count);
272
273 // Add 5 new items. The count stays at 0 because we don't want to skip the
274 // previous items.
275 let previous_number_of_items = next_number_of_items;
276 let next_number_of_items = previous_number_of_items + 5;
277 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
278 assert_eq!(count, 0);
279 skip_count.count.set(count);
280
281 // Add 20 new items. The count stays at 0 because we don't want to
282 // skip the previous items.
283 let previous_number_of_items = next_number_of_items;
284 let next_number_of_items = previous_number_of_items + 20;
285 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
286 assert_eq!(count, 0);
287 skip_count.count.set(count);
288
289 // Remove a certain number of items. The count stays at 0 because it was
290 // previously 0, no items are skipped, nothing to adjust.
291 let previous_number_of_items = next_number_of_items;
292 let next_number_of_items = previous_number_of_items - 4;
293 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
294 assert_eq!(count, 0);
295 skip_count.count.set(count);
296
297 // Remove all items. The count goes to 0 (regardless it was 0 before).
298 let previous_number_of_items = next_number_of_items;
299 let next_number_of_items = 0;
300 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
301 assert_eq!(count, 0);
302 }
303
304 #[test]
305 fn test_compute_count_from_overflowing_initial_states() {
306 let skip_count = SkipCount::new();
307
308 // Initial state with too much new items. Some are skipped.
309 let previous_number_of_items = 0;
310 let next_number_of_items = previous_number_of_items + 30;
311 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
312 assert_eq!(count, 10);
313 skip_count.count.set(count);
314
315 // Add 5 new items. The count stays at 10 because we don't want to skip the
316 // previous items.
317 let previous_number_of_items = next_number_of_items;
318 let next_number_of_items = previous_number_of_items + 5;
319 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
320 assert_eq!(count, 10);
321 skip_count.count.set(count);
322
323 // Add 20 new items. The count stays at 10 because we don't want to
324 // skip the previous items.
325 let previous_number_of_items = next_number_of_items;
326 let next_number_of_items = previous_number_of_items + 20;
327 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
328 assert_eq!(count, 10);
329 skip_count.count.set(count);
330
331 // Remove a certain number of items. The count is reduced by 5 so that the same
332 // number of items are presented.
333 let previous_number_of_items = next_number_of_items;
334 let next_number_of_items = previous_number_of_items - 4;
335 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
336 assert_eq!(count, 6);
337 skip_count.count.set(count);
338
339 // Remove all items. The count goes to 0 (regardless it was 6 before).
340 let previous_number_of_items = next_number_of_items;
341 let next_number_of_items = 0;
342 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
343 assert_eq!(count, 0);
344 }
345
346 #[test]
347 fn test_compute_count_when_paginating_backwards_from_underflowing_initial_states() {
348 let skip_count = SkipCount::new();
349
350 // Initial state with too few new items. None is skipped.
351 let previous_number_of_items = 0;
352 let next_number_of_items = previous_number_of_items + 10;
353 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
354 assert_eq!(count, 0);
355 skip_count.count.set(count);
356
357 // Add 30 new items. The count stays at 0 because we don't want to skip the
358 // previous items.
359 let previous_number_of_items = next_number_of_items;
360 let next_number_of_items = previous_number_of_items + 30;
361 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
362 assert_eq!(count, 0);
363 skip_count.count.set(count);
364
365 let page_size = 20;
366
367 // Paginate backwards.
368 let (count, needs) = skip_count.compute_next_when_paginating_backwards(page_size);
369 assert_eq!(count, 0);
370 assert_eq!(needs, Some(20));
371 }
372
373 #[test]
374 fn test_compute_count_when_paginating_backwards_from_overflowing_initial_states() {
375 let skip_count = SkipCount::new();
376
377 // Initial state with too much new items. Some are skipped.
378 let previous_number_of_items = 0;
379 let next_number_of_items = previous_number_of_items + 50;
380 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
381 assert_eq!(count, 30);
382 skip_count.count.set(count);
383
384 // Add 30 new items. The count stays at 30 because we don't want to
385 // skip the previous items.
386 let previous_number_of_items = next_number_of_items;
387 let next_number_of_items = previous_number_of_items + 30;
388 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
389 assert_eq!(count, 30);
390 skip_count.count.set(count);
391
392 let page_size = 20;
393
394 // Paginate backwards. The count shifts by `page_size`, and the page is full.
395 let (count, needs) = skip_count.compute_next_when_paginating_backwards(page_size);
396 assert_eq!(count, 10);
397 assert_eq!(needs, None);
398 skip_count.count.set(count);
399
400 // Paginate backwards. The count shifts by `page_size` but reaches 0 before the
401 // page becomes full. It needs 10 more items to fulfill the page.
402 let (count, needs) = skip_count.compute_next_when_paginating_backwards(page_size);
403 assert_eq!(count, 0);
404 assert_eq!(needs, Some(10));
405 }
406
407 #[test]
408 fn test_compute_count_when_paginating_forwards_from_underflowing_initial_states() {
409 let skip_count = SkipCount::new();
410
411 // Initial state with too few new items. None is skipped.
412 let previous_number_of_items = 0;
413 let next_number_of_items = previous_number_of_items + 10;
414 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
415 assert_eq!(count, 0);
416 skip_count.count.set(count);
417
418 // Add 30 new items. The count stays at 0 because we don't want to skip the
419 // previous items.
420 let previous_number_of_items = next_number_of_items;
421 let next_number_of_items = previous_number_of_items + 30;
422 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
423 assert_eq!(count, 0);
424 skip_count.count.set(count);
425
426 let page_size = 20;
427
428 // Paginate forwards. The count remains unchanged.
429 let count = skip_count.compute_next_when_paginating_forwards(page_size);
430 assert_eq!(count, 0);
431 }
432
433 #[test]
434 fn test_compute_count_when_paginating_forwards_from_overflowing_initial_states() {
435 let skip_count = SkipCount::new();
436
437 // Initial state with too much new items. Some are skipped.
438 let previous_number_of_items = 0;
439 let next_number_of_items = previous_number_of_items + 50;
440 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
441 assert_eq!(count, 30);
442 skip_count.count.set(count);
443
444 // Add 30 new items. The count stays at 30 because we don't want to
445 // skip the previous items.
446 let previous_number_of_items = next_number_of_items;
447 let next_number_of_items = previous_number_of_items + 30;
448 let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
449 assert_eq!(count, 30);
450 skip_count.count.set(count);
451
452 let page_size = 20;
453
454 // Paginate forwards. The count remains unchanged.
455 let count = skip_count.compute_next_when_paginating_forwards(page_size);
456 assert_eq!(count, 30);
457 }
458 }
459}