matrix_sdk/event_cache/caches/
mod.rs1use eyeball::SharedObservable;
16use eyeball_im::VectorDiff;
17use matrix_sdk_base::{
18 ThreadingSupport,
19 event_cache::{Event, store::EventCacheStoreLock},
20 linked_chunk::Position,
21 sync::{JoinedRoomUpdate, LeftRoomUpdate},
22};
23use ruma::{OwnedRoomId, RoomId};
24use tokio::sync::{broadcast::Sender, mpsc};
25
26use super::{EventCacheError, EventsOrigin, Result};
27use crate::{client::WeakClient, room::WeakRoom};
28
29pub mod event_focused;
30pub mod event_linked_chunk;
31pub(super) mod lock;
32pub mod pagination;
33pub mod pinned_events;
34mod read_receipts;
35pub mod room;
36pub mod thread;
37
38#[derive(Debug)]
40pub(super) struct Caches {
41 pub room: room::RoomEventCache,
42}
43
44impl Caches {
45 pub async fn new(
47 weak_client: &WeakClient,
48 room_id: &RoomId,
49 generic_update_sender: Sender<room::RoomEventCacheGenericUpdate>,
50 linked_chunk_update_sender: Sender<room::RoomEventCacheLinkedChunkUpdate>,
51 auto_shrink_sender: mpsc::Sender<OwnedRoomId>,
52 store: EventCacheStoreLock,
53 ) -> Result<Self> {
54 let Some(client) = weak_client.get() else {
55 return Err(EventCacheError::ClientDropped);
56 };
57
58 let weak_room = WeakRoom::new(weak_client.clone(), room_id.to_owned());
59
60 let room = client
61 .get_room(room_id)
62 .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
63 let room_version_rules = room.clone_info().room_version_rules_or_default();
64
65 let pagination_status = SharedObservable::new(pagination::SharedPaginationStatus::Idle {
66 hit_timeline_start: false,
67 });
68
69 let enabled_thread_support =
70 matches!(client.base_client().threading_support, ThreadingSupport::Enabled { .. });
71
72 let update_sender = room::RoomEventCacheUpdateSender::new(generic_update_sender.clone());
73
74 let own_user_id =
75 client.user_id().expect("the user must be logged in, at this point").to_owned();
76
77 let room_state = room::LockedRoomEventCacheState::new(
78 own_user_id,
79 room_id.to_owned(),
80 weak_room.clone(),
81 room_version_rules,
82 enabled_thread_support,
83 update_sender.clone(),
84 linked_chunk_update_sender,
85 store,
86 pagination_status.clone(),
87 )
88 .await?;
89
90 let timeline_is_not_empty =
91 room_state.read().await?.room_linked_chunk().revents().next().is_some();
92
93 let room_event_cache = room::RoomEventCache::new(
94 room_id.to_owned(),
95 weak_room,
96 room_state,
97 pagination_status,
98 auto_shrink_sender,
99 update_sender,
100 );
101
102 if timeline_is_not_empty {
105 let _ = generic_update_sender
106 .send(room::RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
107 }
108
109 Ok(Self { room: room_event_cache })
110 }
111
112 pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
114 let Self { room } = &self;
115
116 room.handle_joined_room_update(updates).await?;
117
118 Ok(())
119 }
120
121 pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
123 let Self { room } = &self;
124
125 room.handle_left_room_update(updates).await?;
126
127 Ok(())
128 }
129
130 pub async fn prepare_to_reset(&mut self) -> Result<ResetCaches<'_>> {
138 ResetCaches::new(self).await
139 }
140
141 #[cfg(feature = "e2e-encryption")]
145 pub async fn all_events(&self) -> Result<impl Iterator<Item = Event>> {
146 let events_from_room = self.room.events().await?;
147
148 Ok(events_from_room.into_iter())
149 }
150}
151
152pub(super) struct ResetCaches<'c> {
158 room_lock: (&'c room::RoomEventCache, room::RoomEventCacheStateLockWriteGuard<'c>),
159}
160
161impl<'c> ResetCaches<'c> {
162 async fn new(Caches { room }: &'c mut Caches) -> Result<Self> {
166 Ok(Self { room_lock: (room, room.state().write().await?) })
167 }
168
169 pub async fn reset_all(self) -> Result<()> {
176 let Self { room_lock: (room, mut room_state) } = self;
177
178 {
179 let updates_as_vector_diffs = room_state.reset().await?;
180 room.update_sender().send(
181 room::RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
182 diffs: updates_as_vector_diffs,
183 origin: EventsOrigin::Cache,
184 }),
185 Some(room::RoomEventCacheGenericUpdate { room_id: room.room_id().to_owned() }),
186 );
187 }
188
189 Ok(())
190 }
191}
192
193#[derive(Clone, Debug)]
195pub struct TimelineVectorDiffs {
196 pub diffs: Vec<VectorDiff<Event>>,
198 pub origin: EventsOrigin,
200}
201
202pub(super) enum EventLocation {
204 Memory(Position),
206
207 Store,
209}