matrix_sdk_ui/timeline/
subscriber.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    pin::Pin,
17    sync::Arc,
18    task::{Context, Poll},
19};
20
21use eyeball::Subscriber;
22use eyeball_im::{VectorDiff, VectorSubscriberBatchedStream};
23use eyeball_im_util::vector::{Skip, VectorObserverExt};
24use futures_core::Stream;
25use imbl::Vector;
26use pin_project_lite::pin_project;
27
28use super::{controller::ObservableItems, item::TimelineItem, TimelineDropHandle};
29
30pin_project! {
31    /// A stream that wraps a [`TimelineDropHandle`] so that the `Timeline`
32    /// isn't dropped until the `Stream` is dropped.
33    pub(super) struct TimelineWithDropHandle<S> {
34        #[pin]
35        inner: S,
36        drop_handle: Arc<TimelineDropHandle>,
37    }
38}
39
40impl<S> TimelineWithDropHandle<S> {
41    /// Create a new [`WithTimelineDropHandle`].
42    pub(super) fn new(inner: S, drop_handle: Arc<TimelineDropHandle>) -> Self {
43        Self { inner, drop_handle }
44    }
45}
46
47impl<S> Stream for TimelineWithDropHandle<S>
48where
49    S: Stream,
50{
51    type Item = S::Item;
52
53    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
54        self.project().inner.poll_next(context)
55    }
56}
57
58pin_project! {
59    /// A type that creates a proper `Timeline` subscriber.
60    ///
61    /// This type implements [`Stream`], so that it's entirely transparent for
62    /// all consumers expecting an `impl Stream`.
63    ///
64    /// This `Stream` pipes `VectorDiff`s from [`ObservableItems`] into a batched
65    /// stream ([`VectorSubscriberBatchedStream`]), and then applies a skip
66    /// higher-order stream ([`Skip`]).
67    ///
68    /// `Skip` works by skipping the first _n_ values, where _n_ is referred
69    /// as `count`. Here, this `count` value is defined by a `Stream<Item =
70    /// usize>` (see [`Skip::dynamic_skip_with_initial_count`]). Everytime
71    /// the `count` stream produces a value, `Skip` adjusts its output.
72    /// `count` is managed by [`SkipCount`][skip::SkipCount], and is hold in
73    /// `TimelineMetadata::subscriber_skip_count`.
74    pub(super) struct TimelineSubscriber {
75        #[pin]
76        inner: Skip<VectorSubscriberBatchedStream<Arc<TimelineItem>>, Subscriber<usize>>,
77    }
78}
79
80impl TimelineSubscriber {
81    /// Creates a [`TimelineSubscriber`], in addition to the initial values of
82    /// the subscriber.
83    pub(super) fn new(
84        observable_items: &ObservableItems,
85        observable_skip_count: &skip::SkipCount,
86    ) -> (Vector<Arc<TimelineItem>>, Self) {
87        let (initial_values, stream) = observable_items
88            .subscribe()
89            .into_values_and_batched_stream()
90            .dynamic_skip_with_initial_count(
91                // The `SkipCount` value may have been modified before the subscriber is
92                // created. Let's use the current value instead of hardcoding it to 0.
93                observable_skip_count.get(),
94                observable_skip_count.subscribe(),
95            );
96
97        (initial_values, Self { inner: stream })
98    }
99}
100
101impl Stream for TimelineSubscriber {
102    type Item = Vec<VectorDiff<Arc<TimelineItem>>>;
103
104    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
105        self.project().inner.poll_next(context)
106    }
107}
108
109pub mod skip {
110    use eyeball::{SharedObservable, Subscriber};
111
112    use super::super::controller::TimelineFocusKind;
113
114    const MAXIMUM_NUMBER_OF_INITIAL_ITEMS: usize = 20;
115
116    /// `SkipCount` helps to manage the `count` value used by the [`Skip`]
117    /// higher-order stream used by the [`TimelineSubscriber`]. See its
118    /// documentation to learn more.
119    ///
120    /// [`Skip`]: eyeball_im_util::vector::Skip
121    /// [`TimelineSubscriber`]: super::TimelineSubscriber
122    #[derive(Clone, Debug)]
123    pub struct SkipCount {
124        count: SharedObservable<usize>,
125    }
126
127    impl SkipCount {
128        /// Create a [`SkipCount`] with a default `count` value set to 0.
129        pub fn new() -> Self {
130            Self { count: SharedObservable::new(0) }
131        }
132
133        /// Compute the `count` value for [the `Skip` higher-order
134        /// stream][`Skip`].
135        ///
136        /// This is useful when new items are inserted, removed and so on.
137        ///
138        /// [`Skip`]: eyeball_im_util::vector::Skip
139        pub fn compute_next(
140            &self,
141            previous_number_of_items: usize,
142            next_number_of_items: usize,
143        ) -> usize {
144            let current_count = self.count.get();
145
146            // Initial states: no items are present.
147            if previous_number_of_items == 0 {
148                // Adjust the count to provide a maximum number of initial items. We want to
149                // skip the first items until we get a certain number of items to display.
150                //
151                // | `next_number_of_items` | `MAX…` | output | will display |
152                // |------------------------|--------|--------|--------------|
153                // | 60                     | 20     | 40     | 20 items     |
154                // | 10                     | 20     | 0      | 10 items     |
155                // | 0                      | 20     | 0      | 0 item       |
156                //
157                next_number_of_items.saturating_sub(MAXIMUM_NUMBER_OF_INITIAL_ITEMS)
158            }
159            // Not the initial state: there are items.
160            else {
161                // There are less items than before. Shift to the left `count` by the difference
162                // between `previous_number_of_items` and `next_number_of_items` to keep the
163                // same number of items in the stream as much as possible.
164                //
165                // This is not a backwards pagination, it cannot “go below 0”, however this is
166                // necessary to handle the case where the timeline is cleared and
167                // the number of items becomes 0 for example.
168                if next_number_of_items < previous_number_of_items {
169                    current_count.saturating_sub(previous_number_of_items - next_number_of_items)
170                }
171                // Return `current_count` with no modification, we don't want to adjust the
172                // count, we want to see all initial items and new items.
173                else {
174                    current_count
175                }
176            }
177        }
178
179        /// Compute the `count` value for [the `Skip` higher-order
180        /// stream][`Skip`] when a backwards pagination is happening.
181        ///
182        /// It returns the new value for `count` in addition to
183        /// `Some(number_of_items)` to fulfill the page up to `page_size`,
184        /// `None` otherwise. For example, assuming a `page_size` of 15,
185        /// if the `count` moves from 10 to 0, then 10 new items will
186        /// appear in the stream, but 5 are missing because they aren't
187        /// present in the stream: the stream has reached its beginning:
188        /// `Some(5)` will be returned. This is useful
189        /// for the pagination mechanism to fill the timeline with more items,
190        /// either from a storage, or from the network.
191        ///
192        /// [`Skip`]: eyeball_im_util::vector::Skip
193        pub fn compute_next_when_paginating_backwards(
194            &self,
195            page_size: usize,
196        ) -> (usize, Option<usize>) {
197            let current_count = self.count.get();
198
199            // We skip the values from the start of the timeline; paginating backwards means
200            // we have to reduce the count until reaching 0.
201            //
202            // | `current_count` | `page_size` | output         |
203            // |-----------------|-------------|----------------|
204            // | 50              | 20          | (30, None)     |
205            // | 30              | 20          | (10, None)     |
206            // | 10              | 20          | (0, Some(10))  |
207            // | 0               | 20          | (0, Some(20))  |
208            //                                    ^  ^^^^^^^^
209            //                                    |  |
210            //                                    |  it needs 20 items to fulfill the
211            //                                    |  page size
212            //                                    count becomes 0
213            //
214            if current_count >= page_size {
215                (current_count - page_size, None)
216            } else {
217                (0, Some(page_size - current_count))
218            }
219        }
220
221        /// Compute the `count` value for [the `Skip` higher-order
222        /// stream][`Skip`] when a forwards pagination is happening.
223        ///
224        /// The `page_size` is present to mimic the
225        /// [`compute_count_when_paginating_backwards`] function but it is
226        /// actually useless for the current implementation.
227        ///
228        /// [`Skip`]: eyeball_im_util::vector::Skip
229        #[allow(unused)] // this is not used yet because only a live timeline is using it, but as soon as
230                         // other kind of timelines will use it, we would need it, it's better to have
231                         // this in case of; everything is tested, the logic is made more robust.
232        pub fn compute_next_when_paginating_forwards(&self, _page_size: usize) -> usize {
233            // Nothing to do, the count remains unchanged as we skip the first values, not
234            // the last values; paginating forwards will add items at the end, not at the
235            // start of the timeline.
236            self.count.get()
237        }
238
239        /// Get the current count value.
240        pub fn get(&self) -> usize {
241            self.count.get()
242        }
243
244        /// Subscribe to updates of the count value.
245        pub fn subscribe(&self) -> Subscriber<usize> {
246            self.count.subscribe()
247        }
248
249        /// Update the skip count if and only if the timeline has a live focus
250        /// ([`TimelineFocusKind::Live`]).
251        pub fn update(&self, count: usize, focus_kind: &TimelineFocusKind) {
252            if matches!(focus_kind, TimelineFocusKind::Live) {
253                self.count.set_if_not_eq(count);
254            }
255        }
256    }
257
258    #[cfg(test)]
259    mod tests {
260        use super::SkipCount;
261
262        #[test]
263        fn test_compute_count_from_underflowing_initial_states() {
264            let skip_count = SkipCount::new();
265
266            // Initial state with too few new items. None is skipped.
267            let previous_number_of_items = 0;
268            let next_number_of_items = previous_number_of_items + 10;
269            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
270            assert_eq!(count, 0);
271            skip_count.count.set(count);
272
273            // Add 5 new items. The count stays at 0 because we don't want to skip the
274            // previous items.
275            let previous_number_of_items = next_number_of_items;
276            let next_number_of_items = previous_number_of_items + 5;
277            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
278            assert_eq!(count, 0);
279            skip_count.count.set(count);
280
281            // Add 20 new items. The count stays at 0 because we don't want to
282            // skip the previous items.
283            let previous_number_of_items = next_number_of_items;
284            let next_number_of_items = previous_number_of_items + 20;
285            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
286            assert_eq!(count, 0);
287            skip_count.count.set(count);
288
289            // Remove a certain number of items. The count stays at 0 because it was
290            // previously 0, no items are skipped, nothing to adjust.
291            let previous_number_of_items = next_number_of_items;
292            let next_number_of_items = previous_number_of_items - 4;
293            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
294            assert_eq!(count, 0);
295            skip_count.count.set(count);
296
297            // Remove all items. The count goes to 0 (regardless it was 0 before).
298            let previous_number_of_items = next_number_of_items;
299            let next_number_of_items = 0;
300            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
301            assert_eq!(count, 0);
302        }
303
304        #[test]
305        fn test_compute_count_from_overflowing_initial_states() {
306            let skip_count = SkipCount::new();
307
308            // Initial state with too much new items. Some are skipped.
309            let previous_number_of_items = 0;
310            let next_number_of_items = previous_number_of_items + 30;
311            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
312            assert_eq!(count, 10);
313            skip_count.count.set(count);
314
315            // Add 5 new items. The count stays at 10 because we don't want to skip the
316            // previous items.
317            let previous_number_of_items = next_number_of_items;
318            let next_number_of_items = previous_number_of_items + 5;
319            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
320            assert_eq!(count, 10);
321            skip_count.count.set(count);
322
323            // Add 20 new items. The count stays at 10 because we don't want to
324            // skip the previous items.
325            let previous_number_of_items = next_number_of_items;
326            let next_number_of_items = previous_number_of_items + 20;
327            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
328            assert_eq!(count, 10);
329            skip_count.count.set(count);
330
331            // Remove a certain number of items. The count is reduced by 5 so that the same
332            // number of items are presented.
333            let previous_number_of_items = next_number_of_items;
334            let next_number_of_items = previous_number_of_items - 4;
335            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
336            assert_eq!(count, 6);
337            skip_count.count.set(count);
338
339            // Remove all items. The count goes to 0 (regardless it was 6 before).
340            let previous_number_of_items = next_number_of_items;
341            let next_number_of_items = 0;
342            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
343            assert_eq!(count, 0);
344        }
345
346        #[test]
347        fn test_compute_count_when_paginating_backwards_from_underflowing_initial_states() {
348            let skip_count = SkipCount::new();
349
350            // Initial state with too few new items. None is skipped.
351            let previous_number_of_items = 0;
352            let next_number_of_items = previous_number_of_items + 10;
353            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
354            assert_eq!(count, 0);
355            skip_count.count.set(count);
356
357            // Add 30 new items. The count stays at 0 because we don't want to skip the
358            // previous items.
359            let previous_number_of_items = next_number_of_items;
360            let next_number_of_items = previous_number_of_items + 30;
361            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
362            assert_eq!(count, 0);
363            skip_count.count.set(count);
364
365            let page_size = 20;
366
367            // Paginate backwards.
368            let (count, needs) = skip_count.compute_next_when_paginating_backwards(page_size);
369            assert_eq!(count, 0);
370            assert_eq!(needs, Some(20));
371        }
372
373        #[test]
374        fn test_compute_count_when_paginating_backwards_from_overflowing_initial_states() {
375            let skip_count = SkipCount::new();
376
377            // Initial state with too much new items. Some are skipped.
378            let previous_number_of_items = 0;
379            let next_number_of_items = previous_number_of_items + 50;
380            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
381            assert_eq!(count, 30);
382            skip_count.count.set(count);
383
384            // Add 30 new items. The count stays at 30 because we don't want to
385            // skip the previous items.
386            let previous_number_of_items = next_number_of_items;
387            let next_number_of_items = previous_number_of_items + 30;
388            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
389            assert_eq!(count, 30);
390            skip_count.count.set(count);
391
392            let page_size = 20;
393
394            // Paginate backwards. The count shifts by `page_size`, and the page is full.
395            let (count, needs) = skip_count.compute_next_when_paginating_backwards(page_size);
396            assert_eq!(count, 10);
397            assert_eq!(needs, None);
398            skip_count.count.set(count);
399
400            // Paginate backwards. The count shifts by `page_size` but reaches 0 before the
401            // page becomes full. It needs 10 more items to fulfill the page.
402            let (count, needs) = skip_count.compute_next_when_paginating_backwards(page_size);
403            assert_eq!(count, 0);
404            assert_eq!(needs, Some(10));
405        }
406
407        #[test]
408        fn test_compute_count_when_paginating_forwards_from_underflowing_initial_states() {
409            let skip_count = SkipCount::new();
410
411            // Initial state with too few new items. None is skipped.
412            let previous_number_of_items = 0;
413            let next_number_of_items = previous_number_of_items + 10;
414            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
415            assert_eq!(count, 0);
416            skip_count.count.set(count);
417
418            // Add 30 new items. The count stays at 0 because we don't want to skip the
419            // previous items.
420            let previous_number_of_items = next_number_of_items;
421            let next_number_of_items = previous_number_of_items + 30;
422            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
423            assert_eq!(count, 0);
424            skip_count.count.set(count);
425
426            let page_size = 20;
427
428            // Paginate forwards. The count remains unchanged.
429            let count = skip_count.compute_next_when_paginating_forwards(page_size);
430            assert_eq!(count, 0);
431        }
432
433        #[test]
434        fn test_compute_count_when_paginating_forwards_from_overflowing_initial_states() {
435            let skip_count = SkipCount::new();
436
437            // Initial state with too much new items. Some are skipped.
438            let previous_number_of_items = 0;
439            let next_number_of_items = previous_number_of_items + 50;
440            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
441            assert_eq!(count, 30);
442            skip_count.count.set(count);
443
444            // Add 30 new items. The count stays at 30 because we don't want to
445            // skip the previous items.
446            let previous_number_of_items = next_number_of_items;
447            let next_number_of_items = previous_number_of_items + 30;
448            let count = skip_count.compute_next(previous_number_of_items, next_number_of_items);
449            assert_eq!(count, 30);
450            skip_count.count.set(count);
451
452            let page_size = 20;
453
454            // Paginate forwards. The count remains unchanged.
455            let count = skip_count.compute_next_when_paginating_forwards(page_size);
456            assert_eq!(count, 30);
457        }
458    }
459}