matrix_sdk/paginators/
thread.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Paginator facilities for a thread.
16//!
17//! See also the documentation for the [`ThreadedEventsLoader`] struct.
18
19use std::{fmt::Formatter, future::Future, sync::Mutex};
20
21use matrix_sdk_base::{SendOutsideWasm, SyncOutsideWasm, deserialized_responses::TimelineEvent};
22use ruma::{EventId, OwnedEventId, UInt, api::Direction};
23
24use crate::{
25    Error, Room,
26    paginators::{PaginationResult, PaginationToken, PaginationTokens, PaginatorError},
27    room::{IncludeRelations, Relations, RelationsOptions},
28};
29
30/// A paginable thread interface, useful for testing purposes.
31pub trait PaginableThread: SendOutsideWasm + SyncOutsideWasm {
32    /// Runs a /relations query for the given thread, with the given options.
33    fn relations(
34        &self,
35        thread_root: OwnedEventId,
36        opts: RelationsOptions,
37    ) -> impl Future<Output = Result<Relations, Error>> + SendOutsideWasm;
38
39    /// Load an event, given its event ID.
40    fn load_event(
41        &self,
42        event_id: &OwnedEventId,
43    ) -> impl Future<Output = Result<TimelineEvent, Error>> + SendOutsideWasm;
44}
45
46impl PaginableThread for Room {
47    async fn relations(
48        &self,
49        thread_root: OwnedEventId,
50        opts: RelationsOptions,
51    ) -> Result<Relations, Error> {
52        self.relations(thread_root, opts).await
53    }
54
55    async fn load_event(&self, event_id: &OwnedEventId) -> Result<TimelineEvent, Error> {
56        self.event(event_id, None).await
57    }
58}
59
60/// A paginator for a thread of events.
61pub struct ThreadedEventsLoader<P: PaginableThread> {
62    /// Room provider for the paginated thread.
63    room: P,
64
65    /// The thread root event ID (the event that started the thread).
66    root_event_id: OwnedEventId,
67
68    /// The current pagination tokens, which are used to keep track of the
69    /// pagination state.
70    tokens: Mutex<PaginationTokens>,
71}
72
73impl<P: PaginableThread> ThreadedEventsLoader<P> {
74    /// Create a new [`ThreadedEventsLoader`], given a room implementation.
75    pub fn new(room: P, root_event_id: OwnedEventId, tokens: PaginationTokens) -> Self {
76        Self { room, root_event_id, tokens: Mutex::new(tokens) }
77    }
78
79    /// Run a single pagination backwards, returning the next set of events and
80    /// information whether we've reached the start of the thread.
81    ///
82    /// Note: when the thread start is reached, the root event *will* be
83    /// included in the result.
84    pub async fn paginate_backwards(
85        &self,
86        num_events: UInt,
87    ) -> Result<PaginationResult, PaginatorError> {
88        let token = {
89            let token = &self.tokens.lock().unwrap().previous;
90
91            match token {
92                PaginationToken::None => None,
93                PaginationToken::HasMore(token) => Some(token.clone()),
94                PaginationToken::HitEnd => {
95                    return Ok(PaginationResult { events: Vec::new(), hit_end_of_timeline: true });
96                }
97            }
98        };
99
100        let options = RelationsOptions {
101            from: token,
102            dir: Direction::Backward,
103            limit: Some(num_events),
104            include_relations: IncludeRelations::AllRelations,
105            recurse: true,
106        };
107
108        let mut result = self
109            .room
110            .relations(self.root_event_id.to_owned(), options)
111            .await
112            .map_err(|error| PaginatorError::SdkError(Box::new(error)))?;
113
114        let hit_end_of_timeline = result.next_batch_token.is_none();
115
116        // Update the stored tokens
117        {
118            let mut tokens = self.tokens.lock().unwrap();
119
120            tokens.previous = match result.next_batch_token {
121                Some(val) => PaginationToken::HasMore(val),
122                None => PaginationToken::HitEnd,
123            };
124        }
125
126        // Finally insert the thread root if at the end of the timeline going backwards
127        if hit_end_of_timeline {
128            let root_event = self
129                .room
130                .load_event(&self.root_event_id)
131                .await
132                .map_err(|err| PaginatorError::SdkError(Box::new(err)))?;
133
134            result.chunk.push(root_event);
135        }
136
137        Ok(PaginationResult { events: result.chunk, hit_end_of_timeline })
138    }
139
140    /// Run a single pagination forwards, returning the next set of events and
141    /// information whether we've reached the end of the thread.
142    pub async fn paginate_forwards(
143        &self,
144        num_events: UInt,
145    ) -> Result<PaginationResult, PaginatorError> {
146        let token = {
147            let token = &self.tokens.lock().unwrap().next;
148
149            match token {
150                PaginationToken::None => None,
151                PaginationToken::HasMore(token) => Some(token.clone()),
152                PaginationToken::HitEnd => {
153                    return Ok(PaginationResult { events: Vec::new(), hit_end_of_timeline: true });
154                }
155            }
156        };
157
158        let options = RelationsOptions {
159            from: token,
160            dir: Direction::Forward,
161            limit: Some(num_events),
162            include_relations: IncludeRelations::AllRelations,
163            recurse: true,
164        };
165
166        let result = self
167            .room
168            .relations(self.root_event_id.to_owned(), options)
169            .await
170            .map_err(|error| PaginatorError::SdkError(Box::new(error)))?;
171
172        let hit_end_of_timeline = result.next_batch_token.is_none();
173
174        // Update the stored tokens
175        {
176            let mut tokens = self.tokens.lock().unwrap();
177
178            tokens.next = match result.next_batch_token {
179                Some(val) => PaginationToken::HasMore(val),
180                None => PaginationToken::HitEnd,
181            };
182        }
183
184        Ok(PaginationResult { events: result.chunk, hit_end_of_timeline })
185    }
186
187    /// Returns the root [`EventId`] for the thread.
188    pub fn thread_root_event_id(&self) -> &EventId {
189        &self.root_event_id
190    }
191}
192
193#[cfg(not(tarpaulin_include))]
194impl<P: PaginableThread> std::fmt::Debug for ThreadedEventsLoader<P> {
195    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
196        f.debug_struct("ThreadedEventsLoader").finish()
197    }
198}