Skip to main content

matrix_sdk/paginators/
thread.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Paginator facilities for a thread.
16//!
17//! See also the documentation for the [`ThreadedEventsLoader`] struct.
18
19use std::{
20    fmt::Formatter,
21    future::Future,
22    sync::{Arc, Mutex},
23};
24
25use matrix_sdk_base::{SendOutsideWasm, SyncOutsideWasm, deserialized_responses::TimelineEvent};
26use ruma::{EventId, OwnedEventId, UInt, api::Direction};
27
28use crate::{
29    Error, Room,
30    paginators::{PaginationResult, PaginationToken, PaginationTokens, PaginatorError},
31    room::{IncludeRelations, Relations, RelationsOptions},
32};
33
34/// A paginable thread interface, useful for testing purposes.
35pub trait PaginableThread: SendOutsideWasm + SyncOutsideWasm {
36    /// Runs a /relations query for the given thread, with the given options.
37    fn relations(
38        &self,
39        thread_root: OwnedEventId,
40        opts: RelationsOptions,
41    ) -> impl Future<Output = Result<Relations, Error>> + SendOutsideWasm;
42
43    /// Load an event, given its event ID.
44    fn load_event(
45        &self,
46        event_id: &OwnedEventId,
47    ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
48}
49
50impl PaginableThread for Room {
51    async fn relations(
52        &self,
53        thread_root: OwnedEventId,
54        opts: RelationsOptions,
55    ) -> Result<Relations, Error> {
56        self.relations(thread_root, opts).await
57    }
58
59    async fn load_event(&self, event_id: &OwnedEventId) -> Result<TimelineEvent, Error> {
60        self.event(event_id, None).await
61    }
62}
63
64/// A paginator for a thread of events.
65pub struct ThreadedEventsLoader<P: PaginableThread> {
66    /// Room provider for the paginated thread.
67    room: P,
68
69    /// The thread root event ID (the event that started the thread).
70    root_event_id: OwnedEventId,
71
72    /// The current pagination tokens, which are used to keep track of the
73    /// pagination state.
74    tokens: Mutex<PaginationTokens>,
75}
76
77impl<P: PaginableThread> ThreadedEventsLoader<P> {
78    /// Create a new [`ThreadedEventsLoader`], given a room implementation.
79    pub fn new(room: P, root_event_id: OwnedEventId, tokens: PaginationTokens) -> Self {
80        Self { room, root_event_id, tokens: Mutex::new(tokens) }
81    }
82
83    /// Run a single pagination backwards, returning the next set of events and
84    /// information whether we've reached the start of the thread.
85    ///
86    /// Note: when the thread start is reached, the root event *will* be
87    /// included in the result.
88    pub async fn paginate_backwards(
89        &self,
90        num_events: UInt,
91    ) -> Result<PaginationResult, PaginatorError> {
92        let token = {
93            let token = &self.tokens.lock().unwrap().previous;
94
95            match token {
96                PaginationToken::None => None,
97                PaginationToken::HasMore(token) => Some(token.clone()),
98                PaginationToken::HitEnd => {
99                    return Ok(PaginationResult { events: Vec::new(), hit_end_of_timeline: true });
100                }
101            }
102        };
103
104        let options = RelationsOptions {
105            from: token,
106            dir: Direction::Backward,
107            limit: Some(num_events),
108            include_relations: IncludeRelations::AllRelations,
109            recurse: true,
110        };
111
112        let mut result = self
113            .room
114            .relations(self.root_event_id.to_owned(), options)
115            .await
116            .map_err(|error| PaginatorError::SdkError(Arc::new(error)))?;
117
118        let hit_end_of_timeline = result.next_batch_token.is_none();
119
120        // Update the stored tokens
121        {
122            let mut tokens = self.tokens.lock().unwrap();
123
124            tokens.previous = match result.next_batch_token {
125                Some(val) => PaginationToken::HasMore(val),
126                None => PaginationToken::HitEnd,
127            };
128        }
129
130        // Finally insert the thread root if at the end of the timeline going backwards
131        if hit_end_of_timeline {
132            let root_event = self
133                .room
134                .load_event(&self.root_event_id)
135                .await
136                .map_err(|err| PaginatorError::SdkError(Arc::new(err)))?;
137
138            result.chunk.push(root_event);
139        }
140
141        Ok(PaginationResult { events: result.chunk, hit_end_of_timeline })
142    }
143
144    /// Run a single pagination forwards, returning the next set of events and
145    /// information whether we've reached the end of the thread.
146    pub async fn paginate_forwards(
147        &self,
148        num_events: UInt,
149    ) -> Result<PaginationResult, PaginatorError> {
150        let token = {
151            let token = &self.tokens.lock().unwrap().next;
152
153            match token {
154                PaginationToken::None => None,
155                PaginationToken::HasMore(token) => Some(token.clone()),
156                PaginationToken::HitEnd => {
157                    return Ok(PaginationResult { events: Vec::new(), hit_end_of_timeline: true });
158                }
159            }
160        };
161
162        let options = RelationsOptions {
163            from: token,
164            dir: Direction::Forward,
165            limit: Some(num_events),
166            include_relations: IncludeRelations::AllRelations,
167            recurse: true,
168        };
169
170        let result = self
171            .room
172            .relations(self.root_event_id.to_owned(), options)
173            .await
174            .map_err(|error| PaginatorError::SdkError(Arc::new(error)))?;
175
176        let hit_end_of_timeline = result.next_batch_token.is_none();
177
178        // Update the stored tokens
179        {
180            let mut tokens = self.tokens.lock().unwrap();
181
182            tokens.next = match result.next_batch_token {
183                Some(val) => PaginationToken::HasMore(val),
184                None => PaginationToken::HitEnd,
185            };
186        }
187
188        Ok(PaginationResult { events: result.chunk, hit_end_of_timeline })
189    }
190
191    /// Returns the root [`EventId`] for the thread.
192    pub fn thread_root_event_id(&self) -> &EventId {
193        &self.root_event_id
194    }
195}
196
197#[cfg(not(tarpaulin_include))]
198impl<P: PaginableThread> std::fmt::Debug for ThreadedEventsLoader<P> {
199    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
200        f.debug_struct("ThreadedEventsLoader").finish()
201    }
202}