// matrix_sdk/event_cache/caches/thread/pagination.rs

use std::sync::Arc;
16
17use eyeball::SharedObservable;
18use eyeball_im::VectorDiff;
19use matrix_sdk_base::{
20 event_cache::{Event, Gap},
21 linked_chunk::ChunkContent,
22};
23use ruma::api::Direction;
24use tracing::trace;
25
26use super::{
27 super::super::{
28 EventCacheError, EventsOrigin, Result, TimelineVectorDiffs,
29 caches::pagination::{
30 BackPaginationOutcome, LoadMoreEventsBackwardsOutcome, PaginatedCache, Pagination,
31 },
32 },
33 ThreadEventCacheInner,
34};
35use crate::{
36 event_cache::caches::pagination::SharedPaginationStatus,
37 room::{IncludeRelations, RelationsOptions},
38};
39
/// Thin cloneable wrapper around a thread's event cache, implementing
/// [`PaginatedCache`] so the generic [`Pagination`] machinery can drive
/// backwards paginations for a single thread.
#[derive(Clone)]
struct ThreadEventCacheWrapper {
    /// The shared, inner per-thread event cache state this wrapper paginates.
    cache: Arc<ThreadEventCacheInner>,

    /// Observable returned by [`PaginatedCache::status`].
    ///
    /// NOTE(review): named "dummy" — presumably thread paginations do not
    /// surface a meaningful status to observers yet, and this only exists to
    /// satisfy the trait; confirm against the `Pagination` implementation.
    dummy_pagination_status: SharedObservable<SharedPaginationStatus>,
}
50
/// Public handle for running backwards paginations in a given thread.
///
/// Newtype over the generic [`Pagination`] engine, specialized with a
/// [`ThreadEventCacheWrapper`]; all methods delegate to the inner engine.
#[allow(missing_debug_implementations)]
pub struct ThreadPagination(Pagination<ThreadEventCacheWrapper>);
54
55impl ThreadPagination {
56 pub(super) fn new(cache: Arc<ThreadEventCacheInner>) -> Self {
58 Self(Pagination::new(ThreadEventCacheWrapper {
59 cache,
60 dummy_pagination_status: SharedObservable::new(SharedPaginationStatus::Idle {
61 hit_timeline_start: false,
62 }),
63 }))
64 }
65
66 pub async fn run_backwards_until(
77 &self,
78 num_requested_events: u16,
79 ) -> Result<BackPaginationOutcome> {
80 self.0.run_backwards_until(num_requested_events).await
81 }
82
83 pub async fn run_backwards_once(&self, batch_size: u16) -> Result<BackPaginationOutcome> {
88 self.0.run_backwards_once(batch_size).await
89 }
90}
91
impl PaginatedCache for ThreadEventCacheWrapper {
    /// Returns the pagination status observable; for threads this is the
    /// placeholder status stored on the wrapper (see the field doc).
    fn status(&self) -> &SharedObservable<SharedPaginationStatus> {
        &self.dummy_pagination_status
    }

    /// Decides how the next backwards pagination should proceed, from the
    /// in-memory thread linked chunk alone (no network, no disk).
    ///
    /// Three outcomes, checked in order:
    /// 1. the chunk ends with a gap → paginate from that gap's `prev_token`;
    /// 2. the first in-memory event is the thread root itself → the start of
    ///    the thread has been reached;
    /// 3. otherwise → paginate from scratch (no token).
    async fn load_more_events_backwards(&self) -> Result<LoadMoreEventsBackwardsOutcome> {
        let state = self.cache.state.read().await?;

        // A gap in the linked chunk means there are still events to fetch
        // before it; resume the pagination from the gap's token.
        if let Some(prev_token) = state.thread_linked_chunk().rgap().map(|gap| gap.token) {
            trace!(%prev_token, "thread chunk has at least a gap");

            return Ok(LoadMoreEventsBackwardsOutcome::Gap {
                prev_token: Some(prev_token),
                waited_for_initial_prev_token: state.waited_for_initial_prev_token(),
            });
        }

        // No gap: if the first (oldest) event in memory is the thread root
        // (its event id equals the thread id), the thread is fully loaded.
        if let Some((_pos, event)) = state.thread_linked_chunk().events().next() {
            let first_event_id =
                event.event_id().expect("a linked chunk only stores events with IDs");

            if first_event_id == self.cache.thread_id {
                trace!("thread chunk is fully loaded and non-empty: reached_start=true");

                return Ok(LoadMoreEventsBackwardsOutcome::StartOfTimeline);
            }
        }

        // Empty chunk, or oldest event isn't the root: start a pagination
        // without a token (the server will return the latest thread events).
        Ok(LoadMoreEventsBackwardsOutcome::Gap {
            prev_token: None,
            waited_for_initial_prev_token: state.waited_for_initial_prev_token(),
        })
    }

    /// Records that a caller has waited for the initial `prev_token` once,
    /// so subsequent paginations don't wait again.
    async fn mark_has_waited_for_initial_prev_token(&self) -> Result<()> {
        *self.cache.state.write().await?.waited_for_initial_prev_token_mut() = true;

        Ok(())
    }

    /// Intentionally a no-op: thread paginations have no sync-provided
    /// `prev_token` to wait for, unlike room paginations.
    async fn wait_for_prev_token(&self) {
    }

    /// Fetches up to `batch_size` older thread events from the server via the
    /// `/relations` endpoint (all relation types, recursively), paginating
    /// backwards from `prev_token`.
    ///
    /// Returns `Ok(None)` when the owning room has been dropped (the weak
    /// reference can no longer be upgraded), signalling the caller to abort.
    ///
    /// # Errors
    ///
    /// Wraps any request failure in [`EventCacheError::PaginationError`].
    async fn paginate_backwards_with_network(
        &self,
        batch_size: u16,
        prev_token: &Option<String>,
    ) -> Result<Option<(Vec<Event>, Option<String>)>> {
        let Some(room) = self.cache.weak_room.get() else {
            return Ok(None);
        };

        let options = RelationsOptions {
            from: prev_token.clone(),
            dir: Direction::Backward,
            limit: Some(batch_size.into()),
            include_relations: IncludeRelations::AllRelations,
            recurse: true,
        };

        let response = room
            .relations(self.cache.thread_id.clone(), options)
            .await
            .map_err(|err| EventCacheError::PaginationError(Arc::new(err)))?;

        // `next_batch_token` is the token for the *next* backwards batch;
        // `None` means the server has no older events for this thread.
        Ok(Some((response.chunk, response.next_batch_token)))
    }

    /// Disk-backed pagination is not supported for threads yet; callers must
    /// not reach this path.
    async fn conclude_backwards_pagination_from_disk(
        &self,
        _events: Vec<Event>,
        _timeline_event_diffs: Vec<VectorDiff<Event>>,
        _reached_start: bool,
    ) -> BackPaginationOutcome {
        unimplemented!("loading from disk for threads is not implemented yet");
    }

    /// Integrates a network pagination response into the thread linked chunk:
    /// saves the new events, deduplicates them against in-memory events,
    /// replaces the paginated-from gap, and broadcasts the resulting timeline
    /// diffs to observers.
    ///
    /// Returns `Ok(None)` when the room is gone, or when the gap identified
    /// by `prev_token` has disappeared from the chunk (e.g. invalidated by a
    /// concurrent update) — the caller is expected to restart in both cases.
    async fn conclude_backwards_pagination_from_network(
        &self,
        mut events: Vec<Event>,
        prev_token: Option<String>,
        new_token: Option<String>,
    ) -> Result<Option<BackPaginationOutcome>> {
        let Some(room) = self.cache.weak_room.get() else {
            return Ok(None);
        };

        // No further token from the server means this batch reaches the
        // beginning of the thread.
        let reached_start = new_token.is_none();

        // When the start is reached, also materialize the thread root event
        // (the event whose id is the thread id) so it heads the timeline.
        let root_event = if reached_start {
            Some(
                room.load_or_fetch_event(&self.cache.thread_id, None)
                    .await
                    .map_err(|err| EventCacheError::PaginationError(Arc::new(err)))?,
            )
        } else {
            None
        };

        let mut state = self.cache.state.write().await?;

        // Persist the network events. Note: done *before* appending the root
        // event — presumably `load_or_fetch_event` already stores it; confirm.
        state.save_events(events.iter().cloned()).await?;

        // `/relations` returns events newest→oldest, so appending the root
        // event puts it last, i.e. oldest.
        events.extend(root_event);

        // Locate the gap chunk this pagination started from, if any; if it no
        // longer exists, the chunk was mutated concurrently → abort.
        let prev_gap_id = if let Some(token) = prev_token {
            let Some(gap_id) = state.thread_linked_chunk().chunk_identifier(|chunk| {
                matches!(chunk.content(), ChunkContent::Gap(Gap { token: prev_token }) if *prev_token == token)
            }) else {
                return Ok(None);
            };

            Some(gap_id)
        } else {
            None
        };

        // Reverse into topological (oldest→newest) order for insertion.
        let topo_ordered_events = events.iter().cloned().rev().collect::<Vec<_>>();
        let new_gap = new_token.map(|token| Gap { token });

        let deduplication = state.filter_duplicate_events(topo_ordered_events);

        let (events, new_gap) = if deduplication.non_empty_all_duplicates {
            // The whole (non-empty) batch was already known: insert nothing
            // and drop the new gap, since there's nothing new behind it.
            (Vec::new(), None)
        } else {
            // Threads have no persistent storage yet, so duplicates can only
            // ever be in-memory; a store duplicate would be a logic bug.
            assert!(
                deduplication.in_store_duplicated_event_ids.is_empty(),
                "persistent storage for threads is not implemented yet"
            );
            // Remove stale in-memory copies so the fresh ones land at the
            // correct (older) position.
            state.remove_events(deduplication.in_memory_duplicated_event_ids).await?;

            (deduplication.all_events, new_gap)
        };

        // Insert the events (and possibly a new gap) where the old gap was.
        // This shadows the earlier `reached_start`: the chunk itself now
        // decides whether the start of the timeline was hit.
        let reached_start = state.thread_linked_chunk_mut().push_backwards_pagination_events(
            prev_gap_id,
            new_gap,
            &events,
        );

        state.propagate_changes().await?;

        // Drain the accumulated chunk updates and broadcast them as timeline
        // diffs; send errors (no subscribers) are deliberately ignored.
        let updates = state.thread_linked_chunk_mut().updates_as_vector_diffs();

        if !updates.is_empty() {
            let _ = state
                .state
                .sender
                .send(TimelineVectorDiffs { diffs: updates, origin: EventsOrigin::Pagination });
        }

        Ok(Some(BackPaginationOutcome { reached_start, events }))
    }
}