matrix_sdk/event_cache/caches/
mod.rs1use eyeball::SharedObservable;
16use eyeball_im::VectorDiff;
17use matrix_sdk_base::{
18 ThreadingSupport,
19 event_cache::{Event, store::EventCacheStoreLock},
20 linked_chunk::Position,
21 sync::{JoinedRoomUpdate, LeftRoomUpdate},
22};
23use ruma::{OwnedRoomId, RoomId};
24use tokio::sync::{broadcast::Sender, mpsc};
25
26use super::{EventCacheError, EventsOrigin, Result};
27use crate::{
28 client::WeakClient, event_cache::automatic_pagination::AutomaticPagination, room::WeakRoom,
29};
30
31pub mod event_focused;
32pub mod event_linked_chunk;
33pub(super) mod lock;
34pub mod pagination;
35pub mod pinned_events;
36mod read_receipts;
37pub mod room;
38pub mod thread;
39
40#[derive(Debug)]
42pub(super) struct Caches {
43 pub room: room::RoomEventCache,
44}
45
46impl Caches {
47 pub async fn new(
49 weak_client: &WeakClient,
50 room_id: &RoomId,
51 generic_update_sender: Sender<room::RoomEventCacheGenericUpdate>,
52 linked_chunk_update_sender: Sender<room::RoomEventCacheLinkedChunkUpdate>,
53 auto_shrink_sender: mpsc::Sender<OwnedRoomId>,
54 store: EventCacheStoreLock,
55 automatic_pagination: Option<AutomaticPagination>,
56 ) -> Result<Self> {
57 let Some(client) = weak_client.get() else {
58 return Err(EventCacheError::ClientDropped);
59 };
60
61 let weak_room = WeakRoom::new(weak_client.clone(), room_id.to_owned());
62
63 let room = client
64 .get_room(room_id)
65 .ok_or_else(|| EventCacheError::RoomNotFound { room_id: room_id.to_owned() })?;
66 let room_version_rules = room.clone_info().room_version_rules_or_default();
67
68 let pagination_status = SharedObservable::new(pagination::SharedPaginationStatus::Idle {
69 hit_timeline_start: false,
70 });
71
72 let enabled_thread_support =
73 matches!(client.base_client().threading_support, ThreadingSupport::Enabled { .. });
74
75 let update_sender = room::RoomEventCacheUpdateSender::new(generic_update_sender.clone());
76
77 let own_user_id =
78 client.user_id().expect("the user must be logged in, at this point").to_owned();
79
80 let room_state = room::LockedRoomEventCacheState::new(
81 own_user_id,
82 room_id.to_owned(),
83 weak_room.clone(),
84 room_version_rules,
85 enabled_thread_support,
86 update_sender.clone(),
87 linked_chunk_update_sender,
88 store,
89 pagination_status.clone(),
90 automatic_pagination,
91 )
92 .await?;
93
94 let timeline_is_not_empty =
95 room_state.read().await?.room_linked_chunk().revents().next().is_some();
96
97 let room_event_cache = room::RoomEventCache::new(
98 room_id.to_owned(),
99 weak_room,
100 room_state,
101 pagination_status,
102 auto_shrink_sender,
103 update_sender,
104 );
105
106 if timeline_is_not_empty {
109 let _ = generic_update_sender
110 .send(room::RoomEventCacheGenericUpdate { room_id: room_id.to_owned() });
111 }
112
113 Ok(Self { room: room_event_cache })
114 }
115
116 pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
118 let Self { room } = &self;
119
120 room.handle_joined_room_update(updates).await?;
121
122 Ok(())
123 }
124
125 pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
127 let Self { room } = &self;
128
129 room.handle_left_room_update(updates).await?;
130
131 Ok(())
132 }
133
134 pub async fn prepare_to_reset(&mut self) -> Result<ResetCaches<'_>> {
142 ResetCaches::new(self).await
143 }
144
145 #[cfg(feature = "e2e-encryption")]
149 pub async fn all_events(&self) -> Result<impl Iterator<Item = Event>> {
150 let events_from_room = self.room.events().await?;
151
152 Ok(events_from_room.into_iter())
153 }
154}
155
156pub(super) struct ResetCaches<'c> {
162 room_lock: (&'c room::RoomEventCache, room::RoomEventCacheStateLockWriteGuard<'c>),
163}
164
165impl<'c> ResetCaches<'c> {
166 async fn new(Caches { room }: &'c mut Caches) -> Result<Self> {
170 Ok(Self { room_lock: (room, room.state().write().await?) })
171 }
172
173 pub async fn reset_all(self) -> Result<()> {
180 let Self { room_lock: (room, mut room_state) } = self;
181
182 {
183 let updates_as_vector_diffs = room_state.reset().await?;
184 room.update_sender().send(
185 room::RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs {
186 diffs: updates_as_vector_diffs,
187 origin: EventsOrigin::Cache,
188 }),
189 Some(room::RoomEventCacheGenericUpdate { room_id: room.room_id().to_owned() }),
190 );
191 }
192
193 Ok(())
194 }
195}
196
197#[derive(Clone, Debug)]
199pub struct TimelineVectorDiffs {
200 pub diffs: Vec<VectorDiff<Event>>,
202 pub origin: EventsOrigin,
204}
205
206pub(super) enum EventLocation {
208 Memory(Position),
210
211 Store,
213}