matrix_sdk/event_cache/room/
threads.rs1use std::collections::BTreeSet;
18
19use eyeball_im::VectorDiff;
20use matrix_sdk_base::{
21 event_cache::{Event, Gap},
22 linked_chunk::{ChunkContent, OwnedLinkedChunkId, Position},
23};
24use ruma::{OwnedEventId, OwnedRoomId};
25use tokio::sync::broadcast::{Receiver, Sender};
26use tracing::trace;
27
28use crate::event_cache::{
29 deduplicator::DeduplicationOutcome,
30 room::{events::EventLinkedChunk, LoadMoreEventsBackwardsOutcome},
31 BackPaginationOutcome, EventsOrigin, RoomEventCacheLinkedChunkUpdate,
32};
33
/// An update to the events held by a thread's event cache, sent to
/// subscribers of a `ThreadEventCache`.
#[derive(Clone, Debug)]
pub struct ThreadEventCacheUpdate {
    /// Diffs to apply to the subscriber's view of the thread's events.
    pub diffs: Vec<VectorDiff<Event>>,
    /// Where the update originated from (sync, pagination, cache, …).
    pub origin: EventsOrigin,
}
42
/// In-memory event cache for a single thread.
pub(crate) struct ThreadEventCache {
    /// The room this thread belongs to.
    room_id: OwnedRoomId,

    /// The event id of the thread's root event.
    thread_root: OwnedEventId,

    /// The linked chunk storing this thread's events in memory.
    chunk: EventLinkedChunk,

    /// Broadcast channel over which [`ThreadEventCacheUpdate`]s are sent to
    /// subscribers of this thread.
    sender: Sender<ThreadEventCacheUpdate>,

    /// Channel over which raw linked-chunk (store-level) updates for this
    /// thread's chunk are propagated; see `propagate_changes`.
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
}
64
65impl ThreadEventCache {
66 pub fn new(
68 room_id: OwnedRoomId,
69 thread_root: OwnedEventId,
70 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
71 ) -> Self {
72 Self {
73 chunk: EventLinkedChunk::new(),
74 sender: Sender::new(32),
75 room_id,
76 thread_root,
77 linked_chunk_update_sender,
78 }
79 }
80
81 pub fn subscribe(&self) -> (Vec<Event>, Receiver<ThreadEventCacheUpdate>) {
83 let events = self.chunk.events().map(|(_position, item)| item.clone()).collect();
84
85 let recv = self.sender.subscribe();
86
87 (events, recv)
88 }
89
90 pub fn clear(&mut self) {
92 self.chunk.reset();
93
94 let diffs = self.chunk.updates_as_vector_diffs();
95 if !diffs.is_empty() {
96 let _ = self.sender.send(ThreadEventCacheUpdate { diffs, origin: EventsOrigin::Cache });
97 }
98 }
99
100 fn propagate_changes(&mut self) {
103 let updates = self.chunk.store_updates().take();
106
107 let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate {
108 updates,
109 linked_chunk_id: OwnedLinkedChunkId::Thread(
110 self.room_id.clone(),
111 self.thread_root.clone(),
112 ),
113 });
114 }
115
116 pub fn add_live_events(&mut self, events: Vec<Event>) {
119 if events.is_empty() {
120 return;
121 }
122
123 let deduplication = self.filter_duplicate_events(events);
124
125 if deduplication.non_empty_all_duplicates {
126 return;
129 }
130
131 self.remove_in_memory_duplicated_events(deduplication.in_memory_duplicated_event_ids);
133 assert!(
134 deduplication.in_store_duplicated_event_ids.is_empty(),
135 "persistent storage for threads is not implemented yet"
136 );
137
138 let events = deduplication.all_events;
139
140 self.chunk.push_live_events(None, &events);
141
142 self.propagate_changes();
143
144 let diffs = self.chunk.updates_as_vector_diffs();
145 if !diffs.is_empty() {
146 let _ = self.sender.send(ThreadEventCacheUpdate { diffs, origin: EventsOrigin::Sync });
147 }
148 }
149
150 pub fn load_more_events_backwards(&self) -> LoadMoreEventsBackwardsOutcome {
155 if let Some(prev_token) = self.chunk.rgap().map(|gap| gap.prev_token) {
158 trace!(%prev_token, "thread chunk has at least a gap");
159 return LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) };
160 }
161
162 if let Some((_pos, event)) = self.chunk.events().next() {
165 let first_event_id =
166 event.event_id().expect("a linked chunk only stores events with IDs");
167
168 if first_event_id == self.thread_root {
169 trace!("thread chunk is fully loaded and non-empty: reached_start=true");
170 return LoadMoreEventsBackwardsOutcome::StartOfTimeline;
171 }
172 }
173
174 LoadMoreEventsBackwardsOutcome::Gap { prev_token: None }
177 }
178
179 fn filter_duplicate_events(&self, mut new_events: Vec<Event>) -> DeduplicationOutcome {
185 let mut new_event_ids = BTreeSet::new();
186
187 new_events.retain(|event| {
188 event.event_id().is_some_and(|event_id| new_event_ids.insert(event_id))
191 });
192
193 let in_memory_duplicated_event_ids: Vec<_> = self
194 .chunk
195 .events()
196 .filter_map(|(position, event)| {
197 let event_id = event.event_id()?;
198 new_event_ids.contains(&event_id).then_some((event_id, position))
199 })
200 .collect();
201
202 let in_store_duplicated_event_ids = Vec::new();
204
205 let at_least_one_event = !new_events.is_empty();
206 let all_duplicates = (in_memory_duplicated_event_ids.len()
207 + in_store_duplicated_event_ids.len())
208 == new_events.len();
209 let non_empty_all_duplicates = at_least_one_event && all_duplicates;
210
211 DeduplicationOutcome {
212 all_events: new_events,
213 in_memory_duplicated_event_ids,
214 in_store_duplicated_event_ids,
215 non_empty_all_duplicates,
216 }
217 }
218
219 fn remove_in_memory_duplicated_events(
225 &mut self,
226 in_memory_duplicated_event_ids: Vec<(OwnedEventId, Position)>,
227 ) {
228 self.chunk
230 .remove_events_by_position(
231 in_memory_duplicated_event_ids
232 .iter()
233 .map(|(_event_id, position)| *position)
234 .collect(),
235 )
236 .expect("we collected the position of the events to remove just before");
237 }
238
    /// Finish a network back-pagination for this thread: insert the paginated
    /// `events`, resolve the gap identified by `prev_token`, and create a new
    /// gap for `new_token` if any.
    ///
    /// Returns `None` when `prev_token` is set but no matching gap exists in
    /// the chunk anymore — presumably the chunk was modified concurrently
    /// (e.g. reset), making this pagination's result stale; TODO confirm with
    /// callers.
    pub fn finish_network_pagination(
        &mut self,
        prev_token: Option<String>,
        new_token: Option<String>,
        events: Vec<Event>,
    ) -> Option<BackPaginationOutcome> {
        // Resolve the previous pagination token to the gap chunk holding it.
        // The `?` aborts the whole method if the token is unknown.
        let prev_gap_id = if let Some(token) = prev_token {
            let gap_id = self.chunk.chunk_identifier(|chunk| {
                matches!(chunk.content(), ChunkContent::Gap(Gap { ref prev_token }) if *prev_token == token)
            })?;

            Some(gap_id)
        } else {
            None
        };

        // Back-pagination responses are newest-first; reverse them into
        // topological (oldest-first) order for insertion.
        let topo_ordered_events = events.iter().cloned().rev().collect::<Vec<_>>();
        let new_gap = new_token.map(|token| Gap { prev_token: token });

        let deduplication = self.filter_duplicate_events(topo_ordered_events);

        let (events, new_gap) = if deduplication.non_empty_all_duplicates {
            // All paginated events are already known: insert nothing, and
            // drop the new gap as well.
            (Vec::new(), None)
        } else {
            // Invariant checked before mutating the chunk.
            assert!(
                deduplication.in_store_duplicated_event_ids.is_empty(),
                "persistent storage for threads is not implemented yet"
            );
            // Remove stale in-memory copies before re-inserting the fresh ones.
            self.remove_in_memory_duplicated_events(deduplication.in_memory_duplicated_event_ids);

            (deduplication.all_events, new_gap)
        };

        // Replace the resolved gap (if any) with the new events and gap; the
        // chunk reports whether the start of the timeline was reached.
        let reached_start = self.chunk.finish_back_pagination(prev_gap_id, new_gap, &events);

        // Forward store-level updates, then notify subscribers of the diffs.
        self.propagate_changes();

        let updates = self.chunk.updates_as_vector_diffs();
        if !updates.is_empty() {
            let _ = self
                .sender
                .send(ThreadEventCacheUpdate { diffs: updates, origin: EventsOrigin::Pagination });
        }

        Some(BackPaginationOutcome { reached_start, events })
    }
302
303 pub fn latest_event_id(&self) -> Option<OwnedEventId> {
305 self.chunk.revents().next().and_then(|(_position, event)| event.event_id())
306 }
307}