Skip to main content

matrix_sdk/event_cache/
automatic_pagination.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::HashMap,
17    sync::{Arc, Weak},
18};
19
20use matrix_sdk_base::task_monitor::{BackgroundTaskHandle, TaskMonitor};
21use ruma::{OwnedRoomId, RoomId};
22use tokio::sync::mpsc;
23use tracing::{info, instrument, trace, warn};
24
25use crate::event_cache::EventCacheInner;
26
27/// State for running paginations in background tasks.
28///
29/// Shallow type, can be cloned cheaply.
30#[derive(Clone)]
31pub struct AutomaticPagination {
32    inner: Arc<AutomaticPaginationInner>,
33}
34
35#[cfg(not(tarpaulin_include))]
36impl std::fmt::Debug for AutomaticPagination {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.debug_struct("AutomaticPagination").finish_non_exhaustive()
39    }
40}
41
42impl AutomaticPagination {
43    /// Create a new [`AutomaticPagination`], spawning the background task to
44    /// handle incoming requests to run background paginations.
45    pub(super) fn new(event_cache: Weak<EventCacheInner>, task_monitor: &TaskMonitor) -> Self {
46        let (sender, receiver) = mpsc::unbounded_channel();
47
48        let task = task_monitor.spawn_infinite_task(
49            "event_cache::automatic_paginations_task",
50            automatic_paginations_task(event_cache, receiver),
51        );
52
53        Self { inner: Arc::new(AutomaticPaginationInner { _task: task, sender }) }
54    }
55
56    /// Request a single back-pagination to happen in the background for the
57    /// given room.
58    ///
59    /// Returns false, if the request couldn't be sent.
60    pub fn run_once(&self, room_id: &RoomId) -> bool {
61        // We don't want to do anything with the error type, as it only includes the
62        // request we just created, and not much more; there's no guarantee that
63        // retrying sending it would succeed, so let it drop, and report the
64        // result as a boolean, for informative purposes.
65        self.inner
66            .sender
67            .send(AutomaticPaginationRequest::PaginateRoomBackwards { room_id: room_id.to_owned() })
68            .is_ok()
69    }
70}
71
72struct AutomaticPaginationInner {
73    /// The task used to handle automatic pagination requests.
74    _task: BackgroundTaskHandle,
75
76    /// A sender for automatic pagination requests, that is shared with every
77    /// room.
78    ///
79    /// It's a `OnceLock` because its initialization is deferred to
80    /// [`EventCache::subscribe`].
81    sender: mpsc::UnboundedSender<AutomaticPaginationRequest>,
82}
83
84#[derive(Clone, Debug)]
85enum AutomaticPaginationRequest {
86    PaginateRoomBackwards { room_id: OwnedRoomId },
87}
88
89/// Listen to background automatic pagination requests, and execute them in
90/// real-time.
91#[instrument(skip_all)]
92async fn automatic_paginations_task(
93    inner: Weak<EventCacheInner>,
94    mut receiver: mpsc::UnboundedReceiver<AutomaticPaginationRequest>,
95) {
96    trace!("Spawning the automatic pagination task");
97
98    let mut room_pagination_credits = HashMap::new();
99
100    while let Some(request) = receiver.recv().await {
101        match request {
102            AutomaticPaginationRequest::PaginateRoomBackwards { room_id } => {
103                let Some(inner) = inner.upgrade() else {
104                    // The event cache has been dropped, exit the task.
105                    break;
106                };
107
108                let config = *inner.config.read().unwrap();
109
110                let credits = room_pagination_credits
111                    .entry(room_id.clone())
112                    .or_insert(config.room_pagination_per_room_credit);
113
114                if *credits == 0 {
115                    trace!(for_room = %room_id, "No more credits to paginate this room in the background, skipping");
116                    continue;
117                }
118
119                let pagination = match inner.all_caches_for_room(&room_id).await {
120                    Ok(caches) => caches.room.pagination(),
121                    Err(err) => {
122                        warn!(for_room = %room_id, "Failed to get the `Caches`: {err}");
123                        continue;
124                    }
125                };
126
127                trace!(for_room = %room_id, "automatic backpagination triggered");
128
129                match pagination.run_backwards_once(config.room_pagination_batch_size).await {
130                    Ok(outcome) => {
131                        // Pagination requests must be idempotent, so we only decrement credits if
132                        // we actually paginated something new.
133                        if !outcome.reached_start || !outcome.events.is_empty() {
134                            *credits -= 1;
135                        }
136                    }
137
138                    Err(err) => {
139                        warn!(for_room = %room_id, "Failed to run background pagination: {err}");
140                        // Don't decrement credits in this case, to allow a
141                        // retry later.
142                    }
143                }
144            }
145        }
146    }
147
148    // The sender has shut down, exit.
149    info!("Closing the automatic pagination task because receiver closed");
150}
151
152// MatrixMockServer et al. aren't available on wasm.
153#[cfg(all(test, not(target_arch = "wasm32")))]
154mod tests {
155    use std::time::Duration;
156
157    use assert_matches::assert_matches;
158    use eyeball_im::VectorDiff;
159    use matrix_sdk_base::sleep::sleep;
160    use matrix_sdk_test::{BOB, JoinedRoomBuilder, async_test, event_factory::EventFactory};
161    use ruma::{event_id, room_id};
162
163    use crate::{
164        assert_let_timeout,
165        event_cache::{EventsOrigin, RoomEventCacheUpdate},
166        test_utils::mocks::{MatrixMockServer, RoomMessagesResponseTemplate},
167    };
168
169    /// Test that we can send automatic pagination requests.
170    #[async_test]
171    async fn test_background_room_paginations() {
172        let server = MatrixMockServer::new().await;
173        let client = server.client_builder().build().await;
174
175        let event_cache = client.event_cache();
176        event_cache.config_mut().experimental_auto_backpagination = true;
177        event_cache.subscribe().unwrap();
178
179        let room_id = room_id!("!omelette:fromage.fr");
180        let f = EventFactory::new().room(room_id).sender(*BOB);
181
182        let room = server.sync_joined_room(&client, room_id).await;
183
184        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
185
186        // Starting with an empty, inactive room,
187        let (room_events, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
188        assert!(room_events.is_empty());
189        assert!(room_cache_updates.is_empty());
190
191        // We get a gappy sync (so as to have a previous-batch token),
192        server
193            .sync_room(
194                &client,
195                JoinedRoomBuilder::new(room_id)
196                    .set_timeline_limited()
197                    .set_timeline_prev_batch("prev_batch"),
198            )
199            .await;
200
201        {
202            assert_let_timeout!(
203                Ok(RoomEventCacheUpdate::UpdateTimelineEvents(update)) = room_cache_updates.recv()
204            );
205            assert_eq!(update.diffs.len(), 1);
206            assert_matches!(update.diffs[0], VectorDiff::Clear);
207            assert_matches!(update.origin, EventsOrigin::Sync);
208        }
209
210        // Set up the mock for /messages,
211        server
212            .mock_room_messages()
213            .ok(RoomMessagesResponseTemplate::default().events(vec![
214                f.text_msg("comté").event_id(event_id!("$2")),
215                f.text_msg("beaufort").event_id(event_id!("$1")),
216            ]))
217            .mock_once()
218            .mount()
219            .await;
220
221        // Send a request for a background pagination,
222        let automatic_pagination_api = event_cache.automatic_pagination().unwrap();
223        assert!(automatic_pagination_api.run_once(room_id));
224
225        // The room pagination happens in the background.
226        assert_let_timeout!(
227            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(update)) = room_cache_updates.recv()
228        );
229        assert_eq!(update.diffs.len(), 1);
230
231        assert_matches!(update.origin, EventsOrigin::Pagination);
232
233        let mut room_events = room_events.into();
234        for diff in update.diffs {
235            diff.apply(&mut room_events);
236        }
237
238        assert_eq!(room_events.len(), 2);
239        assert_eq!(room_events[0].event_id().unwrap(), event_id!("$1"));
240        assert_eq!(room_events[1].event_id().unwrap(), event_id!("$2"));
241
242        // And there's no more updates.
243        assert!(room_cache_updates.is_empty());
244    }
245
246    /// Test that the credit system works.
247    #[async_test]
248    async fn test_room_pagination_respects_credits_system() {
249        let server = MatrixMockServer::new().await;
250        let client = server.client_builder().build().await;
251
252        let event_cache = client.event_cache();
253        event_cache.config_mut().experimental_auto_backpagination = true;
254
255        // Only allow 1 background pagination per room, to test that the credit system
256        // is properly taken into account.
257        event_cache.config_mut().room_pagination_per_room_credit = 1;
258        event_cache.subscribe().unwrap();
259
260        let room_id = room_id!("!omelette:fromage.fr");
261        let f = EventFactory::new().room(room_id).sender(*BOB);
262
263        let room = server.sync_joined_room(&client, room_id).await;
264        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
265
266        // Starting with an empty, inactive room,
267        let (room_events, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
268        assert!(room_events.is_empty());
269        assert!(room_cache_updates.is_empty());
270
271        server
272            .sync_room(
273                &client,
274                JoinedRoomBuilder::new(room_id)
275                    .set_timeline_limited()
276                    .set_timeline_prev_batch("prev_batch"),
277            )
278            .await;
279
280        {
281            assert_let_timeout!(
282                Ok(RoomEventCacheUpdate::UpdateTimelineEvents(update)) = room_cache_updates.recv()
283            );
284            assert_eq!(update.diffs.len(), 1);
285            assert_matches!(update.diffs[0], VectorDiff::Clear);
286            assert_matches!(update.origin, EventsOrigin::Sync);
287        }
288
289        // Set up the mock for /messages, so that it returns another prev-batch token,
290        server
291            .mock_room_messages()
292            .match_from("prev_batch")
293            .ok(RoomMessagesResponseTemplate::default()
294                .events(vec![
295                    f.text_msg("comté").event_id(event_id!("$2")),
296                    f.text_msg("beaufort").event_id(event_id!("$1")),
297                ])
298                .end_token("prev_batch_2"))
299            .mock_once()
300            .mount()
301            .await;
302
303        // Send a request for a background pagination,
304        let automatic_pagination_api = event_cache.automatic_pagination().unwrap();
305        assert!(automatic_pagination_api.run_once(room_id));
306
307        // The room pagination happens in the background.
308        assert_let_timeout!(
309            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(update)) = room_cache_updates.recv()
310        );
311        assert_eq!(update.diffs.len(), 1);
312
313        assert_matches!(update.origin, EventsOrigin::Pagination);
314
315        let mut room_events = room_events.into();
316        for diff in update.diffs {
317            diff.apply(&mut room_events);
318        }
319
320        assert_eq!(room_events.len(), 2);
321        assert_eq!(room_events[0].event_id().unwrap(), event_id!("$1"));
322        assert_eq!(room_events[1].event_id().unwrap(), event_id!("$2"));
323
324        // And there's no more updates yet.
325        assert!(room_cache_updates.is_empty());
326
327        // One can send another request to back-paginate…
328        assert!(automatic_pagination_api.run_once(room_id));
329
330        sleep(Duration::from_millis(300)).await;
331        // But it doesn't happen, because we don't have enough credits for automatic
332        // backpagination.
333        assert!(room_cache_updates.is_empty());
334
335        // We can still manually backpaginate with success, though.
336        server
337            .mock_room_messages()
338            .match_from("prev_batch_2")
339            .ok(RoomMessagesResponseTemplate::default())
340            .mock_once()
341            .mount()
342            .await;
343
344        let outcome = room_event_cache.pagination().run_backwards_once(30).await.unwrap();
345        assert!(outcome.reached_start);
346    }
347}