Skip to main content

matrix_sdk/event_cache/caches/pinned_events/
mod.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod updates;
16
17use std::{cmp::Ordering, collections::BTreeSet, fmt, sync::Arc};
18
19use eyeball_im::VectorDiff;
20use futures_util::{StreamExt as _, stream};
21use matrix_sdk_base::{
22    apply_redaction,
23    event_cache::{Event, Gap},
24    linked_chunk::{LinkedChunkId, OwnedLinkedChunkId, Position, Update},
25    serde_helpers::extract_redaction_target,
26    sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
27    task_monitor::BackgroundTaskHandle,
28};
29use matrix_sdk_common::executor::spawn;
30use ruma::{
31    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId,
32    events::{relation::RelationType, room::redaction::SyncRoomRedactionEvent},
33    room_version_rules::RoomVersionRules,
34};
35use tokio::sync::broadcast::{Receiver, Sender};
36use tracing::{debug, instrument, trace, warn};
37
38pub(super) use self::updates::PinnedEventsCacheUpdateSender;
39#[cfg(feature = "e2e-encryption")]
40use super::super::redecryptor::ResolvedUtd;
41use super::{
42    super::{
43        EventCacheError, EventsOrigin, Result,
44        deduplicator::{DeduplicationOutcome, filter_duplicate_events},
45        persistence::{find_event, send_updates_to_store},
46        states::{
47            CacheStateLock, ReloadPreprocessing, StateLock, StateLockWriteGuard,
48            selectors::PinnedEventsStateSelector,
49        },
50    },
51    EventLocation, TimelineVectorDiffs,
52    event_linked_chunk::{EventLinkedChunk, sort_positions_descending},
53    room::RoomEventCacheLinkedChunkUpdate,
54};
55use crate::{Room, client::WeakClient, config::RequestConfig, room::WeakRoom};
56
57pub struct PinnedEventsCacheState {
58    /// The ID of the room owning this list of pinned events.
59    room_id: OwnedRoomId,
60
61    /// The user's own user id.
62    own_user_id: OwnedUserId,
63
64    /// The rules for the version of this room.
65    room_version_rules: RoomVersionRules,
66
67    /// The linked chunk representing this room's pinned events.
68    ///
69    /// This linked chunk also contains related events. The events are sorted in
70    /// the chronological order (oldest to newest), since it would be otherwise
71    /// impossible to order them correctly, given that we fetch their
72    /// relations over time.
73    chunk: EventLinkedChunk,
74
75    /// Update sender for this pinned events cache.
76    pub update_sender: PinnedEventsCacheUpdateSender,
77
78    /// A sender for the globally observable linked chunk updates that happened
79    /// during a sync or a back-pagination.
80    ///
81    /// See also [`super::super::EventCacheInner::linked_chunk_update_sender`].
82    linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
83}
84
85#[cfg(not(tarpaulin_include))]
86impl fmt::Debug for PinnedEventsCacheState {
87    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
88        f.debug_struct("PinnedEventsCacheState")
89            .field("room_id", &self.room_id)
90            .field("chunk", &self.chunk)
91            .finish_non_exhaustive()
92    }
93}
94
95impl<'a> StateLockWriteGuard<'a, PinnedEventsCacheState> {
96    /// Reload the pinned-events: only the last events will be reloaded,
97    /// shrinking the in-memory size of the cache.
98    ///
99    /// If `preprocessing` is set to [`ReloadPreprocessing::ForgetAll`], all
100    /// events will be erased before reloaded.
101    #[must_use = "Propagate `VectorDiff` updates via `TimelineVectorDiffs`"]
102    pub async fn reload(
103        &mut self,
104        preprocessing: ReloadPreprocessing,
105    ) -> Result<Vec<VectorDiff<Event>>> {
106        match preprocessing {
107            ReloadPreprocessing::ForgetAll => {
108                // Clear the `LinkedChunk` and broadcast the updates to the store.
109                self.state.chunk.reset();
110                self.propagate_changes().await?;
111            }
112
113            ReloadPreprocessing::None => {}
114        }
115
116        // The task will notice there is a desynchronisation and will reload from
117        // network.
118        self.reload_from_storage().await?;
119
120        Ok(self.state.chunk.updates_as_vector_diffs())
121    }
122
123    async fn handle_sync(&mut self, timeline: Timeline) -> Result<()> {
124        let DeduplicationOutcome {
125            all_events: events,
126            in_memory_duplicated_event_ids,
127            in_store_duplicated_event_ids,
128            non_empty_all_duplicates: all_duplicates,
129        } = filter_duplicate_events(
130            &self.state.own_user_id,
131            &self.store,
132            LinkedChunkId::PinnedEvents(&self.state.room_id),
133            &self.state.chunk,
134            timeline.events,
135        )
136        .await?;
137
138        if all_duplicates {
139            // If all events are duplicates, we don't need to do anything; ignore
140            // the new events.
141            return Ok(());
142        }
143
144        // Remove the old duplicated events.
145        //
146        // We don't have to worry about the removals can change the position of the
147        // existing events, because we are pushing all _new_ `events` at the back.
148        self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids).await?;
149
150        // We've found new relations; append them to the linked chunk.
151        self.state.chunk.push_live_events(None, &events);
152
153        self.propagate_changes().await?;
154        self.notify_subscribers(EventsOrigin::Sync);
155
156        // Do stuff for each event.
157        for event in &events {
158            // Handle redaction.
159            self.maybe_apply_new_redaction(event).await?;
160        }
161
162        Ok(())
163    }
164
165    /// Remove events by their position, in `EventLinkedChunk`.
166    ///
167    /// This method is purposely isolated because it must ensure that
168    /// positions are sorted appropriately or it can be disastrous.
169    #[instrument(skip_all)]
170    pub async fn remove_events(
171        &mut self,
172        in_memory_events: Vec<(OwnedEventId, Position)>,
173        in_store_events: Vec<(OwnedEventId, Position)>,
174    ) -> Result<()> {
175        // In-store events.
176        if !in_store_events.is_empty() {
177            let mut positions = in_store_events
178                .into_iter()
179                .map(|(_event_id, position)| position)
180                .collect::<Vec<_>>();
181
182            sort_positions_descending(&mut positions);
183
184            let updates =
185                positions.into_iter().map(|pos| Update::RemoveItem { at: pos }).collect::<Vec<_>>();
186
187            self.apply_store_only_updates(updates).await?;
188        }
189
190        // In-memory events.
191        if in_memory_events.is_empty() {
192            // Nothing else to do, return early.
193            return Ok(());
194        }
195
196        // `remove_events_by_position` is responsible of sorting positions.
197        self.state
198            .chunk
199            .remove_events_by_position(
200                in_memory_events.into_iter().map(|(_event_id, position)| position).collect(),
201            )
202            .expect("failed to remove an event");
203
204        self.propagate_changes().await
205    }
206
207    /// Apply some updates that are effective only on the store itself.
208    ///
209    /// This method should be used only for updates that happen *outside*
210    /// the in-memory linked chunk. Such updates must be applied
211    /// onto the ordering tracker as well as to the persistent
212    /// storage.
213    async fn apply_store_only_updates(&mut self, updates: Vec<Update<Event, Gap>>) -> Result<()> {
214        self.state.chunk.order_tracker.map_updates(&updates);
215        self.send_updates_to_store(updates).await
216    }
217
218    /// If the given event is a redaction, try to retrieve the
219    /// to-be-redacted event in the chunk, and replace it by the
220    /// redacted form.
221    #[instrument(skip_all)]
222    async fn maybe_apply_new_redaction(&mut self, event: &Event) -> Result<()> {
223        let Some(event_id) =
224            extract_redaction_target(event.raw(), &self.room_version_rules.redaction)
225        else {
226            return Ok(());
227        };
228
229        // Replace the redacted event by a redacted form, if we knew about it.
230        let Some((location, mut target_event)) = self.find_event(&event_id).await? else {
231            trace!("redacted event is missing from the linked chunk");
232            return Ok(());
233        };
234
235        let target_event_raw = target_event.raw();
236
237        // Don't redact already redacted events.
238        if let Ok(deserialized) = target_event_raw.deserialize()
239            && deserialized.is_redacted()
240        {
241            return Ok(());
242        }
243
244        if let Some(redacted_event) = apply_redaction(
245            target_event_raw,
246            event.raw().cast_ref_unchecked::<SyncRoomRedactionEvent>(),
247            &self.room_version_rules.redaction,
248        ) {
249            // It's safe to cast `redacted_event` here:
250            // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent`
251            //   when calling .raw(), so it's still one under the hood.
252            // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case.
253            target_event.replace_raw(redacted_event.cast_unchecked());
254
255            self.replace_event_at(location, target_event.clone()).await?;
256        }
257
258        Ok(())
259    }
260
261    /// See documentation of [`find_event`].
262    pub(super) async fn find_event(
263        &self,
264        event_id: &EventId,
265    ) -> Result<Option<(EventLocation, Event)>> {
266        find_event(event_id, &self.room_id, &self.chunk, &self.store).await
267    }
268
269    /// Replaces a single event, be it saved in memory or in the store.
270    ///
271    /// If it was saved in memory, this will emit a notification to
272    /// observers that a single item has been replaced. Otherwise,
273    /// such a notification is not emitted, because observers are
274    /// unlikely to observe the store updates directly.
275    pub async fn replace_event_at(
276        &mut self,
277        location: EventLocation,
278        new_event: Event,
279    ) -> Result<()> {
280        match location {
281            EventLocation::Memory(position) => {
282                self.state
283                    .chunk
284                    .replace_event_at(position, new_event)
285                    .expect("should have been a valid position of an item");
286                // We just changed the in-memory representation; synchronize this with
287                // the store.
288                self.propagate_changes().await?;
289            }
290            EventLocation::Store => {
291                self.save_events([new_event]).await?;
292            }
293        }
294
295        Ok(())
296    }
297
298    /// Save events into the database, without notifying observers.
299    pub async fn save_events(&mut self, events: impl IntoIterator<Item = Event>) -> Result<()> {
300        let store = self.store.clone();
301        let room_id = self.state.room_id.clone();
302        let events = events.into_iter().collect::<Vec<_>>();
303
304        // Spawn a task so the save is uninterrupted by task cancellation.
305        spawn(async move {
306            for event in events {
307                store.save_event(&room_id, event).await?;
308            }
309
310            Result::Ok(())
311        })
312        .await
313        .expect("joining failed")?;
314
315        Ok(())
316    }
317
318    /// Reload all the pinned events from storage, replacing the current linked
319    /// chunk.
320    async fn reload_from_storage(&mut self) -> Result<()> {
321        let room_id = self.state.room_id.clone();
322        let linked_chunk_id = LinkedChunkId::PinnedEvents(&room_id);
323
324        let (last_chunk, chunk_id_gen) = self.store.load_last_chunk(linked_chunk_id).await?;
325
326        let Some(last_chunk) = last_chunk else {
327            // No pinned events stored, make sure the in-memory linked chunk is sync'd (i.e.
328            // empty), and return.
329            if self.state.chunk.events().next().is_some() {
330                self.state.chunk.reset();
331                self.notify_subscribers(EventsOrigin::Sync);
332            }
333
334            return Ok(());
335        };
336
337        {
338            let mut current_chunk_identifier = last_chunk.identifier;
339            self.state.chunk.replace_with(Some(last_chunk), chunk_id_gen)?;
340
341            // Reload the entire chunk.
342            while let Some(previous_chunk) =
343                self.store.load_previous_chunk(linked_chunk_id, current_chunk_identifier).await?
344            {
345                current_chunk_identifier = previous_chunk.identifier;
346                self.state.chunk.insert_new_chunk_as_first(previous_chunk)?;
347            }
348        }
349
350        // Empty store updates, since we just reloaded from storage.
351        self.state.chunk.store_updates().take();
352
353        // Let observers know about it.
354        self.notify_subscribers(EventsOrigin::Cache);
355
356        Ok(())
357    }
358
359    async fn replace_all_events(&mut self, new_events: Vec<Event>) -> Result<()> {
360        trace!("resetting all pinned events in linked chunk");
361
362        let previous_pinned_event_ids = self.state.current_event_ids();
363
364        if new_events
365            .iter()
366            .filter_map(|e| e.event_id())
367            .map(ToOwned::to_owned)
368            .collect::<BTreeSet<_>>()
369            == previous_pinned_event_ids.into_iter().collect()
370        {
371            // No change in the list of pinned events.
372            return Ok(());
373        }
374
375        if self.state.chunk.events().next().is_some() {
376            self.state.chunk.reset();
377        }
378
379        self.state.chunk.push_live_events(None, &new_events);
380        self.propagate_changes().await?;
381        self.notify_subscribers(EventsOrigin::Sync);
382
383        Ok(())
384    }
385
386    /// Propagate the changes in this linked chunk to observers, and save the
387    /// changes on disk.
388    pub async fn propagate_changes(&mut self) -> Result<()> {
389        let updates = self.state.chunk.store_updates().take();
390
391        self.send_updates_to_store(updates).await
392    }
393
394    async fn send_updates_to_store(&mut self, updates: Vec<Update<Event, Gap>>) -> Result<()> {
395        let linked_chunk_id = OwnedLinkedChunkId::PinnedEvents(self.room_id.clone());
396
397        send_updates_to_store(
398            &self.store,
399            linked_chunk_id,
400            &self.state.linked_chunk_update_sender,
401            updates,
402        )
403        .await
404    }
405
406    /// Notify subscribers of timeline updates.
407    fn notify_subscribers(&mut self, origin: EventsOrigin) {
408        let diffs = self.state.chunk.updates_as_vector_diffs();
409
410        if !diffs.is_empty() {
411            self.update_sender.send(TimelineVectorDiffs { diffs, origin });
412        }
413    }
414}
415
416impl PinnedEventsCacheState {
417    /// Return a list of the current event IDs in this linked chunk.
418    pub(super) fn current_event_ids(&self) -> Vec<OwnedEventId> {
419        self.chunk
420            .events()
421            .filter_map(|(_position, event)| event.event_id().map(ToOwned::to_owned))
422            .collect()
423    }
424}
425
426/// All the information related to room's pinned events..
427///
428/// Cloning is shallow, and thus is cheap to do.
429#[derive(Clone)]
430pub struct PinnedEventsCache {
431    inner: Arc<PinnedEventsCacheInner>,
432
433    /// The task handling the refreshing of pinned events for this specific
434    /// room.
435    _task: Arc<BackgroundTaskHandle>,
436}
437
438/// The (non-cloneable) details of the `PinnedEventsCache`.
439struct PinnedEventsCacheInner {
440    /// The ID of the room owning this list of pinned events.
441    room_id: OwnedRoomId,
442
443    /// State of this `PinnedEventsCache`.
444    ///
445    /// It is behind an `Arc` because it is shared with the task.
446    state: CacheStateLock<PinnedEventsStateSelector>,
447}
448
449impl PinnedEventsCache {
450    /// Creates a new [`PinnedEventsCache`] for the given room.
451    pub(in super::super) async fn new(
452        weak_room: &WeakRoom,
453        own_user_id: OwnedUserId,
454        room_version_rules: RoomVersionRules,
455        linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
456        state: &StateLock,
457    ) -> Result<Self> {
458        let room = weak_room.get().ok_or(EventCacheError::ClientDropped)?;
459        let room_id = room.room_id().to_owned();
460
461        let cache_state = state
462            .try_insert_once_with(
463                PinnedEventsStateSelector::new(room_id.clone()),
464                |_store_guard| async {
465                    Ok(PinnedEventsCacheState {
466                        room_id: room_id.clone(),
467                        own_user_id,
468                        room_version_rules,
469                        chunk: EventLinkedChunk::new(),
470                        update_sender: PinnedEventsCacheUpdateSender::new(),
471                        linked_chunk_update_sender,
472                    })
473                },
474            )
475            .await?;
476
477        let inner = Arc::new(PinnedEventsCacheInner { room_id, state: cache_state });
478
479        let task = room
480            .client()
481            .task_monitor()
482            .spawn_infinite_task(
483                "pinned_event_listener_task",
484                Self::pinned_event_listener_task(room, inner.clone()),
485            )
486            .abort_on_drop();
487
488        Ok(Self { inner, _task: Arc::new(task) })
489    }
490
491    /// Return a reference to the state.
492    pub(super) fn state(&self) -> &CacheStateLock<PinnedEventsStateSelector> {
493        &self.inner.state
494    }
495
496    /// Subscribe to live events from this room's pinned events cache.
497    pub async fn subscribe(&self) -> Result<(Vec<Event>, Receiver<TimelineVectorDiffs>)> {
498        let guard = self.inner.state.read().await?;
499        let events = guard.state.chunk.events().map(|(_position, item)| item.clone()).collect();
500
501        let recv = guard.state.update_sender.new_pinned_events_receiver();
502
503        Ok((events, recv))
504    }
505
506    /// Try to locate the events in the linked chunk corresponding to the given
507    /// list of decrypted events, and replace them, while alerting observers
508    /// about the update.
509    #[cfg(feature = "e2e-encryption")]
510    pub(in super::super) async fn replace_utds(&self, events: &[ResolvedUtd]) -> Result<()> {
511        let mut guard = self.inner.state.write().await?;
512
513        if guard.state.chunk.replace_utds(events) {
514            guard.propagate_changes().await?;
515            guard.notify_subscribers(EventsOrigin::Cache);
516        }
517
518        Ok(())
519    }
520
521    /// Handle a [`JoinedRoomUpdate`].
522    #[instrument(skip_all, fields(room_id = %self.inner.room_id))]
523    pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
524        self.handle_timeline(updates.timeline).await?;
525
526        Ok(())
527    }
528
529    /// Handle a [`LeftRoomUpdate`].
530    #[instrument(skip_all, fields(room_id = %self.inner.room_id))]
531    pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> {
532        self.handle_timeline(updates.timeline).await?;
533
534        Ok(())
535    }
536
537    /// Handle a [`Timeline`], i.e. new events received by a sync for this
538    /// thread.
539    async fn handle_timeline(&self, timeline: Timeline) -> Result<()> {
540        if timeline.events.is_empty() {
541            return Ok(());
542        }
543
544        trace!("adding new {} events", timeline.events.len());
545
546        self.inner.state.write().await?.handle_sync(timeline).await?;
547
548        Ok(())
549    }
550
551    #[instrument(fields(%room_id = room.room_id()), skip(room, inner))]
552    async fn pinned_event_listener_task(room: Room, inner: Arc<PinnedEventsCacheInner>) {
553        debug!("pinned events listener task started");
554
555        let reload_from_network = async |room: Room| {
556            let events = match Self::reload_pinned_events(room).await {
557                Ok(Some(events)) => events,
558                Ok(None) => Vec::new(),
559                Err(err) => {
560                    warn!("error when loading pinned events: {err}");
561                    return;
562                }
563            };
564
565            // Replace the whole linked chunk with those new events, and propagate updates
566            // to the observers.
567            match inner.state.write().await {
568                Ok(mut guard) => {
569                    guard.replace_all_events(events).await.unwrap_or_else(|err| {
570                        warn!("error when replacing pinned events: {err}");
571                    });
572                }
573
574                Err(err) => {
575                    warn!("error when acquiring write lock to replace pinned events: {err}");
576                }
577            }
578        };
579
580        // Reload the pinned events from the storage first.
581        match inner.state.write().await {
582            Ok(mut guard) => {
583                // On startup, reload the pinned events from storage.
584                guard.reload_from_storage().await.unwrap_or_else(|err| {
585                    warn!("error when reloading pinned events from storage, at start: {err}");
586                });
587
588                // Compare the initial list of pinned events to the one in the linked chunk.
589                let actual_pinned_events = room.pinned_event_ids().unwrap_or_default();
590                let reloaded_set =
591                    guard.state.current_event_ids().into_iter().collect::<BTreeSet<_>>();
592
593                if actual_pinned_events.len() != reloaded_set.len()
594                    || actual_pinned_events.iter().any(|event_id| !reloaded_set.contains(event_id))
595                {
596                    // Reload the list of pinned events from network.
597                    drop(guard);
598                    reload_from_network(room.clone()).await;
599                }
600            }
601
602            Err(err) => {
603                warn!("error when acquiring write lock to initialize pinned events: {err}");
604            }
605        }
606
607        let weak_room =
608            WeakRoom::new(WeakClient::from_client(&room.client()), room.room_id().to_owned());
609
610        let mut stream = room.pinned_event_ids_stream();
611
612        drop(room);
613
614        // Whenever the list of pinned events changes, reload it.
615        while let Some(new_list) = stream.next().await {
616            trace!("handling update");
617
618            let guard = match inner.state.read().await {
619                Ok(guard) => guard,
620                Err(err) => {
621                    warn!("error when acquiring read lock to handle pinned events update: {err}");
622                    break;
623                }
624            };
625
626            // Compare to the current linked chunk.
627            let current_set = guard.state.current_event_ids().into_iter().collect::<BTreeSet<_>>();
628
629            if !new_list.is_empty()
630                && new_list.iter().all(|event_id| current_set.contains(event_id))
631            {
632                // All the events in the pinned list are the same, don't reload.
633                continue;
634            }
635
636            let Some(room) = weak_room.get() else {
637                debug!("room has been dropped, ending pinned events listener task");
638                break;
639            };
640
641            drop(guard);
642
643            // Event IDs differ, so reload all the pinned events.
644            reload_from_network(room).await;
645        }
646
647        debug!("pinned events listener task ended");
648    }
649
650    /// Loads the pinned events in this room, using the cache first and then
651    /// requesting the event from the homeserver if it couldn't be found.
652    /// This method will perform as many concurrent requests for events as
653    /// `max_concurrent_requests` allows, to avoid overwhelming the server.
654    ///
655    /// Returns `None` if the list of pinned events hasn't changed since the
656    /// previous time we loaded them. May return an error if there was an
657    /// issue fetching the full events.
658    async fn reload_pinned_events(room: Room) -> Result<Option<Vec<Event>>> {
659        let (max_events_to_load, max_concurrent_requests) = {
660            let client = room.client();
661            let config = client.event_cache().config();
662            (config.max_pinned_events_to_load, config.max_pinned_events_concurrent_requests)
663        };
664
665        let pinned_event_ids: Vec<OwnedEventId> = room
666            .pinned_event_ids()
667            .unwrap_or_default()
668            .into_iter()
669            .rev()
670            .take(max_events_to_load)
671            .rev()
672            .collect();
673
674        if pinned_event_ids.is_empty() {
675            return Ok(Some(Vec::new()));
676        }
677
678        let mut num_successful_loads = 0;
679
680        let mut loaded_events: Vec<Event> =
681            stream::iter(pinned_event_ids.clone().into_iter().map(|event_id| {
682                let room = room.clone();
683                let filter = vec![RelationType::Annotation, RelationType::Replacement];
684                let request_config = RequestConfig::default().retry_limit(3);
685
686                async move {
687                    let (target, mut relations) = room
688                        .load_or_fetch_event_with_relations(
689                            &event_id,
690                            Some(filter),
691                            Some(request_config),
692                        )
693                        .await?;
694
695                    relations.insert(0, target);
696                    Ok::<_, crate::Error>(relations)
697                }
698            }))
699            .buffer_unordered(max_concurrent_requests)
700            // Count successful queries.
701            .inspect(|result| {
702                if result.is_ok() {
703                    num_successful_loads += 1;
704                }
705            })
706            // Get rid of error results.
707            .flat_map(stream::iter)
708            // Flatten the list of `Vec<Event>` into a list of `Event`.
709            .flat_map(stream::iter)
710            .collect()
711            .await;
712
713        if num_successful_loads != pinned_event_ids.len() {
714            warn!(
715                "only successfully loaded {} out of {} pinned events",
716                num_successful_loads,
717                pinned_event_ids.len()
718            );
719        }
720
721        if loaded_events.is_empty() {
722            // If the list of loaded events is empty, we ran into an error to load *all* the
723            // pinned events, which needs to be reported to the caller.
724            return Err(EventCacheError::UnableToLoadPinnedEvents);
725        }
726
727        // Since we have all the events and their related events, we can't nicely sort
728        // them, since we've lost all ordering information from using /event or
729        // /relations. Resort to sorting using chronological ordering (oldest ->
730        // newest).
731        loaded_events.sort_by(compare_pinned_items);
732
733        Ok(Some(loaded_events))
734    }
735}
736
737impl fmt::Debug for PinnedEventsCache {
738    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
739        f.debug_struct("PinnedEventsCache").finish_non_exhaustive()
740    }
741}
742
743fn compare_pinned_items(a: &Event, b: &Event) -> Ordering {
744    let a_time: Option<MilliSecondsSinceUnixEpoch> = a.timestamp_raw();
745    let b_time: Option<MilliSecondsSinceUnixEpoch> = b.timestamp_raw();
746
747    compare_by_optional_timestamp(a_time, b_time)
748}
749
750fn compare_by_optional_timestamp(
751    a: Option<MilliSecondsSinceUnixEpoch>,
752    b: Option<MilliSecondsSinceUnixEpoch>,
753) -> Ordering {
754    match (a, b) {
755        (None, None) => Ordering::Equal,
756        (None, Some(_)) => Ordering::Greater,
757        (Some(_), None) => Ordering::Less,
758        (Some(a), Some(b)) => a.cmp(&b),
759    }
760}
761
762#[cfg(not(target_family = "wasm"))]
763#[cfg(test)]
764mod tests {
765    use proptest::prelude::*;
766    use ruma::UInt;
767
768    use super::*;
769
770    fn any_timestamp() -> impl Strategy<Value = Option<MilliSecondsSinceUnixEpoch>> {
771        prop::option::of(
772            any::<u32>().prop_map(|value| MilliSecondsSinceUnixEpoch(UInt::from(value))),
773        )
774    }
775
776    #[test]
777    fn sort_pinned_events_never_panics_only_nones() {
778        let mut vec = vec![None; 100_000];
779        vec.sort_by(|a, b| compare_by_optional_timestamp(*a, *b))
780    }
781
782    proptest! {
783    #[test]
784    fn sort_pinned_events_never_panics(mut v in prop::collection::vec(any_timestamp(), 0..1000)) {
785        v.sort_by(
786            |a, b| compare_by_optional_timestamp(*a, *b))
787    }
788
789    #[test]
790    fn compare_pinned_events_reflexive(a in any_timestamp()) {
791        prop_assert_eq!(compare_by_optional_timestamp(a, a), Ordering::Equal);
792    }
793
794    #[test]
795    fn compare_pinned_events_antisymmetric(a in any_timestamp(), b in any_timestamp()) {
796        let ab = compare_by_optional_timestamp(a, b);
797        let ba = compare_by_optional_timestamp(b, a);
798
799        prop_assert_eq!(ab, ba.reverse());
800    }
801
802    #[test]
803    fn compare_pinned_events_transitive(
804        a in any_timestamp(),
805        b in any_timestamp(),
806        c in any_timestamp()
807    ) {
808        let ab = compare_by_optional_timestamp(a, b);
809        let bc = compare_by_optional_timestamp(b, c);
810        let ac = compare_by_optional_timestamp(a, c);
811
812        if ab == Ordering::Less && bc == Ordering::Less {
813            prop_assert_eq!(ac, Ordering::Less);
814        }
815
816        if ab == Ordering::Equal && bc == Ordering::Equal {
817            prop_assert_eq!(ac, Ordering::Equal);
818        }
819
820        if ab == Ordering::Greater && bc == Ordering::Greater {
821            prop_assert_eq!(ac, Ordering::Greater);
822        }
823    }
824    }
825}