matrix_sdk/event_cache/caches/
mod.rs1use std::{collections::HashMap, ops::Deref, sync::Arc};
16
17use eyeball::SharedObservable;
18use eyeball_im::VectorDiff;
19use matrix_sdk_base::{
20 ThreadingSupport,
21 event_cache::Event,
22 linked_chunk::Position,
23 sync::{JoinedRoomUpdate, LeftRoomUpdate},
24};
25use ruma::{OwnedEventId, OwnedRoomId, RoomId, room_version_rules::RoomVersionRules};
26use tokio::sync::{
27 OnceCell, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock, broadcast::Sender, mpsc,
28};
29
30use super::{
31 EventCacheError, EventsOrigin, Result, automatic_pagination::AutomaticPagination, states,
32};
33use crate::{client::WeakClient, room::WeakRoom};
34
35mod aggregator;
36pub mod event_focused;
37pub mod event_linked_chunk;
38pub mod pagination;
39pub mod pinned_events;
40mod read_receipts;
41pub mod room;
42pub mod thread;
43
/// The group of event caches that [`Caches::new`] builds for one room.
#[derive(Debug)]
pub(super) struct Caches {
    /// The cache for the room itself.
    pub room: room::RoomEventCache,

    /// Thread caches, keyed by thread ID. Entries are created on demand by
    /// [`Caches::thread`].
    pub threads: Arc<RwLock<HashMap<OwnedEventId, thread::ThreadEventCache>>>,

    /// The pinned-events cache, initialised on the first successful call to
    /// [`Caches::pinned_events`].
    pub pinned_events: OnceCell<pinned_events::PinnedEventsCache>,

    /// Event-focused caches, keyed by the focused event and its thread mode.
    /// Entries are created on demand by [`Caches::event_focused`].
    pub event_focused:
        Arc<RwLock<HashMap<event_focused::EventFocusedCacheKey, event_focused::EventFocusedCache>>>,

    /// Values captured at construction time and reused when the on-demand
    /// caches above are created.
    internals: CachesInternals,
}
73
/// Values stored by [`Caches::new`] and handed to the thread, pinned-events
/// and event-focused caches when those are created later.
#[derive(Debug)]
struct CachesInternals {
    /// A clone of the state lock received by [`Caches::new`].
    state: states::StateLock,
    /// Sender for linked chunk updates, cloned into every cache created on
    /// demand.
    linked_chunk_update_sender: Sender<room::RoomEventCacheLinkedChunkUpdate>,
    /// The room version rules read from the room's info at construction time.
    room_version_rules: RoomVersionRules,
}
80
81impl Caches {
82 pub async fn new(
84 weak_client: &WeakClient,
85 room_id: &RoomId,
86 generic_update_sender: Sender<room::RoomEventCacheGenericUpdate>,
87 linked_chunk_update_sender: Sender<room::RoomEventCacheLinkedChunkUpdate>,
88 auto_shrink_sender: mpsc::Sender<OwnedRoomId>,
89 state: &states::StateLock,
90 automatic_pagination: Option<AutomaticPagination>,
91 ) -> Result<Self> {
92 let Some(client) = weak_client.get() else {
93 return Err(EventCacheError::ClientDropped);
94 };
95
96 let weak_room = WeakRoom::new(weak_client.clone(), room_id.to_owned());
97
98 let room = client
99 .get_room(room_id)
100 .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
101 let room_version_rules = room.clone_info().room_version_rules_or_default();
102
103 let pagination_status = SharedObservable::new(pagination::SharedPaginationStatus::Idle {
104 hit_timeline_start: false,
105 });
106
107 let enabled_thread_support =
108 matches!(client.base_client().threading_support, ThreadingSupport::Enabled { .. });
109
110 let update_sender = room::RoomEventCacheUpdateSender::new(generic_update_sender.clone());
111
112 let own_user_id =
113 client.user_id().expect("the user must be logged in, at this point").to_owned();
114
115 let room_state = state
116 .try_insert_once_with(
117 states::selectors::RoomStateSelector::new(room_id.to_owned()),
118 |store_guard| {
119 room::RoomEventCacheState::new(
120 own_user_id.clone(),
121 room_id.to_owned(),
122 weak_room.clone(),
123 room_version_rules.clone(),
124 enabled_thread_support,
125 update_sender.clone(),
126 linked_chunk_update_sender.clone(),
127 store_guard,
128 pagination_status.clone(),
129 automatic_pagination,
130 )
131 },
132 )
133 .await?;
134
135 let timeline_is_not_empty =
136 room_state.read().await?.room_linked_chunk().revents().next().is_some();
137
138 let room_event_cache = room::RoomEventCache::new(
139 room_id.to_owned(),
140 weak_room,
141 own_user_id,
142 room_state,
143 pagination_status,
144 auto_shrink_sender,
145 update_sender,
146 );
147
148 if timeline_is_not_empty {
151 let _ = generic_update_sender
152 .send(room::RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
153 }
154
155 Ok(Self {
156 room: room_event_cache,
157 threads: Arc::new(RwLock::new(HashMap::new())),
158 pinned_events: OnceCell::new(),
159 event_focused: Arc::new(RwLock::new(HashMap::new())),
160 internals: CachesInternals {
161 state: state.clone(),
162 linked_chunk_update_sender,
163 room_version_rules,
164 },
165 })
166 }
167
168 pub fn room(&self) -> &room::RoomEventCache {
172 &self.room
173 }
174
175 pub async fn thread(
183 &self,
184 thread_id: OwnedEventId,
185 ) -> Result<
186 OwnedRwLockReadGuard<
187 HashMap<OwnedEventId, thread::ThreadEventCache>,
188 thread::ThreadEventCache,
189 >,
190 > {
191 Ok(
192 match OwnedRwLockWriteGuard::try_downgrade_map(
193 self.threads.clone().write_owned().await,
194 |threads| threads.get(&thread_id),
195 ) {
196 Ok(locked_cache) => locked_cache,
198 Err(mut threads) => {
200 let room = &self.room;
201 let cache = thread::ThreadEventCache::new(
202 room.room_id().to_owned(),
203 thread_id.clone(),
204 room.own_user_id().to_owned(),
205 self.internals.room_version_rules.clone(),
206 room.weak_room().to_owned(),
207 &self.internals.state,
208 room.update_sender().generic_update_sender().clone(),
209 self.internals.linked_chunk_update_sender.clone(),
210 )
211 .await?;
212
213 threads.insert(thread_id.clone(), cache);
214
215 OwnedRwLockWriteGuard::downgrade_map(threads, |threads| {
216 threads.get(&thread_id).unwrap()
217 })
218 }
219 },
220 )
221 }
222
223 pub async fn pinned_events(&self) -> Result<&pinned_events::PinnedEventsCache> {
227 self.pinned_events
228 .get_or_try_init(|| {
229 pinned_events::PinnedEventsCache::new(
230 self.room.weak_room(),
231 self.room.own_user_id().clone(),
232 self.internals.room_version_rules.clone(),
233 self.internals.linked_chunk_update_sender.clone(),
234 &self.internals.state,
235 )
236 })
237 .await
238 }
239
240 pub async fn event_focused(
244 &self,
245 event_id: OwnedEventId,
246 thread_mode: event_focused::EventFocusThreadMode,
247 number_of_initial_events: u16,
248 ) -> Result<
249 OwnedRwLockReadGuard<
250 HashMap<event_focused::EventFocusedCacheKey, event_focused::EventFocusedCache>,
251 event_focused::EventFocusedCache,
252 >,
253 > {
254 let key = event_focused::EventFocusedCacheKey { focused_event_id: event_id, thread_mode };
255
256 Ok(
257 match OwnedRwLockWriteGuard::try_downgrade_map(
258 self.event_focused.clone().write_owned().await,
259 |event_focused_caches| event_focused_caches.get(&key),
260 ) {
261 Ok(locked_cache) => locked_cache,
263 Err(mut event_focused_caches) => {
265 let cache = event_focused::EventFocusedCache::new(
266 self.room.weak_room().clone(),
267 key.clone(),
268 &self.internals.state,
269 self.internals.linked_chunk_update_sender.clone(),
270 )
271 .await?;
272 cache.start_from(number_of_initial_events, thread_mode).await?;
273
274 event_focused_caches.insert(key.clone(), cache);
275
276 OwnedRwLockWriteGuard::downgrade_map(
277 event_focused_caches,
278 |event_focused_caches| event_focused_caches.get(&key).unwrap(),
279 )
280 }
281 },
282 )
283 }
284
285 pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
287 let Self { room, threads, pinned_events, event_focused, internals } = &self;
288
289 {
291 let mut updates = updates.clone();
292 updates.timeline = aggregator::aggregate_timeline_for_room(updates.timeline);
293
294 room.handle_joined_room_update(updates).await?;
295 }
296
297 {
299 let mut updates = updates.clone();
300 updates.account_data.clear();
301 updates.ambiguity_changes.clear();
302
303 let timeline_for_threads = aggregator::aggregate_timeline_for_threads(
304 &updates.timeline,
305 threads.read().await.deref(),
306 room.state().read().await?,
307 &internals.room_version_rules.redaction,
308 )
309 .await?;
310
311 for (thread_id, timeline) in timeline_for_threads {
312 let mut updates = updates.clone();
313 updates.timeline = timeline;
314
315 let thread = self.thread(thread_id).await?;
316 thread.handle_joined_room_update(updates).await?;
317
318 let new_thread_summary =
319 thread.state().read().await?.compute_thread_summary().await?;
320
321 room.update_thread_summary(thread.thread_id(), new_thread_summary).await?;
322 }
323 }
324
325 if let Some(pinned_events) = pinned_events.get() {
327 let mut updates = updates.clone();
328 updates.timeline = aggregator::aggregate_timeline_for_pinned_events(
329 &updates.timeline,
330 &pinned_events.state().read().await?.current_event_ids(),
331 &internals.room_version_rules.redaction,
332 );
333
334 pinned_events.handle_joined_room_update(updates).await?;
335 }
336
337 {
339 let _ = event_focused;
342 }
343
344 Ok(())
345 }
346
347 pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
349 let Self { room, threads, pinned_events, event_focused, internals } = &self;
350
351 {
353 let mut updates = updates.clone();
354 updates.timeline = aggregator::aggregate_timeline_for_room(updates.timeline);
355
356 room.handle_left_room_update(updates).await?;
357 }
358
359 {
361 let mut updates = updates.clone();
362 updates.account_data.clear();
363 updates.ambiguity_changes.clear();
364
365 let timeline_for_threads = aggregator::aggregate_timeline_for_threads(
366 &updates.timeline,
367 threads.read().await.deref(),
368 room.state().read().await?,
369 &internals.room_version_rules.redaction,
370 )
371 .await?;
372
373 for (thread_id, timeline) in timeline_for_threads {
374 let mut updates = updates.clone();
375 updates.timeline = timeline;
376
377 let thread = self.thread(thread_id).await?;
378 thread.handle_left_room_update(updates).await?;
379
380 let new_thread_summary =
381 thread.state().read().await?.compute_thread_summary().await?;
382
383 room.update_thread_summary(thread.thread_id(), new_thread_summary).await?;
384 }
385 }
386
387 if let Some(pinned_events) = pinned_events.get() {
389 let mut updates = updates.clone();
390 updates.timeline = aggregator::aggregate_timeline_for_pinned_events(
391 &updates.timeline,
392 &pinned_events.state().read().await?.current_event_ids(),
393 &internals.room_version_rules.redaction,
394 );
395
396 pinned_events.handle_left_room_update(updates).await?;
397 }
398
399 {
401 let _ = event_focused;
404 }
405
406 Ok(())
407 }
408
409 #[cfg(feature = "e2e-encryption")]
414 pub async fn all_in_memory_events(&self) -> Result<impl Iterator<Item = Event>> {
415 let mut events = self.room.events().await?;
420
421 {
423 let event_focused = self.event_focused.read().await;
424
425 for event_focused in event_focused.values() {
426 events.extend(event_focused.events().await?);
427 }
428 }
429
430 Ok(events.into_iter())
431 }
432
433 #[cfg(feature = "e2e-encryption")]
442 pub async fn all_events_of_type(
443 &self,
444 event_type: Option<&str>,
445 session_id: Option<&str>,
446 ) -> Result<impl Iterator<Item = Event>> {
447 let mut events = {
450 let state = self.internals.state.read().await?;
451
452 state.store.get_room_events(self.room.room_id(), event_type, session_id).await?
453 };
454
455 {
458 let event_focused = self.event_focused.read().await;
459
460 for event_focused in event_focused.values() {
461 events.extend(
462 event_focused
463 .events()
464 .await?
465 .into_iter()
466 .filter(|event| event_type == event.kind.event_type().as_deref())
467 .filter(|event| session_id == event.kind.session_id()),
468 );
469 }
470 }
471
472 Ok(events.into_iter())
473 }
474}
475
/// A batch of vector diffs over events, together with their origin.
#[derive(Clone, Debug)]
pub struct TimelineVectorDiffs {
    /// The diffs, in the order they are stored in this vector.
    pub diffs: Vec<VectorDiff<Event>>,
    /// Where the events of `diffs` come from.
    pub origin: EventsOrigin,
}
484
/// Where an event has been found.
#[derive(Debug)]
pub(super) enum EventLocation {
    /// The event is in memory, at the given linked chunk position.
    Memory(Position),

    /// The event is in the store.
    // NOTE(review): presumably this means the event is not loaded in memory — confirm.
    Store,
}