1use std::{cmp::Ordering, collections::HashMap};
16
17use futures_core::Stream;
18use indexmap::IndexMap;
19use ruma::{
20 events::receipt::{Receipt, ReceiptEventContent, ReceiptThread, ReceiptType},
21 EventId, OwnedEventId, OwnedUserId, UserId,
22};
23use tokio::sync::watch;
24use tokio_stream::wrappers::WatchStream;
25use tracing::{debug, error, warn};
26
27use super::{
28 rfind_event_by_id, AllRemoteEvents, FullEventMeta, ObservableItemsTransaction,
29 RelativePosition, RoomDataProvider, TimelineMetadata, TimelineState,
30};
31use crate::timeline::{controller::TimelineStateTransaction, TimelineItem};
32
/// In-memory caches for read receipts.
#[derive(Clone, Debug, Default)]
pub(super) struct ReadReceipts {
    /// Receipts attached to each event.
    ///
    /// Event ID => User ID => Receipt of that user on that event. The inner
    /// map is an `IndexMap`, so it has a defined iteration order.
    by_event: HashMap<OwnedEventId, IndexMap<OwnedUserId, Receipt>>,

    /// Latest known receipt of each user, per receipt type.
    ///
    /// User ID => Receipt type => (ID of the event the receipt points to,
    /// receipt).
    latest_by_user: HashMap<OwnedUserId, HashMap<ReceiptType, (OwnedEventId, Receipt)>>,

    /// Watch channel sender used to signal that a receipt of our own user was
    /// updated in `latest_by_user`.
    own_user_read_receipts_changed_sender: watch::Sender<()>,
}
50
51impl ReadReceipts {
52 pub(super) fn clear(&mut self) {
54 self.by_event.clear();
55 self.latest_by_user.clear();
56 }
57
58 pub(super) fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
60 let subscriber = self.own_user_read_receipts_changed_sender.subscribe();
61 WatchStream::from_changes(subscriber)
62 }
63
64 fn get_latest(
67 &self,
68 user_id: &UserId,
69 receipt_type: &ReceiptType,
70 ) -> Option<&(OwnedEventId, Receipt)> {
71 self.latest_by_user.get(user_id).and_then(|map| map.get(receipt_type))
72 }
73
74 fn upsert_latest(
77 &mut self,
78 user_id: OwnedUserId,
79 receipt_type: ReceiptType,
80 read_receipt: (OwnedEventId, Receipt),
81 ) {
82 self.latest_by_user.entry(user_id).or_default().insert(receipt_type, read_receipt);
83 }
84
    /// Update the latest receipt of a user for a receipt type, unless the
    /// receipt that is already cached is deemed more recent.
    ///
    /// The steps are:
    ///
    /// 1. Return early if the cached receipt already points to the same
    ///    event.
    /// 2. Scan the remote events from the end of the list to locate the
    ///    events of the old and new receipts and, for each, the first visible
    ///    event encountered from there on in the scan.
    /// 3. Return early if the old receipt's event was found and the new
    ///    one's was either not found or found later in the reverse scan.
    /// 4. Update the caches. `by_event` is only touched when
    ///    `is_own_user_id` is `false`.
    /// 5. For our own user, notify the watch channel and stop. Otherwise,
    ///    apply a [`ReadReceiptTimelineUpdate`] if the visible event carrying
    ///    the receipt changed.
    fn maybe_update_read_receipt(
        &mut self,
        new_receipt: FullReceipt<'_>,
        is_own_user_id: bool,
        timeline_items: &mut ObservableItemsTransaction<'_>,
    ) {
        let all_events = timeline_items.all_remote_events();

        // Currently cached receipt of this user for this receipt type, if any.
        let old_receipt = self.get_latest(new_receipt.user_id, &new_receipt.receipt_type);

        if old_receipt
            .as_ref()
            .is_some_and(|(old_receipt_event_id, _)| old_receipt_event_id == new_receipt.event_id)
        {
            // The receipt still points to the same event: nothing to do.
            return;
        }

        let old_event_id = old_receipt.map(|(event_id, _)| event_id);

        // Positions found by the scan below. `*_receipt_pos` is counted from
        // the end of `all_events` (0 is the last event). `*_item_pos` and
        // `*_item_event_id` describe the visible event that displays the
        // receipt.
        let mut old_receipt_pos = None;
        let mut old_item_pos = None;
        let mut old_item_event_id = None;
        let mut new_receipt_pos = None;
        let mut new_item_pos = None;
        let mut new_item_event_id = None;

        for (pos, event) in all_events.iter().rev().enumerate() {
            if old_receipt_pos.is_none() && old_event_id == Some(&event.event_id) {
                old_receipt_pos = Some(pos);
            }

            // Once the old receipt's event has been reached, the first
            // visible event seen (possibly that same event) is the one
            // holding the old receipt.
            if old_receipt_pos.is_some() && old_item_event_id.is_none() && event.visible {
                old_item_pos = event.timeline_item_index;
                old_item_event_id = Some(event.event_id.clone());
            }

            if new_receipt_pos.is_none() && new_receipt.event_id == event.event_id {
                new_receipt_pos = Some(pos);
            }

            // Same rule for the new receipt.
            if new_receipt_pos.is_some() && new_item_event_id.is_none() && event.visible {
                new_item_pos = event.timeline_item_index;
                new_item_event_id = Some(event.event_id.clone());
            }

            if old_item_event_id.is_some() && new_item_event_id.is_some() {
                // Both visible events are known: stop scanning.
                break;
            }
        }

        // Keep the old receipt when it is located in the list and the new one
        // is not, or when the old one was reached first in the reverse scan.
        if let Some(old_receipt_pos) = old_receipt_pos {
            let Some(new_receipt_pos) = new_receipt_pos else {
                // The new receipt's event is not in the list at all.
                return;
            };

            if old_receipt_pos < new_receipt_pos {
                // The old receipt's event is closer to the end of the list.
                return;
            }
        }

        // From here on the new receipt replaces the old one.

        if !is_own_user_id {
            // Move the receipt from the old event to the new one in
            // `by_event`. The ID is cloned so that the shared borrow of
            // `self` behind `old_event_id` ends before `self` is mutated.
            if let Some(old_event_id) = old_event_id.cloned() {
                self.remove_event_receipt_for_user(&old_event_id, new_receipt.user_id);
            }

            self.add_event_receipt_for_user(
                new_receipt.event_id.to_owned(),
                new_receipt.user_id.to_owned(),
                new_receipt.receipt.clone(),
            );
        }

        // Record the new receipt as the latest one for this user and type.
        self.upsert_latest(
            new_receipt.user_id.to_owned(),
            new_receipt.receipt_type,
            (new_receipt.event_id.to_owned(), new_receipt.receipt.clone()),
        );

        if is_own_user_id {
            // Notify subscribers. No timeline item is updated for our own
            // user.
            self.own_user_read_receipts_changed_sender.send_replace(());
            return;
        }

        if new_item_event_id == old_item_event_id {
            // The receipt stays on the same visible event (or on none):
            // no timeline item has to change.
            return;
        }

        let timeline_update = ReadReceiptTimelineUpdate {
            old_item_pos,
            old_event_id: old_item_event_id,
            new_item_pos,
            new_event_id: new_item_event_id,
        };

        timeline_update.apply(
            timeline_items,
            new_receipt.user_id.to_owned(),
            new_receipt.receipt.clone(),
        );
    }
218
219 fn get_event_receipts(&self, event_id: &EventId) -> Option<&IndexMap<OwnedUserId, Receipt>> {
221 self.by_event.get(event_id)
222 }
223
224 fn add_event_receipt_for_user(
226 &mut self,
227 event_id: OwnedEventId,
228 user_id: OwnedUserId,
229 receipt: Receipt,
230 ) {
231 self.by_event.entry(event_id).or_default().insert(user_id, receipt);
232 }
233
234 fn remove_event_receipt_for_user(&mut self, event_id: &EventId, user_id: &UserId) {
236 if let Some(map) = self.by_event.get_mut(event_id) {
237 map.swap_remove(user_id);
238 if map.is_empty() {
240 self.by_event.remove(event_id);
241 }
242 }
243 }
244
245 pub(super) fn compute_event_receipts(
250 &self,
251 event_id: &EventId,
252 all_events: &AllRemoteEvents,
253 at_end: bool,
254 ) -> IndexMap<OwnedUserId, Receipt> {
255 let mut all_receipts = self.get_event_receipts(event_id).cloned().unwrap_or_default();
256
257 if at_end {
258 return all_receipts;
260 }
261
262 let events_iter = all_events.iter().skip_while(|meta| meta.event_id != event_id);
264
265 let events_iter = events_iter.skip(1);
267
268 for hidden_event_meta in events_iter.take_while(|meta| !meta.visible) {
270 if let Some(event_receipts) = self.get_event_receipts(&hidden_event_meta.event_id) {
271 all_receipts.extend(event_receipts.clone());
272 }
273 }
274
275 all_receipts
276 }
277}
278
/// A receipt together with everything needed to identify it, borrowed from
/// the caller.
struct FullReceipt<'a> {
    /// The event the receipt points to.
    event_id: &'a EventId,
    /// The user the receipt belongs to.
    user_id: &'a UserId,
    /// The type of the receipt.
    receipt_type: ReceiptType,
    /// The receipt itself.
    receipt: &'a Receipt,
}
285
/// A pending move of one user's receipt from one timeline item to another.
#[derive(Clone, Debug, Default)]
struct ReadReceiptTimelineUpdate {
    /// Index of the timeline item that held the receipt so far, if known.
    old_item_pos: Option<usize>,
    /// ID of the event that held the receipt so far, if any.
    old_event_id: Option<OwnedEventId>,
    /// Index of the timeline item that will hold the receipt, if known.
    new_item_pos: Option<usize>,
    /// ID of the event that will hold the receipt, if any.
    new_event_id: Option<OwnedEventId>,
}
300
301impl ReadReceiptTimelineUpdate {
302 fn remove_old_receipt(&mut self, items: &mut ObservableItemsTransaction<'_>, user_id: &UserId) {
304 let Some(event_id) = &self.old_event_id else {
305 return;
307 };
308
309 let item_pos = self.old_item_pos.or_else(|| {
310 items
311 .iter()
312 .enumerate()
313 .rev()
314 .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
315 .find_map(|(nth, event_item)| {
316 (event_item.event_id() == Some(event_id)).then_some(nth)
317 })
318 });
319
320 let Some(item_pos) = item_pos else {
321 debug!(%event_id, %user_id, "inconsistent state: old event item for read receipt was not found");
322 return;
323 };
324
325 self.old_item_pos = Some(item_pos);
326
327 let event_item = &items[item_pos];
328 let event_item_id = event_item.unique_id().to_owned();
329
330 let Some(mut event_item) = event_item.as_event().cloned() else {
331 warn!("received a read receipt for a virtual item, this should not be possible");
332 return;
333 };
334
335 if let Some(remote_event_item) = event_item.as_remote_mut() {
336 if remote_event_item.read_receipts.swap_remove(user_id).is_none() {
337 debug!(
338 %event_id, %user_id,
339 "inconsistent state: old event item for user's read \
340 receipt doesn't have a receipt for the user"
341 );
342 }
343 items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
344 } else {
345 warn!("received a read receipt for a local item, this should not be possible");
346 }
347 }
348
349 fn add_new_receipt(
351 self,
352 items: &mut ObservableItemsTransaction<'_>,
353 user_id: OwnedUserId,
354 receipt: Receipt,
355 ) {
356 let Some(event_id) = self.new_event_id else {
357 return;
359 };
360
361 let item_pos = self.new_item_pos.or_else(|| {
362 items
363 .iter()
364 .enumerate()
365 .skip(self.old_item_pos.unwrap_or(0))
368 .rev()
369 .filter_map(|(nth, item)| Some((nth, item.as_event()?)))
370 .find_map(|(nth, event_item)| {
371 (event_item.event_id() == Some(&event_id)).then_some(nth)
372 })
373 });
374
375 let Some(item_pos) = item_pos else {
376 debug!(%event_id, %user_id, "inconsistent state: new event item for read receipt was not found");
377 return;
378 };
379
380 debug_assert!(
381 item_pos >= self.old_item_pos.unwrap_or(0),
382 "The new receipt must be added on a timeline item that is _after_ the timeline item that was holding the old receipt");
383
384 let event_item = &items[item_pos];
385 let event_item_id = event_item.unique_id().to_owned();
386
387 let Some(mut event_item) = event_item.as_event().cloned() else {
388 warn!("received a read receipt for a virtual item, this should not be possible");
389 return;
390 };
391
392 if let Some(remote_event_item) = event_item.as_remote_mut() {
393 remote_event_item.read_receipts.insert(user_id, receipt);
394 items.replace(item_pos, TimelineItem::new(event_item, event_item_id));
395 } else {
396 warn!("received a read receipt for a local item, this should not be possible");
397 }
398 }
399
400 fn apply(
402 mut self,
403 items: &mut ObservableItemsTransaction<'_>,
404 user_id: OwnedUserId,
405 receipt: Receipt,
406 ) {
407 self.remove_old_receipt(items, &user_id);
408 self.add_new_receipt(items, user_id, receipt);
409 }
410}
411
412impl TimelineStateTransaction<'_> {
413 pub(super) fn handle_explicit_read_receipts(
414 &mut self,
415 receipt_event_content: ReceiptEventContent,
416 own_user_id: &UserId,
417 ) {
418 for (event_id, receipt_types) in receipt_event_content.0 {
419 for (receipt_type, receipts) in receipt_types {
420 if !matches!(receipt_type, ReceiptType::Read | ReceiptType::ReadPrivate) {
422 continue;
423 }
424
425 for (user_id, receipt) in receipts {
426 if !matches!(receipt.thread, ReceiptThread::Unthreaded | ReceiptThread::Main) {
427 continue;
428 }
429
430 let is_own_user_id = user_id == own_user_id;
431 let full_receipt = FullReceipt {
432 event_id: &event_id,
433 user_id: &user_id,
434 receipt_type: receipt_type.clone(),
435 receipt: &receipt,
436 };
437
438 self.meta.read_receipts.maybe_update_read_receipt(
439 full_receipt,
440 is_own_user_id,
441 &mut self.items,
442 );
443 }
444 }
445 }
446 }
447
448 pub(super) async fn load_read_receipts_for_event<P: RoomDataProvider>(
452 &mut self,
453 event_id: &EventId,
454 room_data_provider: &P,
455 ) {
456 let read_receipts = room_data_provider.load_event_receipts(event_id).await;
457 let own_user_id = room_data_provider.own_user_id();
458
459 for (user_id, receipt) in read_receipts {
462 let full_receipt = FullReceipt {
463 event_id,
464 user_id: &user_id,
465 receipt_type: ReceiptType::Read,
466 receipt: &receipt,
467 };
468
469 self.meta.read_receipts.maybe_update_read_receipt(
470 full_receipt,
471 user_id == own_user_id,
472 &mut self.items,
473 );
474 }
475 }
476
477 pub(super) fn maybe_add_implicit_read_receipt(&mut self, event_meta: FullEventMeta<'_>) {
486 let FullEventMeta { event_id, sender, is_own_event, timestamp, .. } = event_meta;
487
488 let (Some(user_id), Some(timestamp)) = (sender, timestamp) else {
489 return;
491 };
492
493 let receipt = Receipt::new(timestamp);
494 let full_receipt =
495 FullReceipt { event_id, user_id, receipt_type: ReceiptType::Read, receipt: &receipt };
496
497 self.meta.read_receipts.maybe_update_read_receipt(
498 full_receipt,
499 is_own_event,
500 &mut self.items,
501 );
502 }
503
504 pub(super) fn maybe_update_read_receipts_of_prev_event(&mut self, event_id: &EventId) {
507 let Some(prev_event_meta) = self
509 .items
510 .all_remote_events()
511 .iter()
512 .rev()
513 .skip_while(|meta| meta.event_id != event_id)
515 .skip(1)
517 .find(|meta| meta.visible)
519 else {
520 return;
521 };
522
523 let Some((prev_item_pos, prev_event_item)) =
524 rfind_event_by_id(&self.items, &prev_event_meta.event_id)
525 else {
526 error!("inconsistent state: timeline item of visible event was not found");
527 return;
528 };
529
530 let prev_event_item_id = prev_event_item.internal_id.to_owned();
531 let mut prev_event_item = prev_event_item.clone();
532
533 let Some(remote_prev_event_item) = prev_event_item.as_remote_mut() else {
534 warn!("loading read receipts for a local item, this should not be possible");
535 return;
536 };
537
538 let read_receipts = self.meta.read_receipts.compute_event_receipts(
539 &remote_prev_event_item.event_id,
540 self.items.all_remote_events(),
541 false,
542 );
543
544 if read_receipts.len() == remote_prev_event_item.read_receipts.len() {
546 return;
547 }
548
549 remote_prev_event_item.read_receipts = read_receipts;
550 self.items.replace(prev_item_pos, TimelineItem::new(prev_event_item, prev_event_item_id));
551 }
552}
553
554impl TimelineState {
555 pub(super) async fn populate_initial_user_receipt<P: RoomDataProvider>(
558 &mut self,
559 room_data_provider: &P,
560 receipt_type: ReceiptType,
561 ) {
562 let own_user_id = room_data_provider.own_user_id().to_owned();
563
564 let mut read_receipt = room_data_provider
565 .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, &own_user_id)
566 .await;
567
568 if read_receipt.is_none() {
570 read_receipt = room_data_provider
571 .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, &own_user_id)
572 .await;
573 }
574
575 if let Some(read_receipt) = read_receipt {
576 self.meta.read_receipts.upsert_latest(own_user_id, receipt_type, read_receipt);
577 }
578 }
579
580 pub(super) async fn latest_user_read_receipt<P: RoomDataProvider>(
584 &self,
585 user_id: &UserId,
586 room_data_provider: &P,
587 ) -> Option<(OwnedEventId, Receipt)> {
588 let all_remote_events = self.items.all_remote_events();
589 let public_read_receipt = self
590 .meta
591 .user_receipt(user_id, ReceiptType::Read, room_data_provider, all_remote_events)
592 .await;
593 let private_read_receipt = self
594 .meta
595 .user_receipt(user_id, ReceiptType::ReadPrivate, room_data_provider, all_remote_events)
596 .await;
597
598 match self.meta.compare_optional_receipts(
602 public_read_receipt.as_ref(),
603 private_read_receipt.as_ref(),
604 self.items.all_remote_events(),
605 ) {
606 Ordering::Greater => public_read_receipt,
607 Ordering::Less => private_read_receipt,
608 _ => unreachable!(),
609 }
610 }
611
612 pub(super) fn latest_user_read_receipt_timeline_event_id(
615 &self,
616 user_id: &UserId,
617 ) -> Option<OwnedEventId> {
618 let public_read_receipt = self.meta.read_receipts.get_latest(user_id, &ReceiptType::Read);
621 let private_read_receipt =
622 self.meta.read_receipts.get_latest(user_id, &ReceiptType::ReadPrivate);
623
624 let (latest_receipt_id, _) = match self.meta.compare_optional_receipts(
628 public_read_receipt,
629 private_read_receipt,
630 self.items.all_remote_events(),
631 ) {
632 Ordering::Greater => public_read_receipt?,
633 Ordering::Less => private_read_receipt?,
634 _ => unreachable!(),
635 };
636
637 self.items
639 .all_remote_events()
640 .iter()
641 .rev()
642 .skip_while(|ev| ev.event_id != *latest_receipt_id)
643 .find(|ev| ev.visible)
644 .map(|ev| ev.event_id.clone())
645 }
646}
647
648impl TimelineMetadata {
649 pub(super) async fn user_receipt<P: RoomDataProvider>(
655 &self,
656 user_id: &UserId,
657 receipt_type: ReceiptType,
658 room_data_provider: &P,
659 all_remote_events: &AllRemoteEvents,
660 ) -> Option<(OwnedEventId, Receipt)> {
661 if let Some(receipt) = self.read_receipts.get_latest(user_id, &receipt_type) {
662 return Some(receipt.clone());
664 }
665
666 let unthreaded_read_receipt = room_data_provider
667 .load_user_receipt(receipt_type.clone(), ReceiptThread::Unthreaded, user_id)
668 .await;
669
670 let main_thread_read_receipt = room_data_provider
671 .load_user_receipt(receipt_type.clone(), ReceiptThread::Main, user_id)
672 .await;
673
674 match self.compare_optional_receipts(
677 main_thread_read_receipt.as_ref(),
678 unthreaded_read_receipt.as_ref(),
679 all_remote_events,
680 ) {
681 Ordering::Greater => main_thread_read_receipt,
682 Ordering::Less => unthreaded_read_receipt,
683 _ => unreachable!(),
684 }
685 }
686
687 fn compare_optional_receipts(
694 &self,
695 lhs: Option<&(OwnedEventId, Receipt)>,
696 rhs_or_default: Option<&(OwnedEventId, Receipt)>,
697 all_remote_events: &AllRemoteEvents,
698 ) -> Ordering {
699 let Some((lhs_event_id, lhs_receipt)) = lhs else {
701 return Ordering::Less;
702 };
703 let Some((rhs_event_id, rhs_receipt)) = rhs_or_default else {
704 return Ordering::Greater;
705 };
706
707 if let Some(relative_pos) =
709 self.compare_events_positions(lhs_event_id, rhs_event_id, all_remote_events)
710 {
711 if relative_pos == RelativePosition::Before {
712 return Ordering::Greater;
713 }
714
715 return Ordering::Less;
716 }
717
718 if let Some((lhs_ts, rhs_ts)) = lhs_receipt.ts.zip(rhs_receipt.ts) {
720 if lhs_ts > rhs_ts {
721 return Ordering::Greater;
722 }
723
724 return Ordering::Less;
725 }
726
727 Ordering::Less
728 }
729}