matrix_sdk/event_cache/
automatic_pagination.rs1use std::{
16 collections::HashMap,
17 sync::{Arc, Weak},
18};
19
20use matrix_sdk_base::task_monitor::{BackgroundTaskHandle, TaskMonitor};
21use ruma::{OwnedRoomId, RoomId};
22use tokio::sync::mpsc;
23use tracing::{info, instrument, trace, warn};
24
25use crate::event_cache::EventCacheInner;
26
/// Handle used to request automatic (background) paginations.
///
/// Cloning is cheap: every clone shares the same inner state through an
/// [`Arc`].
#[derive(Clone)]
pub struct AutomaticPagination {
    /// Shared state: the background task handle and the request sender.
    inner: Arc<AutomaticPaginationInner>,
}
34
35#[cfg(not(tarpaulin_include))]
36impl std::fmt::Debug for AutomaticPagination {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 f.debug_struct("AutomaticPagination").finish_non_exhaustive()
39 }
40}
41
42impl AutomaticPagination {
43 pub(super) fn new(event_cache: Weak<EventCacheInner>, task_monitor: &TaskMonitor) -> Self {
46 let (sender, receiver) = mpsc::unbounded_channel();
47
48 let task = task_monitor.spawn_infinite_task(
49 "event_cache::automatic_paginations_task",
50 automatic_paginations_task(event_cache, receiver),
51 );
52
53 Self { inner: Arc::new(AutomaticPaginationInner { _task: task, sender }) }
54 }
55
56 pub fn run_once(&self, room_id: &RoomId) -> bool {
61 self.inner
66 .sender
67 .send(AutomaticPaginationRequest::PaginateRoomBackwards { room_id: room_id.to_owned() })
68 .is_ok()
69 }
70}
71
/// State shared by all the clones of an [`AutomaticPagination`].
struct AutomaticPaginationInner {
    /// Handle of the background task spawned in `AutomaticPagination::new`.
    ///
    /// Never read; NOTE(review): presumably kept so the task lives as long as
    /// this struct does, confirm against `BackgroundTaskHandle`'s drop
    /// behavior.
    _task: BackgroundTaskHandle,

    /// Sending half of the channel used to pass requests to the background
    /// task.
    sender: mpsc::UnboundedSender<AutomaticPaginationRequest>,
}
83
/// A request sent to the automatic pagination background task.
#[derive(Clone, Debug)]
enum AutomaticPaginationRequest {
    /// Run one backwards pagination for the room identified by `room_id`.
    PaginateRoomBackwards { room_id: OwnedRoomId },
}
88
89#[instrument(skip_all)]
92async fn automatic_paginations_task(
93 inner: Weak<EventCacheInner>,
94 mut receiver: mpsc::UnboundedReceiver<AutomaticPaginationRequest>,
95) {
96 trace!("Spawning the automatic pagination task");
97
98 let mut room_pagination_credits = HashMap::new();
99
100 while let Some(request) = receiver.recv().await {
101 match request {
102 AutomaticPaginationRequest::PaginateRoomBackwards { room_id } => {
103 let Some(inner) = inner.upgrade() else {
104 break;
106 };
107
108 let config = *inner.config.read().unwrap();
109
110 let credits = room_pagination_credits
111 .entry(room_id.clone())
112 .or_insert(config.room_pagination_per_room_credit);
113
114 if *credits == 0 {
115 trace!(for_room = %room_id, "No more credits to paginate this room in the background, skipping");
116 continue;
117 }
118
119 let pagination = match inner.all_caches_for_room(&room_id).await {
120 Ok(caches) => caches.room.pagination(),
121 Err(err) => {
122 warn!(for_room = %room_id, "Failed to get the `Caches`: {err}");
123 continue;
124 }
125 };
126
127 trace!(for_room = %room_id, "automatic backpagination triggered");
128
129 match pagination.run_backwards_once(config.room_pagination_batch_size).await {
130 Ok(outcome) => {
131 if !outcome.reached_start || !outcome.events.is_empty() {
134 *credits -= 1;
135 }
136 }
137
138 Err(err) => {
139 warn!(for_room = %room_id, "Failed to run background pagination: {err}");
140 }
143 }
144 }
145 }
146 }
147
148 info!("Closing the automatic pagination task because receiver closed");
150}
151
// Tests for the automatic pagination task; not compiled for `wasm32`.
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use std::time::Duration;

    use assert_matches::assert_matches;
    use eyeball_im::VectorDiff;
    use matrix_sdk_base::sleep::sleep;
    use matrix_sdk_test::{BOB, JoinedRoomBuilder, async_test, event_factory::EventFactory};
    use ruma::{event_id, room_id};

    use crate::{
        assert_let_timeout,
        event_cache::{EventsOrigin, RoomEventCacheUpdate},
        test_utils::mocks::{MatrixMockServer, RoomMessagesResponseTemplate},
    };

    /// Requesting an automatic pagination for a room fetches the previous
    /// events in the background, and notifies the room's subscribers.
    #[async_test]
    async fn test_background_room_paginations() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        // Enable the automatic back-pagination before subscribing the event
        // cache.
        let event_cache = client.event_cache();
        event_cache.config_mut().experimental_auto_backpagination = true;
        event_cache.subscribe().unwrap();

        let room_id = room_id!("!omelette:fromage.fr");
        let f = EventFactory::new().room(room_id).sender(*BOB);

        let room = server.sync_joined_room(&client, room_id).await;

        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Starting point: no events in the room, and no pending updates.
        let (room_events, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
        assert!(room_events.is_empty());
        assert!(room_cache_updates.is_empty());

        // Sync a limited timeline carrying a previous-batch token.
        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(room_id)
                    .set_timeline_limited()
                    .set_timeline_prev_batch("prev_batch"),
            )
            .await;

        // The sync results in a single `Clear` update.
        {
            assert_let_timeout!(
                Ok(RoomEventCacheUpdate::UpdateTimelineEvents(update)) = room_cache_updates.recv()
            );
            assert_eq!(update.diffs.len(), 1);
            assert_matches!(update.diffs[0], VectorDiff::Clear);
            assert_matches!(update.origin, EventsOrigin::Sync);
        }

        // Mock the `/messages` endpoint, which may be hit only once.
        server
            .mock_room_messages()
            .ok(RoomMessagesResponseTemplate::default().events(vec![
                f.text_msg("comté").event_id(event_id!("$2")),
                f.text_msg("beaufort").event_id(event_id!("$1")),
            ]))
            .mock_once()
            .mount()
            .await;

        // Request one automatic pagination; the request must be accepted.
        let automatic_pagination_api = event_cache.automatic_pagination().unwrap();
        assert!(automatic_pagination_api.run_once(room_id));

        // The background pagination produces one update of pagination origin.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(update)) = room_cache_updates.recv()
        );
        assert_eq!(update.diffs.len(), 1);

        assert_matches!(update.origin, EventsOrigin::Pagination);

        // Apply the diffs onto the initial (empty) list of events.
        let mut room_events = room_events.into();
        for diff in update.diffs {
            diff.apply(&mut room_events);
        }

        // Both events are present, `$1` before `$2`, i.e. in the reverse order
        // of the mocked response.
        assert_eq!(room_events.len(), 2);
        assert_eq!(room_events[0].event_id().unwrap(), event_id!("$1"));
        assert_eq!(room_events[1].event_id().unwrap(), event_id!("$2"));

        // And nothing else happened.
        assert!(room_cache_updates.is_empty());
    }

    /// Once a room has spent all its credits, further automatic pagination
    /// requests for it are skipped, while manual paginations keep working.
    #[async_test]
    async fn test_room_pagination_respects_credits_system() {
        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        let event_cache = client.event_cache();
        event_cache.config_mut().experimental_auto_backpagination = true;

        // Give each room a single credit.
        event_cache.config_mut().room_pagination_per_room_credit = 1;
        event_cache.subscribe().unwrap();

        let room_id = room_id!("!omelette:fromage.fr");
        let f = EventFactory::new().room(room_id).sender(*BOB);

        let room = server.sync_joined_room(&client, room_id).await;
        let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();

        // Starting point: no events in the room, and no pending updates.
        let (room_events, mut room_cache_updates) = room_event_cache.subscribe().await.unwrap();
        assert!(room_events.is_empty());
        assert!(room_cache_updates.is_empty());

        // Sync a limited timeline carrying a previous-batch token.
        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(room_id)
                    .set_timeline_limited()
                    .set_timeline_prev_batch("prev_batch"),
            )
            .await;

        // The sync results in a single `Clear` update.
        {
            assert_let_timeout!(
                Ok(RoomEventCacheUpdate::UpdateTimelineEvents(update)) = room_cache_updates.recv()
            );
            assert_eq!(update.diffs.len(), 1);
            assert_matches!(update.diffs[0], VectorDiff::Clear);
            assert_matches!(update.origin, EventsOrigin::Sync);
        }

        // Mock `/messages` for the first token; the response carries another
        // token, and the mock may be hit only once.
        server
            .mock_room_messages()
            .match_from("prev_batch")
            .ok(RoomMessagesResponseTemplate::default()
                .events(vec![
                    f.text_msg("comté").event_id(event_id!("$2")),
                    f.text_msg("beaufort").event_id(event_id!("$1")),
                ])
                .end_token("prev_batch_2"))
            .mock_once()
            .mount()
            .await;

        // First request: this one spends the room's only credit.
        let automatic_pagination_api = event_cache.automatic_pagination().unwrap();
        assert!(automatic_pagination_api.run_once(room_id));

        // The background pagination produces one update of pagination origin.
        assert_let_timeout!(
            Ok(RoomEventCacheUpdate::UpdateTimelineEvents(update)) = room_cache_updates.recv()
        );
        assert_eq!(update.diffs.len(), 1);

        assert_matches!(update.origin, EventsOrigin::Pagination);

        // Apply the diffs onto the initial (empty) list of events.
        let mut room_events = room_events.into();
        for diff in update.diffs {
            diff.apply(&mut room_events);
        }

        // Both events are present, `$1` before `$2`.
        assert_eq!(room_events.len(), 2);
        assert_eq!(room_events[0].event_id().unwrap(), event_id!("$1"));
        assert_eq!(room_events[1].event_id().unwrap(), event_id!("$2"));

        assert!(room_cache_updates.is_empty());

        // Second request: it is accepted by the channel…
        assert!(automatic_pagination_api.run_once(room_id));

        // …but after leaving the task some time to handle it, no update has
        // been received: the room has no credits left.
        sleep(Duration::from_millis(300)).await;
        assert!(room_cache_updates.is_empty());

        // Only now mount the mock for the second token, with an empty
        // response.
        server
            .mock_room_messages()
            .match_from("prev_batch_2")
            .ok(RoomMessagesResponseTemplate::default())
            .mock_once()
            .mount()
            .await;

        // A manual pagination still runs, and reaches the start of the
        // timeline.
        let outcome = room_event_cache.pagination().run_backwards_once(30).await.unwrap();
        assert!(outcome.reached_start);
    }
}