1mod updates;
16
17use std::{cmp::Ordering, collections::BTreeSet, fmt, sync::Arc};
18
19use eyeball_im::VectorDiff;
20use futures_util::{StreamExt as _, stream};
21use matrix_sdk_base::{
22 apply_redaction,
23 event_cache::{Event, Gap},
24 linked_chunk::{LinkedChunkId, OwnedLinkedChunkId, Position, Update},
25 serde_helpers::extract_redaction_target,
26 sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
27 task_monitor::BackgroundTaskHandle,
28};
29use matrix_sdk_common::executor::spawn;
30use ruma::{
31 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId,
32 events::{relation::RelationType, room::redaction::SyncRoomRedactionEvent},
33 room_version_rules::RoomVersionRules,
34};
35use tokio::sync::broadcast::{Receiver, Sender};
36use tracing::{debug, instrument, trace, warn};
37
38pub(super) use self::updates::PinnedEventsCacheUpdateSender;
39#[cfg(feature = "e2e-encryption")]
40use super::super::redecryptor::ResolvedUtd;
41use super::{
42 super::{
43 EventCacheError, EventsOrigin, Result,
44 deduplicator::{DeduplicationOutcome, filter_duplicate_events},
45 persistence::{find_event, send_updates_to_store},
46 states::{
47 CacheStateLock, ReloadPreprocessing, StateLock, StateLockWriteGuard,
48 selectors::PinnedEventsStateSelector,
49 },
50 },
51 EventLocation, TimelineVectorDiffs,
52 event_linked_chunk::{EventLinkedChunk, sort_positions_descending},
53 room::RoomEventCacheLinkedChunkUpdate,
54};
55use crate::{Room, client::WeakClient, config::RequestConfig, room::WeakRoom};
56
/// State of the pinned events cache for a single room.
///
/// Mutations go through the methods implemented on
/// `StateLockWriteGuard<'_, PinnedEventsCacheState>`.
pub struct PinnedEventsCacheState {
    /// The ID of the room this state was created for.
    room_id: OwnedRoomId,

    /// The user ID handed to the deduplicator when filtering events received
    /// via sync.
    own_user_id: OwnedUserId,

    /// Room version rules; their `redaction` part is used when extracting and
    /// applying redactions.
    room_version_rules: RoomVersionRules,

    /// In-memory linked chunk holding the events of this cache.
    chunk: EventLinkedChunk,

    /// Sender used to broadcast `TimelineVectorDiffs` to subscribers.
    pub update_sender: PinnedEventsCacheUpdateSender,

    /// Sender handed to `send_updates_to_store` each time linked chunk updates
    /// are sent to the store.
    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
}
84
85#[cfg(not(tarpaulin_include))]
86impl fmt::Debug for PinnedEventsCacheState {
87 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88 f.debug_struct("PinnedEventsCacheState")
89 .field("room_id", &self.room_id)
90 .field("chunk", &self.chunk)
91 .finish_non_exhaustive()
92 }
93}
94
95impl<'a> StateLockWriteGuard<'a, PinnedEventsCacheState> {
96 #[must_use = "Propagate `VectorDiff` updates via `TimelineVectorDiffs`"]
102 pub async fn reload(
103 &mut self,
104 preprocessing: ReloadPreprocessing,
105 ) -> Result<Vec<VectorDiff<Event>>> {
106 match preprocessing {
107 ReloadPreprocessing::ForgetAll => {
108 self.state.chunk.reset();
110 self.propagate_changes().await?;
111 }
112
113 ReloadPreprocessing::None => {}
114 }
115
116 self.reload_from_storage().await?;
119
120 Ok(self.state.chunk.updates_as_vector_diffs())
121 }
122
123 async fn handle_sync(&mut self, timeline: Timeline) -> Result<()> {
124 let DeduplicationOutcome {
125 all_events: events,
126 in_memory_duplicated_event_ids,
127 in_store_duplicated_event_ids,
128 non_empty_all_duplicates: all_duplicates,
129 } = filter_duplicate_events(
130 &self.state.own_user_id,
131 &self.store,
132 LinkedChunkId::PinnedEvents(&self.state.room_id),
133 &self.state.chunk,
134 timeline.events,
135 )
136 .await?;
137
138 if all_duplicates {
139 return Ok(());
142 }
143
144 self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids).await?;
149
150 self.state.chunk.push_live_events(None, &events);
152
153 self.propagate_changes().await?;
154 self.notify_subscribers(EventsOrigin::Sync);
155
156 for event in &events {
158 self.maybe_apply_new_redaction(event).await?;
160 }
161
162 Ok(())
163 }
164
165 #[instrument(skip_all)]
170 pub async fn remove_events(
171 &mut self,
172 in_memory_events: Vec<(OwnedEventId, Position)>,
173 in_store_events: Vec<(OwnedEventId, Position)>,
174 ) -> Result<()> {
175 if !in_store_events.is_empty() {
177 let mut positions = in_store_events
178 .into_iter()
179 .map(|(_event_id, position)| position)
180 .collect::<Vec<_>>();
181
182 sort_positions_descending(&mut positions);
183
184 let updates =
185 positions.into_iter().map(|pos| Update::RemoveItem { at: pos }).collect::<Vec<_>>();
186
187 self.apply_store_only_updates(updates).await?;
188 }
189
190 if in_memory_events.is_empty() {
192 return Ok(());
194 }
195
196 self.state
198 .chunk
199 .remove_events_by_position(
200 in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
201 )
202 .expect("failed to remove an event");
203
204 self.propagate_changes().await
205 }
206
    /// Applies updates that are sent to the store without going through the
    /// in-memory linked chunk's own update queue.
    ///
    /// The updates are first handed to the chunk's `order_tracker` via
    /// `map_updates`, then sent to the store.
    async fn apply_store_only_updates(&mut self, updates: Vec<Update<Event, Gap>>) -> Result<()> {
        self.state.chunk.order_tracker.map_updates(&updates);
        self.send_updates_to_store(updates).await
    }
217
218 #[instrument(skip_all)]
222 async fn maybe_apply_new_redaction(&mut self, event: &Event) -> Result<()> {
223 let Some(event_id) =
224 extract_redaction_target(event.raw(), &self.room_version_rules.redaction)
225 else {
226 return Ok(());
227 };
228
229 let Some((location, mut target_event)) = self.find_event(&event_id).await? else {
231 trace!("redacted event is missing from the linked chunk");
232 return Ok(());
233 };
234
235 let target_event_raw = target_event.raw();
236
237 if let Ok(deserialized) = target_event_raw.deserialize()
239 && deserialized.is_redacted()
240 {
241 return Ok(());
242 }
243
244 if let Some(redacted_event) = apply_redaction(
245 target_event_raw,
246 event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
247 &self.room_version_rules.redaction,
248 ) {
249 target_event.replace_raw(redacted_event.cast_unchecked());
254
255 self.replace_event_at(location, target_event.clone()).await?;
256 }
257
258 Ok(())
259 }
260
    /// Looks up an event by its ID.
    ///
    /// Delegates to the `find_event` persistence helper, giving it this
    /// cache's room ID, in-memory linked chunk and store. When the event is
    /// found, returns it along with its [`EventLocation`].
    pub(super) async fn find_event(
        &self,
        event_id: &EventId,
    ) -> Result<Option<(EventLocation, Event)>> {
        find_event(event_id, &self.room_id, &self.chunk, &self.store).await
    }
268
269 pub async fn replace_event_at(
276 &mut self,
277 location: EventLocation,
278 new_event: Event,
279 ) -> Result<()> {
280 match location {
281 EventLocation::Memory(position) => {
282 self.state
283 .chunk
284 .replace_event_at(position, new_event)
285 .expect("should have been a valid position of an item");
286 self.propagate_changes().await?;
289 }
290 EventLocation::Store => {
291 self.save_events([new_event]).await?;
292 }
293 }
294
295 Ok(())
296 }
297
298 pub async fn save_events(&mut self, events: impl IntoIterator<Item = Event>) -> Result<()> {
300 let store = self.store.clone();
301 let room_id = self.state.room_id.clone();
302 let events = events.into_iter().collect::<Vec<_>>();
303
304 spawn(async move {
306 for event in events {
307 store.save_event(&room_id, event).await?;
308 }
309
310 Result::Ok(())
311 })
312 .await
313 .expect("joining failed")?;
314
315 Ok(())
316 }
317
318 async fn reload_from_storage(&mut self) -> Result<()> {
321 let room_id = self.state.room_id.clone();
322 let linked_chunk_id = LinkedChunkId::PinnedEvents(&room_id);
323
324 let (last_chunk, chunk_id_gen) = self.store.load_last_chunk(linked_chunk_id).await?;
325
326 let Some(last_chunk) = last_chunk else {
327 if self.state.chunk.events().next().is_some() {
330 self.state.chunk.reset();
331 self.notify_subscribers(EventsOrigin::Sync);
332 }
333
334 return Ok(());
335 };
336
337 {
338 let mut current_chunk_identifier = last_chunk.identifier;
339 self.state.chunk.replace_with(Some(last_chunk), chunk_id_gen)?;
340
341 while let Some(previous_chunk) =
343 self.store.load_previous_chunk(linked_chunk_id, current_chunk_identifier).await?
344 {
345 current_chunk_identifier = previous_chunk.identifier;
346 self.state.chunk.insert_new_chunk_as_first(previous_chunk)?;
347 }
348 }
349
350 self.state.chunk.store_updates().take();
352
353 self.notify_subscribers(EventsOrigin::Cache);
355
356 Ok(())
357 }
358
359 async fn replace_all_events(&mut self, new_events: Vec<Event>) -> Result<()> {
360 trace!("resetting all pinned events in linked chunk");
361
362 let previous_pinned_event_ids = self.state.current_event_ids();
363
364 if new_events
365 .iter()
366 .filter_map(|e| e.event_id())
367 .map(ToOwned::to_owned)
368 .collect::<BTreeSet<_>>()
369 == previous_pinned_event_ids.into_iter().collect()
370 {
371 return Ok(());
373 }
374
375 if self.state.chunk.events().next().is_some() {
376 self.state.chunk.reset();
377 }
378
379 self.state.chunk.push_live_events(None, &new_events);
380 self.propagate_changes().await?;
381 self.notify_subscribers(EventsOrigin::Sync);
382
383 Ok(())
384 }
385
    /// Takes the store updates accumulated by the in-memory linked chunk and
    /// sends them to the store.
    pub async fn propagate_changes(&mut self) -> Result<()> {
        let updates = self.state.chunk.store_updates().take();

        self.send_updates_to_store(updates).await
    }
393
394 async fn send_updates_to_store(&mut self, updates: Vec<Update<Event, Gap>>) -> Result<()> {
395 let linked_chunk_id = OwnedLinkedChunkId::PinnedEvents(self.room_id.clone());
396
397 send_updates_to_store(
398 &self.store,
399 linked_chunk_id,
400 &self.state.linked_chunk_update_sender,
401 updates,
402 )
403 .await
404 }
405
406 fn notify_subscribers(&mut self, origin: EventsOrigin) {
408 let diffs = self.state.chunk.updates_as_vector_diffs();
409
410 if !diffs.is_empty() {
411 self.update_sender.send(TimelineVectorDiffs { diffs, origin });
412 }
413 }
414}
415
416impl PinnedEventsCacheState {
417 pub(super) fn current_event_ids(&self) -> Vec<OwnedEventId> {
419 self.chunk
420 .events()
421 .filter_map(|(_position, event)| event.event_id().map(ToOwned::to_owned))
422 .collect()
423 }
424}
425
/// Handle to the pinned events cache of a room.
///
/// Cloning is cheap: both fields are `Arc`s, so all clones share the same
/// state and the same background task handle.
#[derive(Clone)]
pub struct PinnedEventsCache {
    /// Shared inner data (room ID and state lock).
    inner: Arc<PinnedEventsCacheInner>,

    /// Handle of the listener task spawned in `new`, configured with
    /// `abort_on_drop()`. It is never read, only kept alive alongside the
    /// cache.
    _task: Arc<BackgroundTaskHandle>,
}
437
/// Data shared between a [`PinnedEventsCache`] and its listener task.
struct PinnedEventsCacheInner {
    /// The ID of the room this cache is for.
    room_id: OwnedRoomId,

    /// Lock protecting the `PinnedEventsCacheState` of this room.
    state: CacheStateLock<PinnedEventsStateSelector>,
}
448
449impl PinnedEventsCache {
450 pub(in super::super) async fn new(
452 weak_room: &WeakRoom,
453 own_user_id: OwnedUserId,
454 room_version_rules: RoomVersionRules,
455 linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
456 state: &StateLock,
457 ) -> Result<Self> {
458 let room = weak_room.get().ok_or(EventCacheError::ClientDropped)?;
459 let room_id = room.room_id().to_owned();
460
461 let cache_state = state
462 .try_insert_once_with(
463 PinnedEventsStateSelector::new(room_id.clone()),
464 |_store_guard| async {
465 Ok(PinnedEventsCacheState {
466 room_id: room_id.clone(),
467 own_user_id,
468 room_version_rules,
469 chunk: EventLinkedChunk::new(),
470 update_sender: PinnedEventsCacheUpdateSender::new(),
471 linked_chunk_update_sender,
472 })
473 },
474 )
475 .await?;
476
477 let inner = Arc::new(PinnedEventsCacheInner { room_id, state: cache_state });
478
479 let task = room
480 .client()
481 .task_monitor()
482 .spawn_infinite_task(
483 "pinned_event_listener_task",
484 Self::pinned_event_listener_task(room, inner.clone()),
485 )
486 .abort_on_drop();
487
488 Ok(Self { inner, _task: Arc::new(task) })
489 }
490
    /// Returns a reference to the lock protecting this cache's state.
    pub(super) fn state(&self) -> &CacheStateLock<PinnedEventsStateSelector> {
        &self.inner.state
    }
495
496 pub async fn subscribe(&self) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
498 let guard = self.inner.state.read().await?;
499 let events = guard.state.chunk.events().map(|(_position, item)| item.clone()).collect();
500
501 let recv = guard.state.update_sender.new_pinned_events_receiver();
502
503 Ok((events, recv))
504 }
505
506 #[cfg(feature = "e2e-encryption")]
510 pub(in super::super) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<()> {
511 let mut guard = self.inner.state.write().await?;
512
513 if guard.state.chunk.replace_utds(events) {
514 guard.propagate_changes().await?;
515 guard.notify_subscribers(EventsOrigin::Cache);
516 }
517
518 Ok(())
519 }
520
521 #[instrument(skip_all, fields(room_id = %self.inner.room_id))]
523 pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
524 self.handle_timeline(updates.timeline).await?;
525
526 Ok(())
527 }
528
529 #[instrument(skip_all, fields(room_id = %self.inner.room_id))]
531 pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
532 self.handle_timeline(updates.timeline).await?;
533
534 Ok(())
535 }
536
537 async fn handle_timeline(&self, timeline: Timeline) -> Result<()> {
540 if timeline.events.is_empty() {
541 return Ok(());
542 }
543
544 trace!("adding new {} events", timeline.events.len());
545
546 self.inner.state.write().await?.handle_sync(timeline).await?;
547
548 Ok(())
549 }
550
    /// Long-running task keeping the cache aligned with the room's list of
    /// pinned event IDs.
    ///
    /// On start, the cache is reloaded from storage and compared with the
    /// room's current pinned event IDs; a mismatch triggers a reload from the
    /// network. The task then listens to `room.pinned_event_ids_stream()` and
    /// reloads from the network for each update that the cache does not
    /// already cover.
    ///
    /// The task ends when the stream ends, when the read lock cannot be
    /// acquired, or when the room can no longer be obtained from its weak
    /// reference.
    #[instrument(fields(%room_id = room.room_id()), skip(room, inner))]
    async fn pinned_event_listener_task(room: Room, inner: Arc<PinnedEventsCacheInner>) {
        debug!("pinned events listener task started");

        // Loads the pinned events through `reload_pinned_events`, then replaces the
        // content of the cache with them. Errors are logged, never propagated.
        let reload_from_network = async |room: Room| {
            let events = match Self::reload_pinned_events(room).await {
                Ok(Some(events)) => events,
                Ok(None) => Vec::new(),
                Err(err) => {
                    warn!("error when loading pinned events: {err}");
                    return;
                }
            };

            // This takes the write lock: callers must not hold a guard on the state
            // when awaiting this closure.
            match inner.state.write().await {
                Ok(mut guard) => {
                    guard.replace_all_events(events).await.unwrap_or_else(|err| {
                        warn!("error when replacing pinned events: {err}");
                    });
                }

                Err(err) => {
                    warn!("error when acquiring write lock to replace pinned events: {err}");
                }
            }
        };

        // Initial load: start from what the storage holds.
        match inner.state.write().await {
            Ok(mut guard) => {
                guard.reload_from_storage().await.unwrap_or_else(|err| {
                    warn!("error when reloading pinned events from storage, at start: {err}");
                });

                let actual_pinned_events = room.pinned_event_ids().unwrap_or_default();
                let reloaded_set =
                    guard.state.current_event_ids().into_iter().collect::<BTreeSet<_>>();

                // Reload from the network when the sizes differ, or when a pinned event
                // ID is missing from the reloaded events.
                if actual_pinned_events.len() != reloaded_set.len()
                    || actual_pinned_events.iter().any(|event_id| !reloaded_set.contains(event_id))
                {
                    // Release the write lock: `reload_from_network` acquires it itself.
                    drop(guard);
                    reload_from_network(room.clone()).await;
                }
            }

            Err(err) => {
                warn!("error when acquiring write lock to initialize pinned events: {err}");
            }
        }

        let weak_room =
            WeakRoom::new(WeakClient::from_client(&room.client()), room.room_id().to_owned());

        let mut stream = room.pinned_event_ids_stream();

        // From now on, only the weak reference to the room is kept by this task.
        drop(room);

        while let Some(new_list) = stream.next().await {
            trace!("handling update");

            let guard = match inner.state.read().await {
                Ok(guard) => guard,
                Err(err) => {
                    warn!("error when acquiring read lock to handle pinned events update: {err}");
                    break;
                }
            };

            let current_set = guard.state.current_event_ids().into_iter().collect::<BTreeSet<_>>();

            // Skip the reload when the new list is non-empty and all its IDs are already
            // in the cache.
            // NOTE(review): this is a subset check, so an update that only removes pinned
            // IDs is skipped too — confirm this is intended.
            if !new_list.is_empty()
                && new_list.iter().all(|event_id| current_set.contains(event_id))
            {
                continue;
            }

            let Some(room) = weak_room.get() else {
                debug!("room has been dropped, ending pinned events listener task");
                break;
            };

            // Release the read lock: `reload_from_network` acquires the write lock.
            drop(guard);

            reload_from_network(room).await;
        }

        debug!("pinned events listener task ended");
    }
649
    /// Loads the room's pinned events, along with their relations.
    ///
    /// Only the last `max_pinned_events_to_load` pinned event IDs are
    /// considered, with at most `max_pinned_events_concurrent_requests`
    /// loads in flight at once (both read from the event cache config).
    ///
    /// Returns `Ok(Some(events))` sorted with `compare_pinned_items`; the
    /// vector is empty when the room has no pinned event IDs. This function
    /// never returns `Ok(None)`.
    ///
    /// # Errors
    ///
    /// Returns [`EventCacheError::UnableToLoadPinnedEvents`] when there are
    /// pinned event IDs but no event at all could be loaded. Partial
    /// failures are only logged.
    async fn reload_pinned_events(room: Room) -> Result<Option<Vec<Event>>> {
        let (max_events_to_load, max_concurrent_requests) = {
            let client = room.client();
            let config = client.event_cache().config();
            (config.max_pinned_events_to_load, config.max_pinned_events_concurrent_requests)
        };

        // `rev().take(n).rev()` keeps the last `n` IDs, in their original order.
        let pinned_event_ids: Vec<OwnedEventId> = room
            .pinned_event_ids()
            .unwrap_or_default()
            .into_iter()
            .rev()
            .take(max_events_to_load)
            .rev()
            .collect();

        if pinned_event_ids.is_empty() {
            return Ok(Some(Vec::new()));
        }

        let mut num_successful_loads = 0;

        let mut loaded_events: Vec<Event> =
            stream::iter(pinned_event_ids.clone().into_iter().map(|event_id| {
                let room = room.clone();
                // Only annotations and replacements are requested as relations.
                let filter = vec![RelationType::Annotation, RelationType::Replacement];
                let request_config = RequestConfig::default().retry_limit(3);

                async move {
                    let (target, mut relations) = room
                        .load_or_fetch_event_with_relations(
                            &event_id,
                            Some(filter),
                            Some(request_config),
                        )
                        .await?;

                    // Put the pinned event itself in front of its relations.
                    relations.insert(0, target);
                    Ok::<_, crate::Error>(relations)
                }
            }))
            .buffer_unordered(max_concurrent_requests)
            .inspect(|result| {
                if result.is_ok() {
                    num_successful_loads += 1;
                }
            })
            // First flattening: iterating a `Result` yields its `Ok` value or nothing,
            // so failed loads are dropped here.
            .flat_map(stream::iter)
            // Second flattening: from a list of events per pinned event to single events.
            .flat_map(stream::iter)
            .collect()
            .await;

        if num_successful_loads != pinned_event_ids.len() {
            warn!(
                "only successfully loaded {} out of {} pinned events",
                num_successful_loads,
                pinned_event_ids.len()
            );
        }

        if loaded_events.is_empty() {
            return Err(EventCacheError::UnableToLoadPinnedEvents);
        }

        // Loads complete in arbitrary order (`buffer_unordered`): order the result.
        loaded_events.sort_by(compare_pinned_items);

        Ok(Some(loaded_events))
    }
735}
736
737impl fmt::Debug for PinnedEventsCache {
738 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
739 f.debug_struct("PinnedEventsCache").finish_non_exhaustive()
740 }
741}
742
743fn compare_pinned_items(a: &Event, b: &Event) -> Ordering {
744 let a_time: Option<MilliSecondsSinceUnixEpoch> = a.timestamp_raw();
745 let b_time: Option<MilliSecondsSinceUnixEpoch> = b.timestamp_raw();
746
747 compare_by_optional_timestamp(a_time, b_time)
748}
749
750fn compare_by_optional_timestamp(
751 a: Option<MilliSecondsSinceUnixEpoch>,
752 b: Option<MilliSecondsSinceUnixEpoch>,
753) -> Ordering {
754 match (a, b) {
755 (None, None) => Ordering::Equal,
756 (None, Some(_)) => Ordering::Greater,
757 (Some(_), None) => Ordering::Less,
758 (Some(a), Some(b)) => a.cmp(&b),
759 }
760}
761
#[cfg(not(target_family = "wasm"))]
#[cfg(test)]
mod tests {
    use proptest::prelude::*;
    use ruma::UInt;

    use super::*;

    /// Strategy yielding either `None` or a timestamp built from an arbitrary
    /// `u32`.
    fn any_timestamp() -> impl Strategy<Value = Option<MilliSecondsSinceUnixEpoch>> {
        let timestamp =
            any::<u32>().prop_map(|millis| MilliSecondsSinceUnixEpoch(UInt::from(millis)));

        prop::option::of(timestamp)
    }

    /// Sorting a large vector made only of `None`s must not panic.
    #[test]
    fn sort_pinned_events_never_panics_only_nones() {
        let mut timestamps = vec![None; 100_000];
        timestamps.sort_by(|lhs, rhs| compare_by_optional_timestamp(*lhs, *rhs));
    }

    proptest! {
        /// Sorting arbitrary optional timestamps must not panic.
        #[test]
        fn sort_pinned_events_never_panics(
            mut timestamps in prop::collection::vec(any_timestamp(), 0..1000)
        ) {
            timestamps.sort_by(|lhs, rhs| compare_by_optional_timestamp(*lhs, *rhs));
        }

        /// A value always compares equal to itself.
        #[test]
        fn compare_pinned_events_reflexive(value in any_timestamp()) {
            prop_assert_eq!(compare_by_optional_timestamp(value, value), Ordering::Equal);
        }

        /// Swapping the operands reverses the ordering.
        #[test]
        fn compare_pinned_events_antisymmetric(
            left in any_timestamp(),
            right in any_timestamp()
        ) {
            let forward = compare_by_optional_timestamp(left, right);
            let backward = compare_by_optional_timestamp(right, left);

            prop_assert_eq!(forward, backward.reverse());
        }

        /// When `a ? b` and `b ? c` agree, `a ? c` must agree with them.
        #[test]
        fn compare_pinned_events_transitive(
            first in any_timestamp(),
            second in any_timestamp(),
            third in any_timestamp()
        ) {
            let first_second = compare_by_optional_timestamp(first, second);
            let second_third = compare_by_optional_timestamp(second, third);
            let first_third = compare_by_optional_timestamp(first, third);

            // Covers the three cases (both `Less`, both `Equal`, both `Greater`) at once.
            if first_second == second_third {
                prop_assert_eq!(first_third, first_second);
            }
        }
    }
}