1use std::{cmp::Ordering, collections::HashMap};
16
17use futures_core::Stream;
18use indexmap::IndexMap;
19use ruma::{
20 events::receipt::{Receipt, ReceiptEventContent, ReceiptThread, ReceiptType},
21 EventId, OwnedEventId, OwnedUserId, UserId,
22};
23use tokio::sync::watch;
24use tokio_stream::wrappers::WatchStream;
25use tracing::{debug, error, instrument, trace, warn};
26
27use super::{
28 rfind_event_by_id, AllRemoteEvents, FullEventMeta, ObservableItemsTransaction,
29 RelativePosition, RoomDataProvider, TimelineMetadata, TimelineState,
30};
31use crate::timeline::{controller::TimelineStateTransaction, TimelineItem};
32
33#[derive(Clone, Debug, Default)]
35pub(super) struct ReadReceipts {
36 by_event: HashMap<OwnedEventId, IndexMap<OwnedUserId, Receipt>>,
40
41 latest_by_user: HashMap<OwnedUserId, HashMap<ReceiptType, (OwnedEventId, Receipt)>>,
46
47 own_user_read_receipts_changed_sender: watch::Sender<()>,
49}
50
51impl ReadReceipts {
52 pub(super) fn clear(&mut self) {
54 self.by_event.clear();
55 self.latest_by_user.clear();
56 }
57
58 pub(super) fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
60 let subscriber = self.own_user_read_receipts_changed_sender.subscribe();
61 WatchStream::from_changes(subscriber)
62 }
63
64 fn get_latest(
67 &self,
68 user_id: &UserId,
69 receipt_type: &ReceiptType,
70 ) -> Option<&(OwnedEventId, Receipt)> {
71 self.latest_by_user.get(user_id).and_then(|map| map.get(receipt_type))
72 }
73
74 fn upsert_latest(
77 &mut self,
78 user_id: OwnedUserId,
79 receipt_type: ReceiptType,
80 read_receipt: (OwnedEventId, Receipt),
81 ) {
82 self.latest_by_user.entry(user_id).or_default().insert(receipt_type, read_receipt);
83 }
84
85 #[instrument(skip_all, fields(user_id = %new_receipt.user_id, event_id = %new_receipt.event_id))]
95 fn maybe_update_read_receipt(
96 &mut self,
97 new_receipt: FullReceipt<'_>,
98 is_own_user_id: bool,
99 timeline_items: &mut ObservableItemsTransaction<'_>,
100 ) {
101 let all_events = timeline_items.all_remote_events();
102
103 let old_receipt = self.get_latest(new_receipt.user_id, &new_receipt.receipt_type);
105
106 if old_receipt
107 .as_ref()
108 .is_some_and(|(old_receipt_event_id, _)| old_receipt_event_id == new_receipt.event_id)
109 {
110 if !is_own_user_id {
112 trace!("receipt hasn't changed, nothing to do");
113 }
114 return;
115 }
116
117 let old_event_id = old_receipt.map(|(event_id, _)| event_id);
118
119 let mut old_receipt_pos = None;
121 let mut old_item_pos = None;
122 let mut old_item_event_id = None;
123 let mut new_receipt_pos = None;
124 let mut new_item_pos = None;
125 let mut new_item_event_id = None;
126
127 for (pos, event) in all_events.iter().rev().enumerate() {
128 if old_receipt_pos.is_none() && old_event_id == Some(&event.event_id) {
129 old_receipt_pos = Some(pos);
130 }
131
132 if old_receipt_pos.is_some() && old_item_event_id.is_none() && event.visible {
134 old_item_pos = event.timeline_item_index;
135 old_item_event_id = Some(event.event_id.clone());
136 }
137
138 if new_receipt_pos.is_none() && new_receipt.event_id == event.event_id {
139 new_receipt_pos = Some(pos);
140 }
141
142 if new_receipt_pos.is_some() && new_item_event_id.is_none() && event.visible {
144 new_item_pos = event.timeline_item_index;
145 new_item_event_id = Some(event.event_id.clone());
146 }
147
148 if old_item_event_id.is_some() && new_item_event_id.is_some() {
149 break;
151 }
152 }
153
154 if let Some(old_receipt_pos) = old_receipt_pos {
156 let Some(new_receipt_pos) = new_receipt_pos else {
157 if !is_own_user_id {
160 trace!("we had a previous read receipt, but couldn't find the event targeted by the new read receipt in the timeline, exiting");
161 }
162 return;
163 };
164
165 if old_receipt_pos < new_receipt_pos {
166 if !is_own_user_id {
168 trace!("the previous read receipt is more recent than the new one, exiting");
169 }
170 return;
171 }
172 }
173
174 if !is_own_user_id {
184 trace!(from_event = ?old_event_id, from_visible_event = ?old_item_event_id, to_event = ?new_receipt.event_id, to_visible_event = ?new_item_event_id, ?old_item_pos, ?new_item_pos, "moving read receipt");
185
186 if let Some(old_event_id) = old_event_id.cloned() {
188 self.remove_event_receipt_for_user(&old_event_id, new_receipt.user_id);
189 }
190
191 self.add_event_receipt_for_user(
193 new_receipt.event_id.to_owned(),
194 new_receipt.user_id.to_owned(),
195 new_receipt.receipt.clone(),
196 );
197 }
198
199 self.upsert_latest(
201 new_receipt.user_id.to_owned(),
202 new_receipt.receipt_type,
203 (new_receipt.event_id.to_owned(), new_receipt.receipt.clone()),
204 );
205
206 if is_own_user_id {
207 self.own_user_read_receipts_changed_sender.send_replace(());
208 return;
210 }
211
212 if new_item_event_id == old_item_event_id {
213 return;
215 }
216
217 let timeline_update = ReadReceiptTimelineUpdate {
218 old_item_pos,
219 old_event_id: old_item_event_id,
220 new_item_pos,
221 new_event_id: new_item_event_id,
222 };
223
224 timeline_update.apply(
225 timeline_items,
226 new_receipt.user_id.to_owned(),
227 new_receipt.receipt.clone(),
228 );
229 }
230
231 fn get_event_receipts(&self, event_id: &EventId) -> Option<&IndexMap<OwnedUserId, Receipt>> {
233 self.by_event.get(event_id)
234 }
235
236 fn add_event_receipt_for_user(
238 &mut self,
239 event_id: OwnedEventId,
240 user_id: OwnedUserId,
241 receipt: Receipt,
242 ) {
243 self.by_event.entry(event_id).or_default().insert(user_id, receipt);
244 }
245
246 fn remove_event_receipt_for_user(&mut self, event_id: &EventId, user_id: &UserId) {
248 if let Some(map) = self.by_event.get_mut(event_id) {
249 map.swap_remove(user_id);
250 if map.is_empty() {
252 self.by_event.remove(event_id);
253 }
254 }
255 }
256
257 #[instrument(skip(self, timeline_items, at_end))]
262 pub(super) fn compute_event_receipts(
263 &self,
264 event_id: &EventId,
265 timeline_items: &mut ObservableItemsTransaction<'_>,
266 at_end: bool,
267 ) -> IndexMap<OwnedUserId, Receipt> {
268 let mut all_receipts = self.get_event_receipts(event_id).cloned().unwrap_or_default();
269
270 if at_end {
271 trace!(
273 "early return because @end, retrieved receipts: {}",
274 all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
275 );
276 return all_receipts;
277 }
278
279 trace!(
280 "loaded receipts: {}",
281 all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
282 );
283
284 let mut events_iter = timeline_items.all_remote_events().iter();
295 let mut prev_event_and_item_index = None;
296
297 for meta in events_iter.by_ref() {
298 if meta.event_id == event_id {
299 break;
300 }
301 if let Some(item_index) = meta.timeline_item_index {
302 prev_event_and_item_index = Some((meta.event_id.clone(), item_index));
303 }
304 }
305
306 let mut hidden = Vec::new();
308 for hidden_event_meta in events_iter.take_while(|meta| !meta.visible) {
309 if let Some(event_receipts) = self.get_event_receipts(&hidden_event_meta.event_id) {
310 trace!(%hidden_event_meta.event_id, "found receipts on hidden event");
311 hidden.extend(event_receipts.clone());
312 }
313 }
314
315 if let Some((prev_event_id, prev_item_index)) = prev_event_and_item_index {
317 let prev_item = &timeline_items[prev_item_index];
318 if let Some(remote_prev_item) = prev_item.as_event() {
322 let prev_receipts = remote_prev_item.read_receipts().clone();
323 for (user_id, _) in &hidden {
324 if !prev_receipts.contains_key(user_id) {
325 continue;
326 }
327 let mut up = ReadReceiptTimelineUpdate {
328 old_item_pos: Some(prev_item_index),
329 old_event_id: Some(prev_event_id.clone()),
330 new_item_pos: None,
331 new_event_id: None,
332 };
333 up.remove_old_receipt(timeline_items, user_id);
334 }
335 }
336 }
337
338 all_receipts.extend(hidden);
339 trace!(
340 "computed receipts: {}",
341 all_receipts.iter().map(|(u, _)| u.as_str()).collect::<Vec<_>>().join(", ")
342 );
343 all_receipts
344 }
345}
346
347struct FullReceipt<'a> {
348 event_id: &'a EventId,
349 user_id: &'a UserId,
350 receipt_type: ReceiptType,
351 receipt: &'a Receipt,
352}
353
354#[derive(Clone, Debug, Default)]
356struct ReadReceiptTimelineUpdate {
357 old_item_pos: Option<usize>,
360 old_event_id: Option<OwnedEventId>,
362 new_item_pos: Option<usize>,
365 new_event_id: Option<OwnedEventId>,
367}
368
369impl ReadReceiptTimelineUpdate {
370 #[instrument(skip_all)]
372 fn remove_old_receipt(&mut self, items: &mut ObservableItemsTransaction<'_>, user_id: &UserId) {
373 let Some(event_id) = &self.old_event_id else {
374 return;
376 };
377
378 let item_pos = self.old_item_pos.or_else(|| {
379 items
380 .iter()
381 .enumerate()
382 .rev()
383 .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
384 .find_map(|(nth, event_item)| {
385 (event_item.event_id() == Some(event_id)).then_some(nth)
386 })
387 });
388
389 let Some(item_pos) = item_pos else {
390 debug!(%event_id, %user_id, "inconsistent state: old event item for read receipt was not found");
391 return;
392 };
393
394 self.old_item_pos = Some(item_pos);
395
396 let event_item = &items[item_pos];
397 let event_item_id = event_item.unique_id().to_owned();
398
399 let Some(mut event_item) = event_item.as_event().cloned() else {
400 warn!("received a read receipt for a virtual item, this should not be possible");
401 return;
402 };
403
404 if let Some(remote_event_item) = event_item.as_remote_mut() {
405 if remote_event_item.read_receipts.swap_remove(user_id).is_none() {
406 debug!(
407 %event_id, %user_id,
408 "inconsistent state: old event item for user's read \
409 receipt doesn't have a receipt for the user"
410 );
411 }
412 trace!(%user_id, %event_id, "removed read receipt from event item");
413 items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
414 } else {
415 warn!("received a read receipt for a local item, this should not be possible");
416 }
417 }
418
419 #[instrument(skip_all)]
421 fn add_new_receipt(
422 self,
423 items: &mut ObservableItemsTransaction<'_>,
424 user_id: OwnedUserId,
425 receipt: Receipt,
426 ) {
427 let Some(event_id) = self.new_event_id else {
428 return;
430 };
431
432 let item_pos = self.new_item_pos.or_else(|| {
433 items
434 .iter()
435 .enumerate()
436 .skip(self.old_item_pos.unwrap_or(0))
439 .rev()
440 .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
441 .find_map(|(nth, event_item)| {
442 (event_item.event_id() == Some(&event_id)).then_some(nth)
443 })
444 });
445
446 let Some(item_pos) = item_pos else {
447 debug!(%event_id, %user_id, "inconsistent state: new event item for read receipt was not found");
448 return;
449 };
450
451 debug_assert!(
452 item_pos >= self.old_item_pos.unwrap_or(0),
453 "The new receipt must be added on a timeline item that is _after_ the timeline item that was holding the old receipt");
454
455 let event_item = &items[item_pos];
456 let event_item_id = event_item.unique_id().to_owned();
457
458 let Some(mut event_item) = event_item.as_event().cloned() else {
459 warn!("received a read receipt for a virtual item, this should not be possible");
460 return;
461 };
462
463 if let Some(remote_event_item) = event_item.as_remote_mut() {
464 trace!(%user_id, %event_id, "added read receipt to event item");
465 remote_event_item.read_receipts.insert(user_id, receipt);
466 items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
467 } else {
468 warn!("received a read receipt for a local item, this should not be possible");
469 }
470 }
471
472 fn apply(
474 mut self,
475 items: &mut ObservableItemsTransaction<'_>,
476 user_id: OwnedUserId,
477 receipt: Receipt,
478 ) {
479 self.remove_old_receipt(items, &user_id);
480 self.add_new_receipt(items, user_id, receipt);
481 }
482}
483
484impl TimelineStateTransaction<'_> {
485 pub(super) fn handle_explicit_read_receipts(
486 &mut self,
487 receipt_event_content: ReceiptEventContent,
488 own_user_id: &UserId,
489 ) {
490 trace!("handling explicit read receipts");
491 for (event_id, receipt_types) in receipt_event_content.0 {
492 for (receipt_type, receipts) in receipt_types {
493 if !matches!(receipt_type, ReceiptType::Read | ReceiptType::ReadPrivate) {
495 continue;
496 }
497
498 for (user_id, receipt) in receipts {
499 if !matches!(receipt.thread, ReceiptThread::Unthreaded | ReceiptThread::Main) {
500 continue;
501 }
502
503 let is_own_user_id = user_id == own_user_id;
504 let full_receipt = FullReceipt {
505 event_id: &event_id,
506 user_id: &user_id,
507 receipt_type: receipt_type.clone(),
508 receipt: &receipt,
509 };
510
511 self.meta.read_receipts.maybe_update_read_receipt(
512 full_receipt,
513 is_own_user_id,
514 &mut self.items,
515 );
516 }
517 }
518 }
519 }
520
521 pub(super) async fn load_read_receipts_for_event<P: RoomDataProvider>(
525 &mut self,
526 event_id: &EventId,
527 room_data_provider: &P,
528 ) {
529 trace!(%event_id, "loading initial receipts for an event");
530 let read_receipts = room_data_provider.load_event_receipts(event_id).await;
531 let own_user_id = room_data_provider.own_user_id();
532
533 for (user_id, receipt) in read_receipts {
536 let full_receipt = FullReceipt {
537 event_id,
538 user_id: &user_id,
539 receipt_type: ReceiptType::Read,
540 receipt: &receipt,
541 };
542
543 self.meta.read_receipts.maybe_update_read_receipt(
544 full_receipt,
545 user_id == own_user_id,
546 &mut self.items,
547 );
548 }
549 }
550
551 pub(super) fn maybe_add_implicit_read_receipt(&mut self, event_meta: FullEventMeta<'_>) {
560 let FullEventMeta { event_id, sender, is_own_event, timestamp, .. } = event_meta;
561
562 let (Some(user_id), Some(timestamp)) = (sender, timestamp) else {
563 return;
565 };
566
567 trace!(%event_id, "adding implicit read receipt");
568 let receipt = Receipt::new(timestamp);
569 let full_receipt =
570 FullReceipt { event_id, user_id, receipt_type: ReceiptType::Read, receipt: &receipt };
571
572 self.meta.read_receipts.maybe_update_read_receipt(
573 full_receipt,
574 is_own_event,
575 &mut self.items,
576 );
577 }
578
579 #[instrument(skip(self))]
582 pub(super) fn maybe_update_read_receipts_of_prev_event(&mut self, event_id: &EventId) {
583 let Some(prev_event_meta) = self
585 .items
586 .all_remote_events()
587 .iter()
588 .rev()
589 .skip_while(|meta| meta.event_id != event_id)
591 .skip(1)
593 .find(|meta| meta.visible)
595 else {
596 trace!("Couldn't find any previous visible event, exiting");
597 return;
598 };
599
600 let Some((prev_item_pos, prev_event_item)) =
601 rfind_event_by_id(&self.items, &prev_event_meta.event_id)
602 else {
603 error!("inconsistent state: timeline item of visible event was not found");
604 return;
605 };
606
607 let prev_event_item_id = prev_event_item.internal_id.to_owned();
608 let mut prev_event_item = prev_event_item.clone();
609
610 let Some(remote_prev_event_item) = prev_event_item.as_remote_mut() else {
611 warn!("loading read receipts for a local item, this should not be possible");
612 return;
613 };
614
615 let read_receipts = self.meta.read_receipts.compute_event_receipts(
616 &remote_prev_event_item.event_id,
617 &mut self.items,
618 false,
619 );
620
621 if read_receipts.len() == remote_prev_event_item.read_receipts.len() {
623 trace!("same count of read receipts, not doing anything");
624 return;
625 }
626
627 trace!("replacing read receipts with the new ones");
628 remote_prev_event_item.read_receipts = read_receipts;
629 self.items.replace(prev_item_pos, TimelineItem::new(prev_event_item, prev_event_item_id));
630 }
631}
632
633impl TimelineState {
634 pub(super) async fn populate_initial_user_receipt<P: RoomDataProvider>(
637 &mut self,
638 room_data_provider: &P,
639 receipt_type: ReceiptType,
640 ) {
641 let own_user_id = room_data_provider.own_user_id().to_owned();
642
643 let mut read_receipt = room_data_provider
644 .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, &own_user_id)
645 .await;
646
647 if read_receipt.is_none() {
649 read_receipt = room_data_provider
650 .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, &own_user_id)
651 .await;
652 }
653
654 if let Some(read_receipt) = read_receipt {
655 self.meta.read_receipts.upsert_latest(own_user_id, receipt_type, read_receipt);
656 }
657 }
658
659 pub(super) async fn latest_user_read_receipt<P: RoomDataProvider>(
663 &self,
664 user_id: &UserId,
665 room_data_provider: &P,
666 ) -> Option<(OwnedEventId, Receipt)> {
667 let all_remote_events = self.items.all_remote_events();
668 let public_read_receipt = self
669 .meta
670 .user_receipt(user_id, ReceiptType::Read, room_data_provider, all_remote_events)
671 .await;
672 let private_read_receipt = self
673 .meta
674 .user_receipt(user_id, ReceiptType::ReadPrivate, room_data_provider, all_remote_events)
675 .await;
676
677 match self.meta.compare_optional_receipts(
681 public_read_receipt.as_ref(),
682 private_read_receipt.as_ref(),
683 self.items.all_remote_events(),
684 ) {
685 Ordering::Greater => public_read_receipt,
686 Ordering::Less => private_read_receipt,
687 _ => unreachable!(),
688 }
689 }
690
691 pub(super) fn latest_user_read_receipt_timeline_event_id(
694 &self,
695 user_id: &UserId,
696 ) -> Option<OwnedEventId> {
697 let public_read_receipt = self.meta.read_receipts.get_latest(user_id, &ReceiptType::Read);
700 let private_read_receipt =
701 self.meta.read_receipts.get_latest(user_id, &ReceiptType::ReadPrivate);
702
703 let (latest_receipt_id, _) = match self.meta.compare_optional_receipts(
707 public_read_receipt,
708 private_read_receipt,
709 self.items.all_remote_events(),
710 ) {
711 Ordering::Greater => public_read_receipt?,
712 Ordering::Less => private_read_receipt?,
713 _ => unreachable!(),
714 };
715
716 self.items
718 .all_remote_events()
719 .iter()
720 .rev()
721 .skip_while(|ev| ev.event_id != *latest_receipt_id)
722 .find(|ev| ev.visible)
723 .map(|ev| ev.event_id.clone())
724 }
725}
726
727impl TimelineMetadata {
728 pub(super) async fn user_receipt<P: RoomDataProvider>(
734 &self,
735 user_id: &UserId,
736 receipt_type: ReceiptType,
737 room_data_provider: &P,
738 all_remote_events: &AllRemoteEvents,
739 ) -> Option<(OwnedEventId, Receipt)> {
740 if let Some(receipt) = self.read_receipts.get_latest(user_id, &receipt_type) {
741 return Some(receipt.clone());
743 }
744
745 let unthreaded_read_receipt = room_data_provider
746 .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id)
747 .await;
748
749 let main_thread_read_receipt = room_data_provider
750 .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, user_id)
751 .await;
752
753 match self.compare_optional_receipts(
756 main_thread_read_receipt.as_ref(),
757 unthreaded_read_receipt.as_ref(),
758 all_remote_events,
759 ) {
760 Ordering::Greater => main_thread_read_receipt,
761 Ordering::Less => unthreaded_read_receipt,
762 _ => unreachable!(),
763 }
764 }
765
766 fn compare_optional_receipts(
773 &self,
774 lhs: Option<&(OwnedEventId, Receipt)>,
775 rhs_or_default: Option<&(OwnedEventId, Receipt)>,
776 all_remote_events: &AllRemoteEvents,
777 ) -> Ordering {
778 let Some((lhs_event_id, lhs_receipt)) = lhs else {
780 return Ordering::Less;
781 };
782 let Some((rhs_event_id, rhs_receipt)) = rhs_or_default else {
783 return Ordering::Greater;
784 };
785
786 if let Some(relative_pos) =
788 self.compare_events_positions(lhs_event_id, rhs_event_id, all_remote_events)
789 {
790 if relative_pos == RelativePosition::Before {
791 return Ordering::Greater;
792 }
793
794 return Ordering::Less;
795 }
796
797 if let Some((lhs_ts, rhs_ts)) = lhs_receipt.ts.zip(rhs_receipt.ts) {
799 if lhs_ts > rhs_ts {
800 return Ordering::Greater;
801 }
802
803 return Ordering::Less;
804 }
805
806 Ordering::Less
807 }
808}