1mod error;
50mod latest_event;
51mod room_latest_events;
52
53use std::{
54 collections::{HashMap, HashSet},
55 ops::{ControlFlow, Not},
56 sync::Arc,
57};
58
59pub use error::LatestEventsError;
60use eyeball::{AsyncLock, Subscriber};
61use futures_util::FutureExt;
62use latest_event::LatestEvent;
63pub use latest_event::{LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue};
64use matrix_sdk_common::executor::{AbortOnDrop, JoinHandleExt as _, spawn};
65use room_latest_events::RoomLatestEvents;
66use ruma::{EventId, OwnedRoomId, RoomId};
67use tokio::{
68 select,
69 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, broadcast, mpsc},
70};
71use tracing::{error, warn};
72
73use crate::{
74 client::WeakClient,
75 event_cache::{EventCache, RoomEventCacheGenericUpdate},
76 room::WeakRoom,
77 send_queue::{RoomSendQueueUpdate, SendQueue, SendQueueUpdate},
78};
79
80#[derive(Clone, Debug)]
82pub struct LatestEvents {
83 state: Arc<LatestEventsState>,
84}
85
86#[derive(Debug)]
88struct LatestEventsState {
89 registered_rooms: Arc<RegisteredRooms>,
91
92 _listen_task_handle: AbortOnDrop<()>,
95
96 _computation_task_handle: AbortOnDrop<()>,
98}
99
100impl LatestEvents {
101 pub(crate) fn new(
103 weak_client: WeakClient,
104 event_cache: EventCache,
105 send_queue: SendQueue,
106 ) -> Self {
107 let (room_registration_sender, room_registration_receiver) = mpsc::channel(32);
108 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
109
110 let registered_rooms =
111 Arc::new(RegisteredRooms::new(room_registration_sender, weak_client, &event_cache));
112
113 let listen_task_handle = spawn(listen_to_event_cache_and_send_queue_updates_task(
115 registered_rooms.clone(),
116 room_registration_receiver,
117 event_cache,
118 send_queue,
119 latest_event_queue_sender,
120 ))
121 .abort_on_drop();
122
123 let computation_task_handle = spawn(compute_latest_events_task(
125 registered_rooms.clone(),
126 latest_event_queue_receiver,
127 ))
128 .abort_on_drop();
129
130 Self {
131 state: Arc::new(LatestEventsState {
132 registered_rooms,
133 _listen_task_handle: listen_task_handle,
134 _computation_task_handle: computation_task_handle,
135 }),
136 }
137 }
138
139 pub async fn listen_to_room(&self, room_id: &RoomId) -> Result<bool, LatestEventsError> {
143 Ok(self.state.registered_rooms.for_room(room_id).await?.is_some())
144 }
145
146 #[cfg(test)]
150 pub async fn is_listening_to_room(&self, room_id: &RoomId) -> bool {
151 self.state.registered_rooms.rooms.read().await.contains_key(room_id)
152 }
153
154 pub async fn listen_and_subscribe_to_room(
160 &self,
161 room_id: &RoomId,
162 ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
163 let Some(room_latest_events) = self.state.registered_rooms.for_room(room_id).await? else {
164 return Ok(None);
165 };
166
167 let room_latest_events = room_latest_events.read().await;
168 let latest_event = room_latest_events.for_room();
169
170 Ok(Some(latest_event.subscribe().await))
171 }
172
173 pub async fn listen_to_thread(
178 &self,
179 room_id: &RoomId,
180 thread_id: &EventId,
181 ) -> Result<bool, LatestEventsError> {
182 Ok(self.state.registered_rooms.for_thread(room_id, thread_id).await?.is_some())
183 }
184
185 pub async fn listen_and_subscribe_to_thread(
191 &self,
192 room_id: &RoomId,
193 thread_id: &EventId,
194 ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
195 let Some(room_latest_events) =
196 self.state.registered_rooms.for_thread(room_id, thread_id).await?
197 else {
198 return Ok(None);
199 };
200
201 let room_latest_events = room_latest_events.read().await;
202 let latest_event = room_latest_events
203 .for_thread(thread_id)
204 .expect("The `LatestEvent` for the thread must have been created");
205
206 Ok(Some(latest_event.subscribe().await))
207 }
208
209 pub async fn forget_room(&self, room_id: &RoomId) {
216 self.state.registered_rooms.forget_room(room_id).await;
217 }
218
219 pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
227 self.state.registered_rooms.forget_thread(room_id, thread_id).await;
228 }
229}
230
231#[derive(Debug)]
232struct RegisteredRooms {
233 rooms: RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
235
236 room_registration_sender: mpsc::Sender<RoomRegistration>,
246
247 weak_client: WeakClient,
249
250 event_cache: EventCache,
252}
253
254impl RegisteredRooms {
255 fn new(
256 room_registration_sender: mpsc::Sender<RoomRegistration>,
257 weak_client: WeakClient,
258 event_cache: &EventCache,
259 ) -> Self {
260 Self {
261 rooms: RwLock::new(HashMap::default()),
262 room_registration_sender,
263 weak_client,
264 event_cache: event_cache.clone(),
265 }
266 }
267
268 async fn room_latest_event(
275 &self,
276 room_id: &RoomId,
277 thread_id: Option<&EventId>,
278 ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
279 Ok(match thread_id {
280 Some(thread_id) => {
286 let mut rooms = self.rooms.write().await;
287
288 if rooms.contains_key(room_id).not() {
290 if let Some(room_latest_event) = RoomLatestEvents::new(
292 WeakRoom::new(self.weak_client.clone(), room_id.to_owned()),
293 &self.event_cache,
294 )
295 .await?
296 {
297 rooms.insert(room_id.to_owned(), room_latest_event);
298
299 let _ = self
300 .room_registration_sender
301 .send(RoomRegistration::Add(room_id.to_owned()))
302 .await;
303 }
304 }
305
306 if let Some(room_latest_event) = rooms.get(room_id) {
307 let mut room_latest_event = room_latest_event.write().await;
308
309 if room_latest_event.has_thread(thread_id).not() {
312 room_latest_event
313 .create_and_insert_latest_event_for_thread(thread_id)
314 .await;
315 }
316 }
317
318 RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
319 }
320
321 None => {
324 match RwLockReadGuard::try_map(self.rooms.read().await, |rooms| rooms.get(room_id))
325 .ok()
326 {
327 value @ Some(_) => value,
328 None => {
329 let mut rooms = self.rooms.write().await;
330
331 if rooms.contains_key(room_id).not() {
332 if let Some(room_latest_event) = RoomLatestEvents::new(
334 WeakRoom::new(self.weak_client.clone(), room_id.to_owned()),
335 &self.event_cache,
336 )
337 .await?
338 {
339 rooms.insert(room_id.to_owned(), room_latest_event);
340
341 let _ = self
342 .room_registration_sender
343 .send(RoomRegistration::Add(room_id.to_owned()))
344 .await;
345 }
346 }
347
348 RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
349 }
350 }
351 }
352 })
353 }
354
355 pub async fn for_room(
359 &self,
360 room_id: &RoomId,
361 ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
362 self.room_latest_event(room_id, None).await
363 }
364
365 pub async fn for_thread(
369 &self,
370 room_id: &RoomId,
371 thread_id: &EventId,
372 ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
373 self.room_latest_event(room_id, Some(thread_id)).await
374 }
375
376 pub async fn forget_room(&self, room_id: &RoomId) {
383 {
384 let mut rooms = self.rooms.write().await;
385
386 rooms.remove(room_id);
388 }
389
390 let _ =
391 self.room_registration_sender.send(RoomRegistration::Remove(room_id.to_owned())).await;
392 }
393
394 pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
402 let rooms = self.rooms.read().await;
403
404 if let Some(room_latest_event) = rooms.get(room_id) {
406 let mut room_latest_event = room_latest_event.write().await;
407
408 drop(rooms);
410
411 room_latest_event.forget_thread(thread_id);
412 }
413 }
414}
415
416#[derive(Debug)]
422enum RoomRegistration {
423 Add(OwnedRoomId),
425
426 Remove(OwnedRoomId),
428}
429
430enum LatestEventQueueUpdate {
433 EventCache {
435 room_id: OwnedRoomId,
437 },
438
439 SendQueue {
441 room_id: OwnedRoomId,
443
444 update: RoomSendQueueUpdate,
446 },
447}
448
449async fn listen_to_event_cache_and_send_queue_updates_task(
461 registered_rooms: Arc<RegisteredRooms>,
462 mut room_registration_receiver: mpsc::Receiver<RoomRegistration>,
463 event_cache: EventCache,
464 send_queue: SendQueue,
465 latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
466) {
467 let mut event_cache_generic_updates_subscriber =
468 event_cache.subscribe_to_room_generic_updates();
469 let mut send_queue_generic_updates_subscriber = send_queue.subscribe();
470
471 let mut listened_rooms =
477 HashSet::from_iter(registered_rooms.rooms.read().await.keys().cloned());
478
479 loop {
480 if listen_to_event_cache_and_send_queue_updates(
481 &mut room_registration_receiver,
482 &mut event_cache_generic_updates_subscriber,
483 &mut send_queue_generic_updates_subscriber,
484 &mut listened_rooms,
485 &latest_event_queue_sender,
486 )
487 .await
488 .is_break()
489 {
490 warn!("`listen_to_event_cache_and_send_queue_updates_task` has stopped");
491
492 break;
493 }
494 }
495}
496
497async fn listen_to_event_cache_and_send_queue_updates(
502 room_registration_receiver: &mut mpsc::Receiver<RoomRegistration>,
503 event_cache_generic_updates_subscriber: &mut broadcast::Receiver<RoomEventCacheGenericUpdate>,
504 send_queue_generic_updates_subscriber: &mut broadcast::Receiver<SendQueueUpdate>,
505 listened_rooms: &mut HashSet<OwnedRoomId>,
506 latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
507) -> ControlFlow<()> {
508 select! {
511 biased;
512
513 update = room_registration_receiver.recv().fuse() => {
514 match update {
515 Some(RoomRegistration::Add(room_id)) => {
516 listened_rooms.insert(room_id);
517 }
518 Some(RoomRegistration::Remove(room_id)) => {
519 listened_rooms.remove(&room_id);
520 }
521 None => {
522 error!("`room_registration` channel has been closed");
523
524 return ControlFlow::Break(());
525 }
526 }
527 }
528
529 room_event_cache_generic_update = event_cache_generic_updates_subscriber.recv().fuse() => {
530 if let Ok(RoomEventCacheGenericUpdate { room_id }) = room_event_cache_generic_update {
531 if listened_rooms.contains(&room_id) {
532 let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache {
533 room_id
534 });
535 }
536 } else {
537 warn!("`event_cache_generic_updates` channel has been closed");
538
539 return ControlFlow::Break(());
540 }
541 }
542
543 send_queue_generic_update = send_queue_generic_updates_subscriber.recv().fuse() => {
544 if let Ok(SendQueueUpdate { room_id, update }) = send_queue_generic_update {
545 if listened_rooms.contains(&room_id) {
546 let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue {
547 room_id,
548 update
549 });
550 }
551 } else {
552 warn!("`send_queue_generic_updates` channel has been closed");
553
554 return ControlFlow::Break(());
555 }
556 }
557 }
558
559 ControlFlow::Continue(())
560}
561
562async fn compute_latest_events_task(
568 registered_rooms: Arc<RegisteredRooms>,
569 mut latest_event_queue_receiver: mpsc::UnboundedReceiver<LatestEventQueueUpdate>,
570) {
571 const BUFFER_SIZE: usize = 16;
572
573 let mut buffer = Vec::with_capacity(BUFFER_SIZE);
574
575 while latest_event_queue_receiver.recv_many(&mut buffer, BUFFER_SIZE).await > 0 {
576 compute_latest_events(®istered_rooms, &buffer).await;
577 buffer.clear();
578 }
579
580 warn!("`compute_latest_events_task` has stopped");
581}
582
583async fn compute_latest_events(
584 registered_rooms: &RegisteredRooms,
585 latest_event_queue_updates: &[LatestEventQueueUpdate],
586) {
587 for latest_event_queue_update in latest_event_queue_updates {
588 match latest_event_queue_update {
589 LatestEventQueueUpdate::EventCache { room_id } => {
590 let rooms = registered_rooms.rooms.read().await;
591
592 if let Some(room_latest_events) = rooms.get(room_id) {
593 let mut room_latest_events = room_latest_events.write().await;
594
595 drop(rooms);
598
599 room_latest_events.update_with_event_cache().await;
600 } else {
601 error!(?room_id, "Failed to find the room");
602
603 continue;
604 }
605 }
606
607 LatestEventQueueUpdate::SendQueue { room_id, update } => {
608 let rooms = registered_rooms.rooms.read().await;
609
610 if let Some(room_latest_events) = rooms.get(room_id) {
611 let mut room_latest_events = room_latest_events.write().await;
612
613 drop(rooms);
616
617 room_latest_events.update_with_send_queue(update).await;
618 } else {
619 error!(?room_id, "Failed to find the room");
620
621 continue;
622 }
623 }
624 }
625 }
626}
627
628#[cfg(all(test, not(target_family = "wasm")))]
629mod tests {
630 use std::ops::Not;
631
632 use assert_matches::assert_matches;
633 use matrix_sdk_base::{
634 RoomState,
635 deserialized_responses::TimelineEventKind,
636 linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
637 };
638 use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
639 use ruma::{
640 OwnedTransactionId, event_id,
641 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncMessageLikeEvent},
642 owned_room_id, room_id, user_id,
643 };
644 use stream_assert::assert_pending;
645
646 use super::{
647 HashSet, LatestEventValue, RemoteLatestEventValue, RoomEventCacheGenericUpdate,
648 RoomRegistration, RoomSendQueueUpdate, SendQueueUpdate, broadcast,
649 listen_to_event_cache_and_send_queue_updates, mpsc,
650 };
651 use crate::test_utils::mocks::MatrixMockServer;
652
653 #[async_test]
654 async fn test_latest_events_are_lazy() {
655 let room_id_0 = room_id!("!r0");
656 let room_id_1 = room_id!("!r1");
657 let room_id_2 = room_id!("!r2");
658 let thread_id_1_0 = event_id!("$ev1.0");
659 let thread_id_2_0 = event_id!("$ev2.0");
660
661 let server = MatrixMockServer::new().await;
662 let client = server.client_builder().build().await;
663
664 client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
665 client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
666 client.base_client().get_or_create_room(room_id_2, RoomState::Joined);
667
668 client.event_cache().subscribe().unwrap();
669
670 let latest_events = client.latest_events().await;
671
672 assert!(latest_events.state.registered_rooms.rooms.read().await.is_empty());
674
675 assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
677 assert!(latest_events.listen_to_room(room_id_1).await.unwrap());
678
679 {
680 let rooms = latest_events.state.registered_rooms.rooms.read().await;
681 assert_eq!(rooms.len(), 2);
683 assert!(rooms.contains_key(room_id_0));
685 assert!(rooms.contains_key(room_id_1));
686
687 assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
689 assert!(rooms.get(room_id_1).unwrap().read().await.per_thread().is_empty());
691 }
692
693 assert!(latest_events.listen_to_thread(room_id_1, thread_id_1_0).await.unwrap());
695 assert!(latest_events.listen_to_thread(room_id_2, thread_id_2_0).await.unwrap());
696
697 {
698 let rooms = latest_events.state.registered_rooms.rooms.read().await;
699 assert_eq!(rooms.len(), 3);
701 assert!(rooms.contains_key(room_id_0));
703 assert!(rooms.contains_key(room_id_1));
704 assert!(rooms.contains_key(room_id_2));
705
706 assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
708 let room_1 = rooms.get(room_id_1).unwrap().read().await;
710 assert_eq!(room_1.per_thread().len(), 1);
711 assert!(room_1.per_thread().contains_key(thread_id_1_0));
713 let room_2 = rooms.get(room_id_2).unwrap().read().await;
715 assert_eq!(room_2.per_thread().len(), 1);
716 assert!(room_2.per_thread().contains_key(thread_id_2_0));
718 }
719 }
720
721 #[async_test]
722 async fn test_forget_room() {
723 let room_id_0 = room_id!("!r0");
724 let room_id_1 = room_id!("!r1");
725
726 let server = MatrixMockServer::new().await;
727 let client = server.client_builder().build().await;
728
729 client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
730 client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
731
732 client.event_cache().subscribe().unwrap();
733
734 let latest_events = client.latest_events().await;
735
736 assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
738
739 {
740 let rooms = latest_events.state.registered_rooms.rooms.read().await;
741 assert_eq!(rooms.len(), 1);
743 assert!(rooms.contains_key(room_id_0));
745
746 assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
748 }
749
750 latest_events.forget_room(room_id_0).await;
752
753 {
754 let rooms = latest_events.state.registered_rooms.rooms.read().await;
755 assert!(rooms.is_empty());
757 }
758 }
759
760 #[async_test]
761 async fn test_forget_thread() {
762 let room_id_0 = room_id!("!r0");
763 let room_id_1 = room_id!("!r1");
764 let thread_id_0_0 = event_id!("$ev0.0");
765
766 let server = MatrixMockServer::new().await;
767 let client = server.client_builder().build().await;
768
769 client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
770 client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
771
772 client.event_cache().subscribe().unwrap();
773
774 let latest_events = client.latest_events().await;
775
776 assert!(latest_events.listen_to_thread(room_id_0, thread_id_0_0).await.unwrap());
778
779 {
780 let rooms = latest_events.state.registered_rooms.rooms.read().await;
781 assert_eq!(rooms.len(), 1);
783 assert!(rooms.contains_key(room_id_0));
785
786 let room_0 = rooms.get(room_id_0).unwrap().read().await;
788 assert_eq!(room_0.per_thread().len(), 1);
789 assert!(room_0.per_thread().contains_key(thread_id_0_0));
791 }
792
793 latest_events.forget_thread(room_id_0, thread_id_0_0).await;
795
796 {
797 let rooms = latest_events.state.registered_rooms.rooms.read().await;
798 assert_eq!(rooms.len(), 1);
800 assert!(rooms.contains_key(room_id_0));
802
803 assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
805 }
806 }
807
808 #[async_test]
809 async fn test_inputs_task_can_listen_to_room_registration() {
810 let room_id_0 = owned_room_id!("!r0");
811 let room_id_1 = owned_room_id!("!r1");
812
813 let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
814 let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
815 broadcast::channel(1);
816 let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
817 broadcast::channel(1);
818 let mut listened_rooms = HashSet::new();
819 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
820
821 {
823 room_registration_sender.send(RoomRegistration::Add(room_id_0.clone())).await.unwrap();
825
826 assert!(
828 listen_to_event_cache_and_send_queue_updates(
829 &mut room_registration_receiver,
830 &mut room_event_cache_generic_update_receiver,
831 &mut send_queue_generic_update_receiver,
832 &mut listened_rooms,
833 &latest_event_queue_sender,
834 )
835 .await
836 .is_continue()
837 );
838
839 assert_eq!(listened_rooms.len(), 1);
840 assert!(listened_rooms.contains(&room_id_0));
841 assert!(latest_event_queue_receiver.is_empty());
842 }
843
844 {
846 room_registration_sender.send(RoomRegistration::Add(room_id_0.clone())).await.unwrap();
847
848 assert!(
850 listen_to_event_cache_and_send_queue_updates(
851 &mut room_registration_receiver,
852 &mut room_event_cache_generic_update_receiver,
853 &mut send_queue_generic_update_receiver,
854 &mut listened_rooms,
855 &latest_event_queue_sender,
856 )
857 .await
858 .is_continue()
859 );
860
861 assert_eq!(listened_rooms.len(), 1);
863 assert!(listened_rooms.contains(&room_id_0));
864 assert!(latest_event_queue_receiver.is_empty());
865 }
866
867 {
869 room_registration_sender
870 .send(RoomRegistration::Add(room_id_1.to_owned()))
871 .await
872 .unwrap();
873
874 assert!(
876 listen_to_event_cache_and_send_queue_updates(
877 &mut room_registration_receiver,
878 &mut room_event_cache_generic_update_receiver,
879 &mut send_queue_generic_update_receiver,
880 &mut listened_rooms,
881 &latest_event_queue_sender,
882 )
883 .await
884 .is_continue()
885 );
886
887 assert_eq!(listened_rooms.len(), 2);
889 assert!(listened_rooms.contains(&room_id_0));
890 assert!(listened_rooms.contains(&room_id_1));
891 assert!(latest_event_queue_receiver.is_empty());
892 }
893 }
894
895 #[async_test]
896 async fn test_inputs_task_stops_when_room_registration_channel_is_closed() {
897 let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
898 let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
899 broadcast::channel(1);
900 let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
901 broadcast::channel(1);
902 let mut listened_rooms = HashSet::new();
903 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
904
905 room_registration_receiver.close();
907
908 assert!(
910 listen_to_event_cache_and_send_queue_updates(
911 &mut room_registration_receiver,
912 &mut room_event_cache_generic_update_receiver,
913 &mut send_queue_generic_update_receiver,
914 &mut listened_rooms,
915 &latest_event_queue_sender,
916 )
917 .await
918 .is_break()
920 );
921
922 assert_eq!(listened_rooms.len(), 0);
923 assert!(latest_event_queue_receiver.is_empty());
924 }
925
926 #[async_test]
927 async fn test_inputs_task_can_listen_to_room_event_cache() {
928 let room_id = owned_room_id!("!r0");
929
930 let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
931 let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
932 broadcast::channel(1);
933 let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
934 broadcast::channel(1);
935 let mut listened_rooms = HashSet::new();
936 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
937
938 {
940 room_event_cache_generic_update_sender
941 .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
942 .unwrap();
943
944 assert!(
946 listen_to_event_cache_and_send_queue_updates(
947 &mut room_registration_receiver,
948 &mut room_event_cache_generic_update_receiver,
949 &mut send_queue_generic_update_receiver,
950 &mut listened_rooms,
951 &latest_event_queue_sender,
952 )
953 .await
954 .is_continue()
955 );
956
957 assert!(listened_rooms.is_empty());
958
959 assert!(latest_event_queue_receiver.is_empty());
961 }
962
963 {
965 room_registration_sender.send(RoomRegistration::Add(room_id.clone())).await.unwrap();
966 room_event_cache_generic_update_sender
967 .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
968 .unwrap();
969
970 for _ in 0..2 {
973 assert!(
974 listen_to_event_cache_and_send_queue_updates(
975 &mut room_registration_receiver,
976 &mut room_event_cache_generic_update_receiver,
977 &mut send_queue_generic_update_receiver,
978 &mut listened_rooms,
979 &latest_event_queue_sender,
980 )
981 .await
982 .is_continue()
983 );
984 }
985
986 assert_eq!(listened_rooms.len(), 1);
987 assert!(listened_rooms.contains(&room_id));
988
989 assert!(latest_event_queue_receiver.is_empty().not());
991 }
992 }
993
994 #[async_test]
995 async fn test_inputs_task_can_listen_to_send_queue() {
996 let room_id = owned_room_id!("!r0");
997
998 let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
999 let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1000 broadcast::channel(1);
1001 let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1002 broadcast::channel(1);
1003 let mut listened_rooms = HashSet::new();
1004 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1005
1006 {
1008 send_queue_generic_update_sender
1009 .send(SendQueueUpdate {
1010 room_id: room_id.clone(),
1011 update: RoomSendQueueUpdate::SentEvent {
1012 transaction_id: OwnedTransactionId::from("txnid0"),
1013 event_id: event_id!("$ev0").to_owned(),
1014 },
1015 })
1016 .unwrap();
1017
1018 assert!(
1020 listen_to_event_cache_and_send_queue_updates(
1021 &mut room_registration_receiver,
1022 &mut room_event_cache_generic_update_receiver,
1023 &mut send_queue_generic_update_receiver,
1024 &mut listened_rooms,
1025 &latest_event_queue_sender,
1026 )
1027 .await
1028 .is_continue()
1029 );
1030
1031 assert!(listened_rooms.is_empty());
1032
1033 assert!(latest_event_queue_receiver.is_empty());
1035 }
1036
1037 {
1039 room_registration_sender.send(RoomRegistration::Add(room_id.clone())).await.unwrap();
1040 send_queue_generic_update_sender
1041 .send(SendQueueUpdate {
1042 room_id: room_id.clone(),
1043 update: RoomSendQueueUpdate::SentEvent {
1044 transaction_id: OwnedTransactionId::from("txnid1"),
1045 event_id: event_id!("$ev1").to_owned(),
1046 },
1047 })
1048 .unwrap();
1049
1050 for _ in 0..2 {
1052 assert!(
1053 listen_to_event_cache_and_send_queue_updates(
1054 &mut room_registration_receiver,
1055 &mut room_event_cache_generic_update_receiver,
1056 &mut send_queue_generic_update_receiver,
1057 &mut listened_rooms,
1058 &latest_event_queue_sender,
1059 )
1060 .await
1061 .is_continue()
1062 );
1063 }
1064
1065 assert_eq!(listened_rooms.len(), 1);
1066 assert!(listened_rooms.contains(&room_id));
1067
1068 assert!(latest_event_queue_receiver.is_empty().not());
1070 }
1071 }
1072
1073 #[async_test]
1074 async fn test_inputs_task_stops_when_event_cache_channel_is_closed() {
1075 let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
1076 let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1077 broadcast::channel(1);
1078 let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1079 broadcast::channel(1);
1080 let mut listened_rooms = HashSet::new();
1081 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1082
1083 drop(room_event_cache_generic_update_sender);
1085
1086 assert!(
1088 listen_to_event_cache_and_send_queue_updates(
1089 &mut room_registration_receiver,
1090 &mut room_event_cache_generic_update_receiver,
1091 &mut send_queue_generic_update_receiver,
1092 &mut listened_rooms,
1093 &latest_event_queue_sender,
1094 )
1095 .await
1096 .is_break()
1098 );
1099
1100 assert_eq!(listened_rooms.len(), 0);
1101 assert!(latest_event_queue_receiver.is_empty());
1102 }
1103
1104 #[async_test]
1105 async fn test_inputs_task_stops_when_send_queue_channel_is_closed() {
1106 let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
1107 let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1108 broadcast::channel(1);
1109 let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1110 broadcast::channel(1);
1111 let mut listened_rooms = HashSet::new();
1112 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1113
1114 drop(send_queue_generic_update_sender);
1116
1117 assert!(
1119 listen_to_event_cache_and_send_queue_updates(
1120 &mut room_registration_receiver,
1121 &mut room_event_cache_generic_update_receiver,
1122 &mut send_queue_generic_update_receiver,
1123 &mut listened_rooms,
1124 &latest_event_queue_sender,
1125 )
1126 .await
1127 .is_break()
1129 );
1130
1131 assert_eq!(listened_rooms.len(), 0);
1132 assert!(latest_event_queue_receiver.is_empty());
1133 }
1134
1135 #[async_test]
1136 async fn test_latest_event_value_is_updated_via_event_cache() {
1137 let room_id = owned_room_id!("!r0");
1138 let user_id = user_id!("@mnt_io:matrix.org");
1139 let event_factory = EventFactory::new().sender(user_id).room(&room_id);
1140 let event_id_0 = event_id!("$ev0");
1141 let event_id_1 = event_id!("$ev1");
1142 let event_id_2 = event_id!("$ev2");
1143
1144 let server = MatrixMockServer::new().await;
1145 let client = server.client_builder().build().await;
1146
1147 {
1149 client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1151
1152 client
1154 .event_cache_store()
1155 .lock()
1156 .await
1157 .unwrap()
1158 .handle_linked_chunk_updates(
1159 LinkedChunkId::Room(&room_id),
1160 vec![
1161 Update::NewItemsChunk {
1162 previous: None,
1163 new: ChunkIdentifier::new(0),
1164 next: None,
1165 },
1166 Update::PushItems {
1167 at: Position::new(ChunkIdentifier::new(0), 0),
1168 items: vec![
1169 event_factory.text_msg("hello").event_id(event_id_0).into(),
1170 event_factory.text_msg("world").event_id(event_id_1).into(),
1171 ],
1172 },
1173 ],
1174 )
1175 .await
1176 .unwrap();
1177 }
1178
1179 let event_cache = client.event_cache();
1180 event_cache.subscribe().unwrap();
1181
1182 let latest_events = client.latest_events().await;
1183
1184 let mut latest_event_stream =
1186 latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
1187
1188 assert_matches!(
1191 latest_event_stream.get().await,
1192 LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. }) => {
1193 assert_matches!(
1194 event.deserialize().unwrap(),
1195 AnySyncTimelineEvent::MessageLike(
1196 AnySyncMessageLikeEvent::RoomMessage(
1197 SyncMessageLikeEvent::Original(message_content)
1198 )
1199 ) => {
1200 assert_eq!(message_content.content.body(), "world");
1201 }
1202 );
1203 }
1204 );
1205
1206 assert_pending!(latest_event_stream);
1208
1209 server
1211 .sync_room(
1212 &client,
1213 JoinedRoomBuilder::new(&room_id)
1214 .add_timeline_event(event_factory.text_msg("raclette !").event_id(event_id_2)),
1215 )
1216 .await;
1217
1218 assert_matches!(
1222 latest_event_stream.next().await,
1223 Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => {
1224 assert_matches!(
1225 event.deserialize().unwrap(),
1226 AnySyncTimelineEvent::MessageLike(
1227 AnySyncMessageLikeEvent::RoomMessage(
1228 SyncMessageLikeEvent::Original(message_content)
1229 )
1230 ) => {
1231 assert_eq!(message_content.content.body(), "raclette !");
1232 }
1233 );
1234 }
1235 );
1236 }
1237}