1mod error;
50mod latest_event;
51
52use std::{
53 collections::{HashMap, HashSet},
54 ops::{ControlFlow, Not},
55 sync::Arc,
56};
57
58pub use error::LatestEventsError;
59use eyeball::{AsyncLock, Subscriber};
60use futures_util::FutureExt;
61use latest_event::LatestEvent;
62pub use latest_event::{LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue};
63use matrix_sdk_common::executor::{spawn, AbortOnDrop, JoinHandleExt as _};
64use ruma::{EventId, OwnedEventId, OwnedRoomId, RoomId};
65use tokio::{
66 select,
67 sync::{broadcast, mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard},
68};
69use tracing::{error, warn};
70
71use crate::{
72 client::WeakClient,
73 event_cache::{EventCache, EventCacheError, RoomEventCache, RoomEventCacheGenericUpdate},
74 room::WeakRoom,
75 send_queue::{RoomSendQueueUpdate, SendQueue, SendQueueUpdate},
76};
77
78#[derive(Clone, Debug)]
80pub struct LatestEvents {
81 state: Arc<LatestEventsState>,
82}
83
84#[derive(Debug)]
86struct LatestEventsState {
87 registered_rooms: Arc<RegisteredRooms>,
89
90 _listen_task_handle: AbortOnDrop<()>,
93
94 _computation_task_handle: AbortOnDrop<()>,
96}
97
98impl LatestEvents {
99 pub(crate) fn new(
101 weak_client: WeakClient,
102 event_cache: EventCache,
103 send_queue: SendQueue,
104 ) -> Self {
105 let (room_registration_sender, room_registration_receiver) = mpsc::channel(32);
106 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
107
108 let registered_rooms =
109 Arc::new(RegisteredRooms::new(room_registration_sender, weak_client, &event_cache));
110
111 let listen_task_handle = spawn(listen_to_event_cache_and_send_queue_updates_task(
113 registered_rooms.clone(),
114 room_registration_receiver,
115 event_cache,
116 send_queue,
117 latest_event_queue_sender,
118 ))
119 .abort_on_drop();
120
121 let computation_task_handle = spawn(compute_latest_events_task(
123 registered_rooms.clone(),
124 latest_event_queue_receiver,
125 ))
126 .abort_on_drop();
127
128 Self {
129 state: Arc::new(LatestEventsState {
130 registered_rooms,
131 _listen_task_handle: listen_task_handle,
132 _computation_task_handle: computation_task_handle,
133 }),
134 }
135 }
136
137 pub async fn listen_to_room(&self, room_id: &RoomId) -> Result<bool, LatestEventsError> {
141 Ok(self.state.registered_rooms.for_room(room_id).await?.is_some())
142 }
143
144 pub async fn listen_and_subscribe_to_room(
150 &self,
151 room_id: &RoomId,
152 ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
153 let Some(latest_event) = self.state.registered_rooms.for_room(room_id).await? else {
154 return Ok(None);
155 };
156
157 Ok(Some(latest_event.subscribe().await))
158 }
159
160 pub async fn listen_to_thread(
165 &self,
166 room_id: &RoomId,
167 thread_id: &EventId,
168 ) -> Result<bool, LatestEventsError> {
169 Ok(self.state.registered_rooms.for_thread(room_id, thread_id).await?.is_some())
170 }
171
172 pub async fn listen_and_subscribe_to_thread(
178 &self,
179 room_id: &RoomId,
180 thread_id: &EventId,
181 ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
182 let Some(latest_event) = self.state.registered_rooms.for_thread(room_id, thread_id).await?
183 else {
184 return Ok(None);
185 };
186
187 Ok(Some(latest_event.subscribe().await))
188 }
189
190 pub async fn forget_room(&self, room_id: &RoomId) {
197 self.state.registered_rooms.forget_room(room_id).await;
198 }
199
200 pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
208 self.state.registered_rooms.forget_thread(room_id, thread_id).await;
209 }
210}
211
212#[derive(Debug)]
213struct RegisteredRooms {
214 rooms: RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
216
217 room_registration_sender: mpsc::Sender<RoomRegistration>,
227
228 weak_client: WeakClient,
230
231 event_cache: EventCache,
233}
234
235impl RegisteredRooms {
236 fn new(
237 room_registration_sender: mpsc::Sender<RoomRegistration>,
238 weak_client: WeakClient,
239 event_cache: &EventCache,
240 ) -> Self {
241 Self {
242 rooms: RwLock::new(HashMap::default()),
243 room_registration_sender,
244 weak_client,
245 event_cache: event_cache.clone(),
246 }
247 }
248
249 async fn room_latest_event(
256 &self,
257 room_id: &RoomId,
258 thread_id: Option<&EventId>,
259 ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
260 Ok(match thread_id {
261 Some(thread_id) => {
267 let mut rooms = self.rooms.write().await;
268
269 if rooms.contains_key(room_id).not() {
271 if let Some(room_latest_event) = RoomLatestEvents::new(
273 WeakRoom::new(self.weak_client.clone(), room_id.to_owned()),
274 &self.event_cache,
275 )
276 .await?
277 {
278 rooms.insert(room_id.to_owned(), room_latest_event);
279
280 let _ = self
281 .room_registration_sender
282 .send(RoomRegistration::Add(room_id.to_owned()))
283 .await;
284 }
285 }
286
287 if let Some(room_latest_event) = rooms.get_mut(room_id) {
288 if room_latest_event.per_thread.contains_key(thread_id).not() {
291 room_latest_event.per_thread.insert(
292 thread_id.to_owned(),
293 room_latest_event.create_latest_event_for(Some(thread_id)).await,
294 );
295 }
296 }
297
298 RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
299 }
300
301 None => {
304 match RwLockReadGuard::try_map(self.rooms.read().await, |rooms| rooms.get(room_id))
305 .ok()
306 {
307 value @ Some(_) => value,
308 None => {
309 let mut rooms = self.rooms.write().await;
310
311 if rooms.contains_key(room_id).not() {
312 if let Some(room_latest_event) = RoomLatestEvents::new(
314 WeakRoom::new(self.weak_client.clone(), room_id.to_owned()),
315 &self.event_cache,
316 )
317 .await?
318 {
319 rooms.insert(room_id.to_owned(), room_latest_event);
320
321 let _ = self
322 .room_registration_sender
323 .send(RoomRegistration::Add(room_id.to_owned()))
324 .await;
325 }
326 }
327
328 RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
329 }
330 }
331 }
332 })
333 }
334
335 pub async fn for_room(
340 &self,
341 room_id: &RoomId,
342 ) -> Result<Option<RwLockReadGuard<'_, LatestEvent>>, LatestEventsError> {
343 Ok(self.room_latest_event(room_id, None).await?.map(|lock_guard| {
344 RwLockReadGuard::map(lock_guard, |room_latest_event| room_latest_event.for_room())
345 }))
346 }
347
348 pub async fn for_thread(
353 &self,
354 room_id: &RoomId,
355 thread_id: &EventId,
356 ) -> Result<Option<RwLockReadGuard<'_, LatestEvent>>, LatestEventsError> {
357 Ok(self.room_latest_event(room_id, Some(thread_id)).await?.and_then(|lock_guard| {
358 RwLockReadGuard::try_map(lock_guard, |room_latest_event| {
359 room_latest_event.for_thread(thread_id)
360 })
361 .ok()
362 }))
363 }
364
365 pub async fn forget_room(&self, room_id: &RoomId) {
372 let mut rooms = self.rooms.write().await;
373
374 rooms.remove(room_id);
376
377 let _ =
378 self.room_registration_sender.send(RoomRegistration::Remove(room_id.to_owned())).await;
379 }
380
381 pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
389 let mut rooms = self.rooms.write().await;
390
391 if let Some(room_latest_event) = rooms.get_mut(room_id) {
393 room_latest_event.per_thread.remove(thread_id);
394 }
395 }
396}
397
398#[derive(Debug)]
404enum RoomRegistration {
405 Add(OwnedRoomId),
407
408 Remove(OwnedRoomId),
410}
411
412enum LatestEventQueueUpdate {
415 EventCache {
417 room_id: OwnedRoomId,
419 },
420
421 SendQueue {
423 room_id: OwnedRoomId,
425
426 update: RoomSendQueueUpdate,
428 },
429}
430
431#[derive(Debug)]
433struct RoomLatestEvents {
434 for_the_room: LatestEvent,
436
437 per_thread: HashMap<OwnedEventId, LatestEvent>,
439
440 room_event_cache: RoomEventCache,
442
443 weak_room: WeakRoom,
448}
449
450impl RoomLatestEvents {
451 async fn new(
452 weak_room: WeakRoom,
453 event_cache: &EventCache,
454 ) -> Result<Option<Self>, LatestEventsError> {
455 let room_id = weak_room.room_id();
456 let room_event_cache = match event_cache.for_room(room_id).await {
457 Ok((room_event_cache, _drop_handles)) => room_event_cache,
460 Err(EventCacheError::RoomNotFound { .. }) => return Ok(None),
461 Err(err) => return Err(LatestEventsError::EventCache(err)),
462 };
463
464 Ok(Some(Self {
465 for_the_room: Self::create_latest_event_for_inner(&weak_room, None, &room_event_cache)
466 .await,
467 per_thread: HashMap::new(),
468 weak_room,
469 room_event_cache,
470 }))
471 }
472
473 async fn create_latest_event_for(&self, thread_id: Option<&EventId>) -> LatestEvent {
474 Self::create_latest_event_for_inner(&self.weak_room, thread_id, &self.room_event_cache)
475 .await
476 }
477
478 async fn create_latest_event_for_inner(
479 weak_room: &WeakRoom,
480 thread_id: Option<&EventId>,
481 room_event_cache: &RoomEventCache,
482 ) -> LatestEvent {
483 LatestEvent::new(weak_room, thread_id, room_event_cache).await
484 }
485
486 fn for_room(&self) -> &LatestEvent {
488 &self.for_the_room
489 }
490
491 fn for_thread(&self, thread_id: &EventId) -> Option<&LatestEvent> {
493 self.per_thread.get(thread_id)
494 }
495
496 async fn update_with_event_cache(&mut self) {
499 let room = self.weak_room.get();
505 let power_levels = match &room {
506 Some(room) => {
507 let power_levels = room.power_levels().await.ok();
508
509 Some(room.own_user_id()).zip(power_levels)
510 }
511
512 None => None,
513 };
514
515 self.for_the_room.update_with_event_cache(&self.room_event_cache, &power_levels).await;
516
517 for latest_event in self.per_thread.values_mut() {
518 latest_event.update_with_event_cache(&self.room_event_cache, &power_levels).await;
519 }
520 }
521
522 async fn update_with_send_queue(&mut self, send_queue_update: &RoomSendQueueUpdate) {
525 let room = self.weak_room.get();
531 let power_levels = match &room {
532 Some(room) => {
533 let power_levels = room.power_levels().await.ok();
534
535 Some(room.own_user_id()).zip(power_levels)
536 }
537
538 None => None,
539 };
540
541 self.for_the_room
542 .update_with_send_queue(send_queue_update, &self.room_event_cache, &power_levels)
543 .await;
544
545 for latest_event in self.per_thread.values_mut() {
546 latest_event
547 .update_with_send_queue(send_queue_update, &self.room_event_cache, &power_levels)
548 .await;
549 }
550 }
551}
552
553async fn listen_to_event_cache_and_send_queue_updates_task(
565 registered_rooms: Arc<RegisteredRooms>,
566 mut room_registration_receiver: mpsc::Receiver<RoomRegistration>,
567 event_cache: EventCache,
568 send_queue: SendQueue,
569 latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
570) {
571 let mut event_cache_generic_updates_subscriber =
572 event_cache.subscribe_to_room_generic_updates();
573 let mut send_queue_generic_updates_subscriber = send_queue.subscribe();
574
575 let mut listened_rooms =
581 HashSet::from_iter(registered_rooms.rooms.read().await.keys().cloned());
582
583 loop {
584 if listen_to_event_cache_and_send_queue_updates(
585 &mut room_registration_receiver,
586 &mut event_cache_generic_updates_subscriber,
587 &mut send_queue_generic_updates_subscriber,
588 &mut listened_rooms,
589 &latest_event_queue_sender,
590 )
591 .await
592 .is_break()
593 {
594 warn!("`listen_to_event_cache_and_send_queue_updates_task` has stopped");
595
596 break;
597 }
598 }
599}
600
601async fn listen_to_event_cache_and_send_queue_updates(
606 room_registration_receiver: &mut mpsc::Receiver<RoomRegistration>,
607 event_cache_generic_updates_subscriber: &mut broadcast::Receiver<RoomEventCacheGenericUpdate>,
608 send_queue_generic_updates_subscriber: &mut broadcast::Receiver<SendQueueUpdate>,
609 listened_rooms: &mut HashSet<OwnedRoomId>,
610 latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
611) -> ControlFlow<()> {
612 select! {
615 biased;
616
617 update = room_registration_receiver.recv().fuse() => {
618 match update {
619 Some(RoomRegistration::Add(room_id)) => {
620 listened_rooms.insert(room_id);
621 }
622 Some(RoomRegistration::Remove(room_id)) => {
623 listened_rooms.remove(&room_id);
624 }
625 None => {
626 error!("`room_registration` channel has been closed");
627
628 return ControlFlow::Break(());
629 }
630 }
631 }
632
633 room_event_cache_generic_update = event_cache_generic_updates_subscriber.recv().fuse() => {
634 if let Ok(room_event_cache_generic_update) = room_event_cache_generic_update {
635 let room_id = room_event_cache_generic_update.room_id;
636
637 if listened_rooms.contains(&room_id) {
638 let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache {
639 room_id
640 });
641 }
642 } else {
643 warn!("`event_cache_generic_updates` channel has been closed");
644
645 return ControlFlow::Break(());
646 }
647 }
648
649 send_queue_generic_update = send_queue_generic_updates_subscriber.recv().fuse() => {
650 if let Ok(SendQueueUpdate { room_id, update }) = send_queue_generic_update {
651 if listened_rooms.contains(&room_id) {
652 let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue {
653 room_id,
654 update
655 });
656 }
657 } else {
658 warn!("`send_queue_generic_updates` channel has been closed");
659
660 return ControlFlow::Break(());
661 }
662 }
663 }
664
665 ControlFlow::Continue(())
666}
667
668async fn compute_latest_events_task(
674 registered_rooms: Arc<RegisteredRooms>,
675 mut latest_event_queue_receiver: mpsc::UnboundedReceiver<LatestEventQueueUpdate>,
676) {
677 const BUFFER_SIZE: usize = 16;
678
679 let mut buffer = Vec::with_capacity(BUFFER_SIZE);
680
681 while latest_event_queue_receiver.recv_many(&mut buffer, BUFFER_SIZE).await > 0 {
682 compute_latest_events(®istered_rooms, &buffer).await;
683 buffer.clear();
684 }
685
686 warn!("`compute_latest_events_task` has stopped");
687}
688
689async fn compute_latest_events(
690 registered_rooms: &RegisteredRooms,
691 latest_event_queue_updates: &[LatestEventQueueUpdate],
692) {
693 for latest_event_queue_update in latest_event_queue_updates {
694 match latest_event_queue_update {
695 LatestEventQueueUpdate::EventCache { room_id } => {
696 let mut rooms = registered_rooms.rooms.write().await;
697
698 if let Some(room_latest_events) = rooms.get_mut(room_id) {
699 room_latest_events.update_with_event_cache().await;
700 } else {
701 error!(?room_id, "Failed to find the room");
702
703 continue;
704 }
705 }
706
707 LatestEventQueueUpdate::SendQueue { room_id, update } => {
708 let mut rooms = registered_rooms.rooms.write().await;
709
710 if let Some(room_latest_events) = rooms.get_mut(room_id) {
711 room_latest_events.update_with_send_queue(update).await;
712 } else {
713 error!(?room_id, "Failed to find the room");
714
715 continue;
716 }
717 }
718 }
719 }
720}
721
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::ops::Not;

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        deserialized_responses::TimelineEventKind,
        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
        RoomState,
    };
    use matrix_sdk_test::{async_test, event_factory::EventFactory, JoinedRoomBuilder};
    use ruma::{
        event_id,
        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncMessageLikeEvent},
        owned_room_id, room_id, user_id, OwnedTransactionId,
    };
    use stream_assert::assert_pending;

    use super::{
        broadcast, listen_to_event_cache_and_send_queue_updates, mpsc, HashSet, LatestEventValue,
        RemoteLatestEventValue, RoomEventCacheGenericUpdate, RoomRegistration, RoomSendQueueUpdate,
        SendQueueUpdate,
    };
    use crate::test_utils::mocks::MatrixMockServer;

    /// Rooms and threads are only registered once they are listened to.
    #[async_test]
    async fn test_latest_events_are_lazy() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");
        let room_id_2 = room_id!("!r2");
        let thread_id_1_0 = event_id!("$ev1.0");
        let thread_id_2_0 = event_id!("$ev2.0");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        for room_id in [room_id_0, room_id_1, room_id_2] {
            client.base_client().get_or_create_room(room_id, RoomState::Joined);
        }

        client.event_cache().subscribe().unwrap();

        let latest_events = client.latest_events().await;

        // Nothing is registered to start with.
        assert!(latest_events.state.registered_rooms.rooms.read().await.is_empty());

        // Listen to two rooms.
        assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
        assert!(latest_events.listen_to_room(room_id_1).await.unwrap());

        {
            let rooms = latest_events.state.registered_rooms.rooms.read().await;

            // Exactly these two rooms are registered…
            assert_eq!(rooms.len(), 2);
            assert!(rooms.contains_key(room_id_0));
            assert!(rooms.contains_key(room_id_1));

            // … and none of them has a thread.
            assert!(rooms[room_id_0].per_thread.is_empty());
            assert!(rooms[room_id_1].per_thread.is_empty());
        }

        // Listen to a thread in a known room, and to a thread in a room that
        // is not registered yet.
        assert!(latest_events.listen_to_thread(room_id_1, thread_id_1_0).await.unwrap());
        assert!(latest_events.listen_to_thread(room_id_2, thread_id_2_0).await.unwrap());

        {
            let rooms = latest_events.state.registered_rooms.rooms.read().await;

            // The third room has been registered on the fly.
            assert_eq!(rooms.len(), 3);
            assert!(rooms.contains_key(room_id_0));
            assert!(rooms.contains_key(room_id_1));
            assert!(rooms.contains_key(room_id_2));

            // The first room is untouched.
            assert!(rooms[room_id_0].per_thread.is_empty());

            // The two other rooms have one thread each.
            assert_eq!(rooms[room_id_1].per_thread.len(), 1);
            assert!(rooms[room_id_1].per_thread.contains_key(thread_id_1_0));
            assert_eq!(rooms[room_id_2].per_thread.len(), 1);
            assert!(rooms[room_id_2].per_thread.contains_key(thread_id_2_0));
        }
    }

    /// Forgetting a room removes it from the registered rooms.
    #[async_test]
    async fn test_forget_room() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);

        client.event_cache().subscribe().unwrap();

        let latest_events = client.latest_events().await;

        // Register one room.
        assert!(latest_events.listen_to_room(room_id_0).await.unwrap());

        {
            let rooms = latest_events.state.registered_rooms.rooms.read().await;

            assert_eq!(rooms.len(), 1);
            assert!(rooms.contains_key(room_id_0));

            assert!(rooms[room_id_0].per_thread.is_empty());
        }

        // Forget it.
        latest_events.forget_room(room_id_0).await;

        {
            let rooms = latest_events.state.registered_rooms.rooms.read().await;

            // Nothing is left.
            assert!(rooms.is_empty());
        }
    }

    /// Forgetting a thread removes the thread but keeps its room.
    #[async_test]
    async fn test_forget_thread() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");
        let thread_id_0_0 = event_id!("$ev0.0");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);

        client.event_cache().subscribe().unwrap();

        let latest_events = client.latest_events().await;

        // Register one thread (and therefore its room).
        assert!(latest_events.listen_to_thread(room_id_0, thread_id_0_0).await.unwrap());

        {
            let rooms = latest_events.state.registered_rooms.rooms.read().await;

            assert_eq!(rooms.len(), 1);
            assert!(rooms.contains_key(room_id_0));

            assert_eq!(rooms[room_id_0].per_thread.len(), 1);
            assert!(rooms[room_id_0].per_thread.contains_key(thread_id_0_0));
        }

        // Forget the thread.
        latest_events.forget_thread(room_id_0, thread_id_0_0).await;

        {
            let rooms = latest_events.state.registered_rooms.rooms.read().await;

            // The room is still there…
            assert_eq!(rooms.len(), 1);
            assert!(rooms.contains_key(room_id_0));

            // … but the thread is gone.
            assert!(rooms[room_id_0].per_thread.is_empty());
        }
    }

    /// Room registrations update the set of listened rooms, and never enqueue
    /// a computation by themselves.
    #[async_test]
    async fn test_inputs_task_can_listen_to_room_registration() {
        let room_id_0 = owned_room_id!("!r0");
        let room_id_1 = owned_room_id!("!r1");

        let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
            broadcast::channel(1);
        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
            broadcast::channel(1);
        let mut listened_rooms = HashSet::new();
        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

        // Add a first room.
        {
            room_registration_sender.send(RoomRegistration::Add(room_id_0.clone())).await.unwrap();

            assert!(listen_to_event_cache_and_send_queue_updates(
                &mut room_registration_receiver,
                &mut room_event_cache_generic_update_receiver,
                &mut send_queue_generic_update_receiver,
                &mut listened_rooms,
                &latest_event_queue_sender,
            )
            .await
            .is_continue());

            assert_eq!(listened_rooms.len(), 1);
            assert!(listened_rooms.contains(&room_id_0));
            assert!(latest_event_queue_receiver.is_empty());
        }

        // Add the same room again: nothing changes.
        {
            room_registration_sender.send(RoomRegistration::Add(room_id_0.clone())).await.unwrap();

            assert!(listen_to_event_cache_and_send_queue_updates(
                &mut room_registration_receiver,
                &mut room_event_cache_generic_update_receiver,
                &mut send_queue_generic_update_receiver,
                &mut listened_rooms,
                &latest_event_queue_sender,
            )
            .await
            .is_continue());

            assert_eq!(listened_rooms.len(), 1);
            assert!(listened_rooms.contains(&room_id_0));
            assert!(latest_event_queue_receiver.is_empty());
        }

        // Add a second room.
        {
            room_registration_sender
                .send(RoomRegistration::Add(room_id_1.to_owned()))
                .await
                .unwrap();

            assert!(listen_to_event_cache_and_send_queue_updates(
                &mut room_registration_receiver,
                &mut room_event_cache_generic_update_receiver,
                &mut send_queue_generic_update_receiver,
                &mut listened_rooms,
                &latest_event_queue_sender,
            )
            .await
            .is_continue());

            assert_eq!(listened_rooms.len(), 2);
            assert!(listened_rooms.contains(&room_id_0));
            assert!(listened_rooms.contains(&room_id_1));
            assert!(latest_event_queue_receiver.is_empty());
        }
    }

    /// A closed room registration channel stops the listener.
    #[async_test]
    async fn test_inputs_task_stops_when_room_registration_channel_is_closed() {
        let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
            broadcast::channel(1);
        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
            broadcast::channel(1);
        let mut listened_rooms = HashSet::new();
        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

        // Close the channel from the receiving side.
        room_registration_receiver.close();

        // The listener asks to stop.
        assert!(listen_to_event_cache_and_send_queue_updates(
            &mut room_registration_receiver,
            &mut room_event_cache_generic_update_receiver,
            &mut send_queue_generic_update_receiver,
            &mut listened_rooms,
            &latest_event_queue_sender,
        )
        .await
        .is_break());

        assert_eq!(listened_rooms.len(), 0);
        assert!(latest_event_queue_receiver.is_empty());
    }

    /// Event cache updates are forwarded only for listened rooms.
    #[async_test]
    async fn test_inputs_task_can_listen_to_room_event_cache() {
        let room_id = owned_room_id!("!r0");

        let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
        let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
            broadcast::channel(1);
        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
            broadcast::channel(1);
        let mut listened_rooms = HashSet::new();
        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

        // Update for a room that is not listened to.
        {
            room_event_cache_generic_update_sender
                .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
                .unwrap();

            assert!(listen_to_event_cache_and_send_queue_updates(
                &mut room_registration_receiver,
                &mut room_event_cache_generic_update_receiver,
                &mut send_queue_generic_update_receiver,
                &mut listened_rooms,
                &latest_event_queue_sender,
            )
            .await
            .is_continue());

            assert!(listened_rooms.is_empty());

            // The update has been dropped.
            assert!(latest_event_queue_receiver.is_empty());
        }

        // Update for a room that is listened to.
        {
            room_registration_sender.send(RoomRegistration::Add(room_id.clone())).await.unwrap();
            room_event_cache_generic_update_sender
                .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
                .unwrap();

            // Two messages are pending (the registration, then the update),
            // and each call handles a single one.
            for _ in 0..2 {
                assert!(listen_to_event_cache_and_send_queue_updates(
                    &mut room_registration_receiver,
                    &mut room_event_cache_generic_update_receiver,
                    &mut send_queue_generic_update_receiver,
                    &mut listened_rooms,
                    &latest_event_queue_sender,
                )
                .await
                .is_continue());
            }

            assert_eq!(listened_rooms.len(), 1);
            assert!(listened_rooms.contains(&room_id));

            // The update has been forwarded.
            assert!(latest_event_queue_receiver.is_empty().not());
        }
    }

    /// Send queue updates are forwarded only for listened rooms.
    #[async_test]
    async fn test_inputs_task_can_listen_to_send_queue() {
        let room_id = owned_room_id!("!r0");

        let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
            broadcast::channel(1);
        let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
            broadcast::channel(1);
        let mut listened_rooms = HashSet::new();
        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

        // Update for a room that is not listened to.
        {
            send_queue_generic_update_sender
                .send(SendQueueUpdate {
                    room_id: room_id.clone(),
                    update: RoomSendQueueUpdate::SentEvent {
                        transaction_id: OwnedTransactionId::from("txnid0"),
                        event_id: event_id!("$ev0").to_owned(),
                    },
                })
                .unwrap();

            assert!(listen_to_event_cache_and_send_queue_updates(
                &mut room_registration_receiver,
                &mut room_event_cache_generic_update_receiver,
                &mut send_queue_generic_update_receiver,
                &mut listened_rooms,
                &latest_event_queue_sender,
            )
            .await
            .is_continue());

            assert!(listened_rooms.is_empty());

            // The update has been dropped.
            assert!(latest_event_queue_receiver.is_empty());
        }

        // Update for a room that is listened to.
        {
            room_registration_sender.send(RoomRegistration::Add(room_id.clone())).await.unwrap();
            send_queue_generic_update_sender
                .send(SendQueueUpdate {
                    room_id: room_id.clone(),
                    update: RoomSendQueueUpdate::SentEvent {
                        transaction_id: OwnedTransactionId::from("txnid1"),
                        event_id: event_id!("$ev1").to_owned(),
                    },
                })
                .unwrap();

            // Two messages are pending (the registration, then the update),
            // and each call handles a single one.
            for _ in 0..2 {
                assert!(listen_to_event_cache_and_send_queue_updates(
                    &mut room_registration_receiver,
                    &mut room_event_cache_generic_update_receiver,
                    &mut send_queue_generic_update_receiver,
                    &mut listened_rooms,
                    &latest_event_queue_sender,
                )
                .await
                .is_continue());
            }

            assert_eq!(listened_rooms.len(), 1);
            assert!(listened_rooms.contains(&room_id));

            // The update has been forwarded.
            assert!(latest_event_queue_receiver.is_empty().not());
        }
    }

    /// A closed event cache channel stops the listener.
    #[async_test]
    async fn test_inputs_task_stops_when_event_cache_channel_is_closed() {
        let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
        let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
            broadcast::channel(1);
        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
            broadcast::channel(1);
        let mut listened_rooms = HashSet::new();
        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

        // Dropping the only sender closes the channel.
        drop(room_event_cache_generic_update_sender);

        // The listener asks to stop.
        assert!(listen_to_event_cache_and_send_queue_updates(
            &mut room_registration_receiver,
            &mut room_event_cache_generic_update_receiver,
            &mut send_queue_generic_update_receiver,
            &mut listened_rooms,
            &latest_event_queue_sender,
        )
        .await
        .is_break());

        assert_eq!(listened_rooms.len(), 0);
        assert!(latest_event_queue_receiver.is_empty());
    }

    /// A closed send queue channel stops the listener.
    #[async_test]
    async fn test_inputs_task_stops_when_send_queue_channel_is_closed() {
        let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
            broadcast::channel(1);
        let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
            broadcast::channel(1);
        let mut listened_rooms = HashSet::new();
        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

        // Dropping the only sender closes the channel.
        drop(send_queue_generic_update_sender);

        // The listener asks to stop.
        assert!(listen_to_event_cache_and_send_queue_updates(
            &mut room_registration_receiver,
            &mut room_event_cache_generic_update_receiver,
            &mut send_queue_generic_update_receiver,
            &mut listened_rooms,
            &latest_event_queue_sender,
        )
        .await
        .is_break());

        assert_eq!(listened_rooms.len(), 0);
        assert!(latest_event_queue_receiver.is_empty());
    }

    /// End-to-end: the latest event value follows the event cache.
    #[async_test]
    async fn test_latest_event_value_is_updated_via_event_cache() {
        let room_id = owned_room_id!("!r0");
        let user_id = user_id!("@mnt_io:matrix.org");
        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
        let event_id_0 = event_id!("$ev0");
        let event_id_1 = event_id!("$ev1");
        let event_id_2 = event_id!("$ev2");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        // Prelude: create the room and fill the event cache store.
        {
            client.base_client().get_or_create_room(&room_id, RoomState::Joined);

            // Two text messages stored in a single chunk.
            let linked_chunk_updates = vec![
                Update::NewItemsChunk {
                    previous: None,
                    new: ChunkIdentifier::new(0),
                    next: None,
                },
                Update::PushItems {
                    at: Position::new(ChunkIdentifier::new(0), 0),
                    items: vec![
                        event_factory.text_msg("hello").event_id(event_id_0).into(),
                        event_factory.text_msg("world").event_id(event_id_1).into(),
                    ],
                },
            ];

            client
                .event_cache_store()
                .lock()
                .await
                .unwrap()
                .handle_linked_chunk_updates(LinkedChunkId::Room(&room_id), linked_chunk_updates)
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let latest_events = client.latest_events().await;

        // Subscribe to the latest event of the room.
        let mut latest_event_stream =
            latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();

        // The initial value is the last stored event.
        assert_matches!(
            latest_event_stream.get().await,
            LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. }) => {
                assert_matches!(
                    event.deserialize().unwrap(),
                    AnySyncTimelineEvent::MessageLike(
                        AnySyncMessageLikeEvent::RoomMessage(
                            SyncMessageLikeEvent::Original(message_content)
                        )
                    ) => {
                        assert_eq!(message_content.content.body(), "world");
                    }
                );
            }
        );

        // No update is pending on the stream.
        assert_pending!(latest_event_stream);

        // A new event reaches the room through a sync.
        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(&room_id).add_timeline_event(
                    event_factory.text_msg("raclette !").event_id(event_id_2).into_raw(),
                ),
            )
            .await;

        // The stream yields the new latest event.
        assert_matches!(
            latest_event_stream.next().await,
            Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => {
                assert_matches!(
                    event.deserialize().unwrap(),
                    AnySyncTimelineEvent::MessageLike(
                        AnySyncMessageLikeEvent::RoomMessage(
                            SyncMessageLikeEvent::Original(message_content)
                        )
                    ) => {
                        assert_eq!(message_content.content.body(), "raclette !");
                    }
                );
            }
        );
    }
}