matrix_sdk_ui/timeline/controller/
read_receipts.rs

1// Copyright 2023 Kévin Commaille
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{cmp::Ordering, collections::HashMap};
16
17use futures_core::Stream;
18use indexmap::IndexMap;
19use ruma::{
20    events::receipt::{Receipt, ReceiptEventContent, ReceiptThread, ReceiptType},
21    EventId, OwnedEventId, OwnedUserId, UserId,
22};
23use tokio::sync::watch;
24use tokio_stream::wrappers::WatchStream;
25use tracing::{debug, error, instrument, trace, warn};
26
27use super::{
28    rfind_event_by_id, AllRemoteEvents, FullEventMeta, ObservableItemsTransaction,
29    RelativePosition, RoomDataProvider, TimelineMetadata, TimelineState,
30};
31use crate::timeline::{controller::TimelineStateTransaction, TimelineItem};
32
33/// In-memory caches for read receipts.
34#[derive(Clone, Debug, Default)]
35pub(super) struct ReadReceipts {
36    /// Map of public read receipts on events.
37    ///
38    /// Event ID => User ID => Read receipt of the user.
39    by_event: HashMap<OwnedEventId, IndexMap<OwnedUserId, Receipt>>,
40
41    /// In-memory cache of all latest read receipts by user.
42    ///
43    /// User ID => Receipt type => Read receipt of the user of the given
44    /// type.
45    latest_by_user: HashMap<OwnedUserId, HashMap<ReceiptType, (OwnedEventId, Receipt)>>,
46
47    /// A sender to notify of changes to the receipts of our own user.
48    own_user_read_receipts_changed_sender: watch::Sender<()>,
49}
50
51impl ReadReceipts {
52    /// Empty the caches.
53    pub(super) fn clear(&mut self) {
54        self.by_event.clear();
55        self.latest_by_user.clear();
56    }
57
58    /// Subscribe to changes in the read receipts of our own user.
59    pub(super) fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
60        let subscriber = self.own_user_read_receipts_changed_sender.subscribe();
61        WatchStream::from_changes(subscriber)
62    }
63
64    /// Read the latest read receipt of the given type for the given user, from
65    /// the in-memory cache.
66    fn get_latest(
67        &self,
68        user_id: &UserId,
69        receipt_type: &ReceiptType,
70    ) -> Option<&(OwnedEventId, Receipt)> {
71        self.latest_by_user.get(user_id).and_then(|map| map.get(receipt_type))
72    }
73
74    /// Insert or update in the local cache the latest read receipt for the
75    /// given user.
76    fn upsert_latest(
77        &mut self,
78        user_id: OwnedUserId,
79        receipt_type: ReceiptType,
80        read_receipt: (OwnedEventId, Receipt),
81    ) {
82        self.latest_by_user.entry(user_id).or_default().insert(receipt_type, read_receipt);
83    }
84
85    /// Update the timeline items with the given read receipt if it is more
86    /// recent than the current one.
87    ///
88    /// In the process, if applicable, this method updates the inner maps to use
89    /// the new receipt. If `is_own_user_id` is `false`, it also updates the
90    /// receipts on the corresponding timeline items.
91    ///
92    /// Currently this method only works reliably if the timeline was started
93    /// from the end of the timeline.
94    #[instrument(skip_all, fields(user_id = %new_receipt.user_id, event_id = %new_receipt.event_id))]
95    fn maybe_update_read_receipt(
96        &mut self,
97        new_receipt: FullReceipt<'_>,
98        is_own_user_id: bool,
99        timeline_items: &mut ObservableItemsTransaction<'_>,
100    ) {
101        let all_events = timeline_items.all_remote_events();
102
103        // Get old receipt.
104        let old_receipt = self.get_latest(new_receipt.user_id, &new_receipt.receipt_type);
105
106        if old_receipt
107            .as_ref()
108            .is_some_and(|(old_receipt_event_id, _)| old_receipt_event_id == new_receipt.event_id)
109        {
110            // The receipt has not changed so there is nothing to do.
111            if !is_own_user_id {
112                trace!("receipt hasn't changed, nothing to do");
113            }
114            return;
115        }
116
117        let old_event_id = old_receipt.map(|(event_id, _)| event_id);
118
119        // Find receipts positions.
120        let mut old_receipt_pos = None;
121        let mut old_item_pos = None;
122        let mut old_item_event_id = None;
123        let mut new_receipt_pos = None;
124        let mut new_item_pos = None;
125        let mut new_item_event_id = None;
126
127        for (pos, event) in all_events.iter().rev().enumerate() {
128            if old_receipt_pos.is_none() && old_event_id == Some(&event.event_id) {
129                old_receipt_pos = Some(pos);
130            }
131
132            // The receipt should appear on the first event that is visible.
133            if old_receipt_pos.is_some() && old_item_event_id.is_none() && event.visible {
134                old_item_pos = event.timeline_item_index;
135                old_item_event_id = Some(event.event_id.clone());
136            }
137
138            if new_receipt_pos.is_none() && new_receipt.event_id == event.event_id {
139                new_receipt_pos = Some(pos);
140            }
141
142            // The receipt should appear on the first event that is visible.
143            if new_receipt_pos.is_some() && new_item_event_id.is_none() && event.visible {
144                new_item_pos = event.timeline_item_index;
145                new_item_event_id = Some(event.event_id.clone());
146            }
147
148            if old_item_event_id.is_some() && new_item_event_id.is_some() {
149                // We have everything we need, stop.
150                break;
151            }
152        }
153
154        // Check if the old receipt is more recent than the new receipt.
155        if let Some(old_receipt_pos) = old_receipt_pos {
156            let Some(new_receipt_pos) = new_receipt_pos else {
157                // The old receipt is more recent since we can't find the new receipt in the
158                // timeline and we supposedly have all events since the end of the timeline.
159                if !is_own_user_id {
160                    trace!("we had a previous read receipt, but couldn't find the event targeted by the new read receipt in the timeline, exiting");
161                }
162                return;
163            };
164
165            if old_receipt_pos < new_receipt_pos {
166                // The old receipt is more recent than the new one.
167                if !is_own_user_id {
168                    trace!("the previous read receipt is more recent than the new one, exiting");
169                }
170                return;
171            }
172        }
173
174        // The new receipt is deemed more recent from now on because:
175        // - If old_receipt_pos is Some, we already checked all the cases where it
176        //   wouldn't be more recent.
177        // - If both old_receipt_pos and new_receipt_pos are None, they are both
178        //   explicit read receipts so the server should only send us a more recent
179        //   receipt.
180        // - If old_receipt_pos is None and new_receipt_pos is Some, the new receipt is
181        //   more recent because it has a place in the timeline.
182
183        if !is_own_user_id {
184            trace!(from_event = ?old_event_id, from_visible_event = ?old_item_event_id, to_event = ?new_receipt.event_id, to_visible_event = ?new_item_event_id, ?old_item_pos, ?new_item_pos, "moving read receipt");
185
186            // Remove the old receipt from the old event.
187            if let Some(old_event_id) = old_event_id.cloned() {
188                self.remove_event_receipt_for_user(&old_event_id, new_receipt.user_id);
189            }
190
191            // Add the new receipt to the new event.
192            self.add_event_receipt_for_user(
193                new_receipt.event_id.to_owned(),
194                new_receipt.user_id.to_owned(),
195                new_receipt.receipt.clone(),
196            );
197        }
198
199        // Update the receipt of the user.
200        self.upsert_latest(
201            new_receipt.user_id.to_owned(),
202            new_receipt.receipt_type,
203            (new_receipt.event_id.to_owned(), new_receipt.receipt.clone()),
204        );
205
206        if is_own_user_id {
207            self.own_user_read_receipts_changed_sender.send_replace(());
208            // This receipt cannot change items in the timeline.
209            return;
210        }
211
212        if new_item_event_id == old_item_event_id {
213            // The receipt did not change in the timeline.
214            return;
215        }
216
217        let timeline_update = ReadReceiptTimelineUpdate {
218            old_item_pos,
219            old_event_id: old_item_event_id,
220            new_item_pos,
221            new_event_id: new_item_event_id,
222        };
223
224        timeline_update.apply(
225            timeline_items,
226            new_receipt.user_id.to_owned(),
227            new_receipt.receipt.clone(),
228        );
229    }
230
231    /// Returns the cached receipts by user for a given `event_id`.
232    fn get_event_receipts(&self, event_id: &EventId) -> Option<&IndexMap<OwnedUserId, Receipt>> {
233        self.by_event.get(event_id)
234    }
235
236    /// Mark the given event as seen by the user with the given receipt.
237    fn add_event_receipt_for_user(
238        &mut self,
239        event_id: OwnedEventId,
240        user_id: OwnedUserId,
241        receipt: Receipt,
242    ) {
243        self.by_event.entry(event_id).or_default().insert(user_id, receipt);
244    }
245
246    /// Unmark the given event as seen by the user.
247    fn remove_event_receipt_for_user(&mut self, event_id: &EventId, user_id: &UserId) {
248        if let Some(map) = self.by_event.get_mut(event_id) {
249            map.swap_remove(user_id);
250            // Remove the entire map if this was the last entry.
251            if map.is_empty() {
252                self.by_event.remove(event_id);
253            }
254        }
255    }
256
257    /// Get the read receipts by user for the given event.
258    ///
259    /// This includes all the receipts on the event as well as all the receipts
260    /// on the following events that are filtered out (not visible).
261    #[instrument(skip(self, timeline_items, at_end))]
262    pub(super) fn compute_event_receipts(
263        &self,
264        event_id: &EventId,
265        timeline_items: &mut ObservableItemsTransaction<'_>,
266        at_end: bool,
267    ) -> IndexMap<OwnedUserId, Receipt> {
268        let mut all_receipts = self.get_event_receipts(event_id).cloned().unwrap_or_default();
269
270        if at_end {
271            // No need to search for extra receipts, there are no events after.
272            trace!(
273                "early return because @end, retrieved receipts: {}",
274                all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
275            );
276            return all_receipts;
277        }
278
279        trace!(
280            "loaded receipts: {}",
281            all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
282        );
283
284        // We are going to add receipts for hidden events to this item.
285        //
286        // However: since we may be inserting an event at a random position, the
287        // previous timeline item may already be holding some hidden read
288        // receipts. As a result, we need to be careful here: if we're inserting
289        // after an event that holds hidden read receipts, then we should steal
290        // them from it.
291        //
292        // Find the event, go past it, and keep a reference to the previous rendered
293        // timeline item, if any.
294        let mut events_iter = timeline_items.all_remote_events().iter();
295        let mut prev_event_and_item_index = None;
296
297        for meta in events_iter.by_ref() {
298            if meta.event_id == event_id {
299                break;
300            }
301            if let Some(item_index) = meta.timeline_item_index {
302                prev_event_and_item_index = Some((meta.event_id.clone(), item_index));
303            }
304        }
305
306        // Include receipts for all the following non-visible events.
307        let mut hidden = Vec::new();
308        for hidden_event_meta in events_iter.take_while(|meta| !meta.visible) {
309            if let Some(event_receipts) = self.get_event_receipts(&hidden_event_meta.event_id) {
310                trace!(%hidden_event_meta.event_id, "found receipts on hidden event");
311                hidden.extend(event_receipts.clone());
312            }
313        }
314
315        // Steal hidden receipts from the previous timeline item, if it carried them.
316        if let Some((prev_event_id, prev_item_index)) = prev_event_and_item_index {
317            let prev_item = &timeline_items[prev_item_index];
318            // Technically, we could unwrap the `as_event()`, because this is a rendered
319            // item for an event in all_remote_events, but this extra check is
320            // cheap.
321            if let Some(remote_prev_item) = prev_item.as_event() {
322                let prev_receipts = remote_prev_item.read_receipts().clone();
323                for (user_id, _) in &hidden {
324                    if !prev_receipts.contains_key(user_id) {
325                        continue;
326                    }
327                    let mut up = ReadReceiptTimelineUpdate {
328                        old_item_pos: Some(prev_item_index),
329                        old_event_id: Some(prev_event_id.clone()),
330                        new_item_pos: None,
331                        new_event_id: None,
332                    };
333                    up.remove_old_receipt(timeline_items, user_id);
334                }
335            }
336        }
337
338        all_receipts.extend(hidden);
339        trace!(
340            "computed receipts: {}",
341            all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
342        );
343        all_receipts
344    }
345}
346
347struct FullReceipt<'a> {
348    event_id: &'a EventId,
349    user_id: &'a UserId,
350    receipt_type: ReceiptType,
351    receipt: &'a Receipt,
352}
353
354/// A read receipt update in the timeline.
355#[derive(Clone, Debug, Default)]
356struct ReadReceiptTimelineUpdate {
357    /// The position of the timeline item that had the old receipt of the user,
358    /// if any.
359    old_item_pos: Option<usize>,
360    /// The old event that had the receipt of the user, if any.
361    old_event_id: Option<OwnedEventId>,
362    /// The position of the timeline item that has the new receipt of the user,
363    /// if any.
364    new_item_pos: Option<usize>,
365    /// The new event that has the receipt of the user, if any.
366    new_event_id: Option<OwnedEventId>,
367}
368
369impl ReadReceiptTimelineUpdate {
370    /// Remove the old receipt from the corresponding timeline item.
371    #[instrument(skip_all)]
372    fn remove_old_receipt(&mut self, items: &mut ObservableItemsTransaction<'_>, user_id: &UserId) {
373        let Some(event_id) = &self.old_event_id else {
374            // Nothing to do.
375            return;
376        };
377
378        let item_pos = self.old_item_pos.or_else(|| {
379            items
380                .iter()
381                .enumerate()
382                .rev()
383                .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
384                .find_map(|(nth, event_item)| {
385                    (event_item.event_id() == Some(event_id)).then_some(nth)
386                })
387        });
388
389        let Some(item_pos) = item_pos else {
390            debug!(%event_id, %user_id, "inconsistent state: old event item for read receipt was not found");
391            return;
392        };
393
394        self.old_item_pos = Some(item_pos);
395
396        let event_item = &items[item_pos];
397        let event_item_id = event_item.unique_id().to_owned();
398
399        let Some(mut event_item) = event_item.as_event().cloned() else {
400            warn!("received a read receipt for a virtual item, this should not be possible");
401            return;
402        };
403
404        if let Some(remote_event_item) = event_item.as_remote_mut() {
405            if remote_event_item.read_receipts.swap_remove(user_id).is_none() {
406                debug!(
407                    %event_id, %user_id,
408                    "inconsistent state: old event item for user's read \
409                     receipt doesn't have a receipt for the user"
410                );
411            }
412            trace!(%user_id, %event_id, "removed read receipt from event item");
413            items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
414        } else {
415            warn!("received a read receipt for a local item, this should not be possible");
416        }
417    }
418
419    /// Add the new receipt to the corresponding timeline item.
420    #[instrument(skip_all)]
421    fn add_new_receipt(
422        self,
423        items: &mut ObservableItemsTransaction<'_>,
424        user_id: OwnedUserId,
425        receipt: Receipt,
426    ) {
427        let Some(event_id) = self.new_event_id else {
428            // Nothing to do.
429            return;
430        };
431
432        let item_pos = self.new_item_pos.or_else(|| {
433            items
434                .iter()
435                .enumerate()
436                // Don't iterate over all items if the `old_item_pos` is known: the `item_pos`
437                // for the new item is necessarily _after_ the old item.
438                .skip(self.old_item_pos.unwrap_or(0))
439                .rev()
440                .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
441                .find_map(|(nth, event_item)| {
442                    (event_item.event_id() == Some(&event_id)).then_some(nth)
443                })
444        });
445
446        let Some(item_pos) = item_pos else {
447            debug!(%event_id, %user_id, "inconsistent state: new event item for read receipt was not found");
448            return;
449        };
450
451        debug_assert!(
452            item_pos >= self.old_item_pos.unwrap_or(0),
453            "The new receipt must be added on a timeline item that is _after_ the timeline item that was holding the old receipt");
454
455        let event_item = &items[item_pos];
456        let event_item_id = event_item.unique_id().to_owned();
457
458        let Some(mut event_item) = event_item.as_event().cloned() else {
459            warn!("received a read receipt for a virtual item, this should not be possible");
460            return;
461        };
462
463        if let Some(remote_event_item) = event_item.as_remote_mut() {
464            trace!(%user_id, %event_id, "added read receipt to event item");
465            remote_event_item.read_receipts.insert(user_id, receipt);
466            items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
467        } else {
468            warn!("received a read receipt for a local item, this should not be possible");
469        }
470    }
471
472    /// Apply this update to the timeline.
473    fn apply(
474        mut self,
475        items: &mut ObservableItemsTransaction<'_>,
476        user_id: OwnedUserId,
477        receipt: Receipt,
478    ) {
479        self.remove_old_receipt(items, &user_id);
480        self.add_new_receipt(items, user_id, receipt);
481    }
482}
483
484impl TimelineStateTransaction<'_> {
485    pub(super) fn handle_explicit_read_receipts(
486        &mut self,
487        receipt_event_content: ReceiptEventContent,
488        own_user_id: &UserId,
489    ) {
490        trace!("handling explicit read receipts");
491        for (event_id, receipt_types) in receipt_event_content.0 {
492            for (receipt_type, receipts) in receipt_types {
493                // We only care about read receipts here.
494                if !matches!(receipt_type, ReceiptType::Read | ReceiptType::ReadPrivate) {
495                    continue;
496                }
497
498                for (user_id, receipt) in receipts {
499                    if !matches!(receipt.thread, ReceiptThread::Unthreaded | ReceiptThread::Main) {
500                        continue;
501                    }
502
503                    let is_own_user_id = user_id == own_user_id;
504                    let full_receipt = FullReceipt {
505                        event_id: &event_id,
506                        user_id: &user_id,
507                        receipt_type: receipt_type.clone(),
508                        receipt: &receipt,
509                    };
510
511                    self.meta.read_receipts.maybe_update_read_receipt(
512                        full_receipt,
513                        is_own_user_id,
514                        &mut self.items,
515                    );
516                }
517            }
518        }
519    }
520
521    /// Load the read receipts from the store for the given event ID.
522    ///
523    /// Populates the read receipts in-memory caches.
524    pub(super) async fn load_read_receipts_for_event<P: RoomDataProvider>(
525        &mut self,
526        event_id: &EventId,
527        room_data_provider: &P,
528    ) {
529        trace!(%event_id, "loading initial receipts for an event");
530        let read_receipts = room_data_provider.load_event_receipts(event_id).await;
531        let own_user_id = room_data_provider.own_user_id();
532
533        // Since they are explicit read receipts, we need to check if they are
534        // superseded by implicit read receipts.
535        for (user_id, receipt) in read_receipts {
536            let full_receipt = FullReceipt {
537                event_id,
538                user_id: &user_id,
539                receipt_type: ReceiptType::Read,
540                receipt: &receipt,
541            };
542
543            self.meta.read_receipts.maybe_update_read_receipt(
544                full_receipt,
545                user_id == own_user_id,
546                &mut self.items,
547            );
548        }
549    }
550
551    /// Add an implicit read receipt to the given event item, if it is more
552    /// recent than the current read receipt for the sender of the event.
553    ///
554    /// According to the spec, read receipts should not point to events sent by
555    /// our own user, but these events are used to reset the notification
556    /// count, so we need to handle them locally too. For that we create an
557    /// "implicit" read receipt, compared to the "explicit" ones sent by the
558    /// client.
559    pub(super) fn maybe_add_implicit_read_receipt(&mut self, event_meta: FullEventMeta<'_>) {
560        let FullEventMeta { event_id, sender, is_own_event, timestamp, .. } = event_meta;
561
562        let (Some(user_id), Some(timestamp)) = (sender, timestamp) else {
563            // We cannot add a read receipt if we do not know the user or the timestamp.
564            return;
565        };
566
567        trace!(%event_id, "adding implicit read receipt");
568        let receipt = Receipt::new(timestamp);
569        let full_receipt =
570            FullReceipt { event_id, user_id, receipt_type: ReceiptType::Read, receipt: &receipt };
571
572        self.meta.read_receipts.maybe_update_read_receipt(
573            full_receipt,
574            is_own_event,
575            &mut self.items,
576        );
577    }
578
579    /// Update the read receipts on the event with the given event ID and the
580    /// previous visible event because of a visibility change.
581    #[instrument(skip(self))]
582    pub(super) fn maybe_update_read_receipts_of_prev_event(&mut self, event_id: &EventId) {
583        // Find the previous visible event, if there is one.
584        let Some(prev_event_meta) = self
585            .items
586            .all_remote_events()
587            .iter()
588            .rev()
589            // Find the event item.
590            .skip_while(|meta| meta.event_id != event_id)
591            // Go past the event item.
592            .skip(1)
593            // Find the first visible item.
594            .find(|meta| meta.visible)
595        else {
596            trace!("Couldn't find any previous visible event, exiting");
597            return;
598        };
599
600        let Some((prev_item_pos, prev_event_item)) =
601            rfind_event_by_id(&self.items, &prev_event_meta.event_id)
602        else {
603            error!("inconsistent state: timeline item of visible event was not found");
604            return;
605        };
606
607        let prev_event_item_id = prev_event_item.internal_id.to_owned();
608        let mut prev_event_item = prev_event_item.clone();
609
610        let Some(remote_prev_event_item) = prev_event_item.as_remote_mut() else {
611            warn!("loading read receipts for a local item, this should not be possible");
612            return;
613        };
614
615        let read_receipts = self.meta.read_receipts.compute_event_receipts(
616            &remote_prev_event_item.event_id,
617            &mut self.items,
618            false,
619        );
620
621        // If the count did not change, the receipts did not change either.
622        if read_receipts.len() == remote_prev_event_item.read_receipts.len() {
623            trace!("same count of read receipts, not doing anything");
624            return;
625        }
626
627        trace!("replacing read receipts with the new ones");
628        remote_prev_event_item.read_receipts = read_receipts;
629        self.items.replace(prev_item_pos, TimelineItem::new(prev_event_item, prev_event_item_id));
630    }
631}
632
633impl TimelineState {
634    /// Populates our own latest read receipt in the in-memory by-user read
635    /// receipt cache.
636    pub(super) async fn populate_initial_user_receipt<P: RoomDataProvider>(
637        &mut self,
638        room_data_provider: &P,
639        receipt_type: ReceiptType,
640    ) {
641        let own_user_id = room_data_provider.own_user_id().to_owned();
642
643        let mut read_receipt = room_data_provider
644            .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, &own_user_id)
645            .await;
646
647        // Fallback to the one in the main thread.
648        if read_receipt.is_none() {
649            read_receipt = room_data_provider
650                .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, &own_user_id)
651                .await;
652        }
653
654        if let Some(read_receipt) = read_receipt {
655            self.meta.read_receipts.upsert_latest(own_user_id, receipt_type, read_receipt);
656        }
657    }
658
659    /// Get the latest read receipt for the given user.
660    ///
661    /// Useful to get the latest read receipt, whether it's private or public.
662    pub(super) async fn latest_user_read_receipt<P: RoomDataProvider>(
663        &self,
664        user_id: &UserId,
665        room_data_provider: &P,
666    ) -> Option<(OwnedEventId, Receipt)> {
667        let all_remote_events = self.items.all_remote_events();
668        let public_read_receipt = self
669            .meta
670            .user_receipt(user_id, ReceiptType::Read, room_data_provider, all_remote_events)
671            .await;
672        let private_read_receipt = self
673            .meta
674            .user_receipt(user_id, ReceiptType::ReadPrivate, room_data_provider, all_remote_events)
675            .await;
676
677        // Let's assume that a private read receipt should be more recent than a public
678        // read receipt, otherwise there's no point in the private read receipt,
679        // and use it as default.
680        match self.meta.compare_optional_receipts(
681            public_read_receipt.as_ref(),
682            private_read_receipt.as_ref(),
683            self.items.all_remote_events(),
684        ) {
685            Ordering::Greater => public_read_receipt,
686            Ordering::Less => private_read_receipt,
687            _ => unreachable!(),
688        }
689    }
690
691    /// Get the ID of the visible timeline event with the latest read receipt
692    /// for the given user.
693    pub(super) fn latest_user_read_receipt_timeline_event_id(
694        &self,
695        user_id: &UserId,
696    ) -> Option<OwnedEventId> {
697        // We only need to use the local map, since receipts for known events are
698        // already loaded from the store.
699        let public_read_receipt = self.meta.read_receipts.get_latest(user_id, &ReceiptType::Read);
700        let private_read_receipt =
701            self.meta.read_receipts.get_latest(user_id, &ReceiptType::ReadPrivate);
702
703        // Let's assume that a private read receipt should be more recent than a public
704        // read receipt, otherwise there's no point in the private read receipt,
705        // and use it as default.
706        let (latest_receipt_id, _) = match self.meta.compare_optional_receipts(
707            public_read_receipt,
708            private_read_receipt,
709            self.items.all_remote_events(),
710        ) {
711            Ordering::Greater => public_read_receipt?,
712            Ordering::Less => private_read_receipt?,
713            _ => unreachable!(),
714        };
715
716        // Find the corresponding visible event.
717        self.items
718            .all_remote_events()
719            .iter()
720            .rev()
721            .skip_while(|ev| ev.event_id != *latest_receipt_id)
722            .find(|ev| ev.visible)
723            .map(|ev| ev.event_id.clone())
724    }
725}
726
727impl TimelineMetadata {
728    /// Get the latest receipt of the given type for the given user in the
729    /// timeline.
730    ///
731    /// This will attempt to read the latest user receipt for a user from the
732    /// cache, or load it from the storage if missing from the cache.
733    pub(super) async fn user_receipt<P: RoomDataProvider>(
734        &self,
735        user_id: &UserId,
736        receipt_type: ReceiptType,
737        room_data_provider: &P,
738        all_remote_events: &AllRemoteEvents,
739    ) -> Option<(OwnedEventId, Receipt)> {
740        if let Some(receipt) = self.read_receipts.get_latest(user_id, &receipt_type) {
741            // Since it is in the timeline, it should be the most recent.
742            return Some(receipt.clone());
743        }
744
745        let unthreaded_read_receipt = room_data_provider
746            .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id)
747            .await;
748
749        let main_thread_read_receipt = room_data_provider
750            .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, user_id)
751            .await;
752
753        // Let's use the unthreaded read receipt as default, since it's the one we
754        // should be using.
755        match self.compare_optional_receipts(
756            main_thread_read_receipt.as_ref(),
757            unthreaded_read_receipt.as_ref(),
758            all_remote_events,
759        ) {
760            Ordering::Greater => main_thread_read_receipt,
761            Ordering::Less => unthreaded_read_receipt,
762            _ => unreachable!(),
763        }
764    }
765
766    /// Compares two optional receipts to know which one is more recent.
767    ///
768    /// Returns `Ordering::Greater` if the left-hand side is more recent than
769    /// the right-hand side, and `Ordering::Less` if it is older. If it's
770    /// not possible to know which one is the more recent, defaults to
771    /// `Ordering::Less`, making the right-hand side the default.
772    fn compare_optional_receipts(
773        &self,
774        lhs: Option<&(OwnedEventId, Receipt)>,
775        rhs_or_default: Option<&(OwnedEventId, Receipt)>,
776        all_remote_events: &AllRemoteEvents,
777    ) -> Ordering {
778        // If we only have one, use it.
779        let Some((lhs_event_id, lhs_receipt)) = lhs else {
780            return Ordering::Less;
781        };
782        let Some((rhs_event_id, rhs_receipt)) = rhs_or_default else {
783            return Ordering::Greater;
784        };
785
786        // Compare by position in the timeline.
787        if let Some(relative_pos) =
788            self.compare_events_positions(lhs_event_id, rhs_event_id, all_remote_events)
789        {
790            if relative_pos == RelativePosition::Before {
791                return Ordering::Greater;
792            }
793
794            return Ordering::Less;
795        }
796
797        // Compare by timestamp.
798        if let Some((lhs_ts, rhs_ts)) = lhs_receipt.ts.zip(rhs_receipt.ts) {
799            if lhs_ts > rhs_ts {
800                return Ordering::Greater;
801            }
802
803            return Ordering::Less;
804        }
805
806        Ordering::Less
807    }
808}