1mod error;
50mod latest_event;
51mod room_latest_events;
52
53use std::{
54 collections::HashMap,
55 ops::{ControlFlow, DerefMut, Not},
56 sync::Arc,
57};
58
59pub use error::LatestEventsError;
60use eyeball::{AsyncLock, Subscriber};
61use latest_event::{LatestEvent, With};
62pub use latest_event::{LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue};
63use matrix_sdk_base::timer;
64use matrix_sdk_common::executor::{AbortOnDrop, JoinHandleExt as _, spawn};
65use room_latest_events::RoomLatestEvents;
66use ruma::{EventId, OwnedRoomId, RoomId};
67use tokio::{
68 select,
69 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, broadcast, mpsc},
70};
71use tracing::{error, warn};
72
73use crate::{
74 client::WeakClient,
75 event_cache::{EventCache, RoomEventCacheGenericUpdate},
76 room::WeakRoom,
77 send_queue::{RoomSendQueueUpdate, SendQueue, SendQueueUpdate},
78};
79
80#[derive(Clone, Debug)]
82pub struct LatestEvents {
83 state: Arc<LatestEventsState>,
84}
85
86#[derive(Debug)]
88struct LatestEventsState {
89 registered_rooms: Arc<RegisteredRooms>,
91
92 _listen_task_handle: AbortOnDrop<()>,
95
96 _computation_task_handle: AbortOnDrop<()>,
98}
99
100impl LatestEvents {
101 pub(crate) fn new(
103 weak_client: WeakClient,
104 event_cache: EventCache,
105 send_queue: SendQueue,
106 ) -> Self {
107 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
108
109 let registered_rooms =
110 Arc::new(RegisteredRooms::new(weak_client, &event_cache, &latest_event_queue_sender));
111
112 let listen_task_handle = spawn(listen_to_event_cache_and_send_queue_updates_task(
114 registered_rooms.clone(),
115 event_cache,
116 send_queue,
117 latest_event_queue_sender,
118 ))
119 .abort_on_drop();
120
121 let computation_task_handle = spawn(compute_latest_events_task(
123 registered_rooms.clone(),
124 latest_event_queue_receiver,
125 ))
126 .abort_on_drop();
127
128 Self {
129 state: Arc::new(LatestEventsState {
130 registered_rooms,
131 _listen_task_handle: listen_task_handle,
132 _computation_task_handle: computation_task_handle,
133 }),
134 }
135 }
136
137 pub async fn listen_to_room(&self, room_id: &RoomId) -> Result<bool, LatestEventsError> {
141 Ok(self.state.registered_rooms.for_room(room_id).await?.is_some())
142 }
143
144 #[cfg(test)]
148 pub async fn is_listening_to_room(&self, room_id: &RoomId) -> bool {
149 self.state.registered_rooms.rooms.read().await.contains_key(room_id)
150 }
151
152 pub async fn listen_and_subscribe_to_room(
158 &self,
159 room_id: &RoomId,
160 ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
161 let Some(room_latest_events) = self.state.registered_rooms.for_room(room_id).await? else {
162 return Ok(None);
163 };
164
165 let room_latest_events = room_latest_events.read().await;
166 let latest_event = room_latest_events.for_room();
167
168 Ok(Some(latest_event.subscribe().await))
169 }
170
171 pub async fn listen_to_thread(
176 &self,
177 room_id: &RoomId,
178 thread_id: &EventId,
179 ) -> Result<bool, LatestEventsError> {
180 Ok(self.state.registered_rooms.for_thread(room_id, thread_id).await?.is_some())
181 }
182
183 pub async fn listen_and_subscribe_to_thread(
189 &self,
190 room_id: &RoomId,
191 thread_id: &EventId,
192 ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
193 let Some(room_latest_events) =
194 self.state.registered_rooms.for_thread(room_id, thread_id).await?
195 else {
196 return Ok(None);
197 };
198
199 let room_latest_events = room_latest_events.read().await;
200 let latest_event = room_latest_events
201 .for_thread(thread_id)
202 .expect("The `LatestEvent` for the thread must have been created");
203
204 Ok(Some(latest_event.subscribe().await))
205 }
206
207 pub async fn forget_room(&self, room_id: &RoomId) {
214 self.state.registered_rooms.forget_room(room_id).await;
215 }
216
217 pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
225 self.state.registered_rooms.forget_thread(room_id, thread_id).await;
226 }
227}
228
229#[derive(Debug)]
230struct RegisteredRooms {
231 rooms: RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
233
234 weak_client: WeakClient,
236
237 event_cache: EventCache,
239
240 latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
245}
246
247impl RegisteredRooms {
248 fn new(
249 weak_client: WeakClient,
250 event_cache: &EventCache,
251 latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
252 ) -> Self {
253 Self {
254 rooms: RwLock::new(HashMap::default()),
255 weak_client,
256 event_cache: event_cache.clone(),
257 latest_event_queue_sender: latest_event_queue_sender.clone(),
258 }
259 }
260
261 async fn room_latest_event(
268 &self,
269 room_id: &RoomId,
270 thread_id: Option<&EventId>,
271 ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
272 fn create_and_insert_room_latest_events(
273 room_id: &RoomId,
274 rooms: &mut HashMap<OwnedRoomId, RoomLatestEvents>,
275 weak_client: &WeakClient,
276 event_cache: &EventCache,
277 latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
278 ) {
279 let (room_latest_events, is_latest_event_value_none) =
280 With::unzip(RoomLatestEvents::new(
281 WeakRoom::new(weak_client.clone(), room_id.to_owned()),
282 event_cache,
283 ));
284
285 rooms.insert(room_id.to_owned(), room_latest_events);
287
288 if is_latest_event_value_none {
294 let _ = latest_event_queue_sender
295 .send(LatestEventQueueUpdate::EventCache { room_id: room_id.to_owned() });
296 }
297 }
298
299 Ok(match thread_id {
300 Some(thread_id) => {
306 let mut rooms = self.rooms.write().await;
307
308 if rooms.contains_key(room_id).not() {
310 create_and_insert_room_latest_events(
311 room_id,
312 rooms.deref_mut(),
313 &self.weak_client,
314 &self.event_cache,
315 &self.latest_event_queue_sender,
316 );
317 }
318
319 if let Some(room_latest_event) = rooms.get(room_id) {
320 let mut room_latest_event = room_latest_event.write().await;
321
322 if room_latest_event.has_thread(thread_id).not() {
325 room_latest_event.create_and_insert_latest_event_for_thread(thread_id);
326 }
327 }
328
329 RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
330 }
331
332 None => {
335 match RwLockReadGuard::try_map(self.rooms.read().await, |rooms| rooms.get(room_id))
336 .ok()
337 {
338 value @ Some(_) => value,
339 None => {
340 let _timer = timer!(
341 tracing::Level::INFO,
342 format!("Creating `RoomLatestEvents` for {room_id:?}"),
343 );
344
345 let mut rooms = self.rooms.write().await;
346
347 if rooms.contains_key(room_id).not() {
348 create_and_insert_room_latest_events(
349 room_id,
350 rooms.deref_mut(),
351 &self.weak_client,
352 &self.event_cache,
353 &self.latest_event_queue_sender,
354 );
355 }
356
357 RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
358 }
359 }
360 }
361 })
362 }
363
364 pub async fn for_room(
368 &self,
369 room_id: &RoomId,
370 ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
371 self.room_latest_event(room_id, None).await
372 }
373
374 pub async fn for_thread(
378 &self,
379 room_id: &RoomId,
380 thread_id: &EventId,
381 ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
382 self.room_latest_event(room_id, Some(thread_id)).await
383 }
384
385 pub async fn forget_room(&self, room_id: &RoomId) {
392 {
393 let mut rooms = self.rooms.write().await;
394
395 rooms.remove(room_id);
397 }
398 }
399
400 pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
408 let rooms = self.rooms.read().await;
409
410 if let Some(room_latest_event) = rooms.get(room_id) {
412 let mut room_latest_event = room_latest_event.write().await;
413
414 drop(rooms);
416
417 room_latest_event.forget_thread(thread_id);
418 }
419 }
420}
421
422#[derive(Debug)]
425enum LatestEventQueueUpdate {
426 EventCache {
428 room_id: OwnedRoomId,
430 },
431
432 SendQueue {
434 room_id: OwnedRoomId,
436
437 update: RoomSendQueueUpdate,
439 },
440}
441
442async fn listen_to_event_cache_and_send_queue_updates_task(
449 registered_rooms: Arc<RegisteredRooms>,
450 event_cache: EventCache,
451 send_queue: SendQueue,
452 latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
453) {
454 let mut event_cache_generic_updates_subscriber =
455 event_cache.subscribe_to_room_generic_updates();
456 let mut send_queue_generic_updates_subscriber = send_queue.subscribe();
457
458 loop {
459 if listen_to_event_cache_and_send_queue_updates(
460 ®istered_rooms.rooms,
461 &mut event_cache_generic_updates_subscriber,
462 &mut send_queue_generic_updates_subscriber,
463 &latest_event_queue_sender,
464 )
465 .await
466 .is_break()
467 {
468 warn!("`listen_to_event_cache_and_send_queue_updates_task` has stopped");
469
470 break;
471 }
472 }
473}
474
475async fn listen_to_event_cache_and_send_queue_updates(
480 registered_rooms: &RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
481 event_cache_generic_updates_subscriber: &mut broadcast::Receiver<RoomEventCacheGenericUpdate>,
482 send_queue_generic_updates_subscriber: &mut broadcast::Receiver<SendQueueUpdate>,
483 latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
484) -> ControlFlow<()> {
485 select! {
486 room_event_cache_generic_update = event_cache_generic_updates_subscriber.recv() => {
487 if let Ok(RoomEventCacheGenericUpdate { room_id }) = room_event_cache_generic_update {
488 if registered_rooms.read().await.contains_key(&room_id) {
489 let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache {
490 room_id
491 });
492 }
493 } else {
494 warn!("`event_cache_generic_updates` channel has been closed");
495
496 return ControlFlow::Break(());
497 }
498 }
499
500 send_queue_generic_update = send_queue_generic_updates_subscriber.recv() => {
501 if let Ok(SendQueueUpdate { room_id, update }) = send_queue_generic_update {
502 if registered_rooms.read().await.contains_key(&room_id) {
503 let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue {
504 room_id,
505 update
506 });
507 }
508 } else {
509 warn!("`send_queue_generic_updates` channel has been closed");
510
511 return ControlFlow::Break(());
512 }
513 }
514 }
515
516 ControlFlow::Continue(())
517}
518
519async fn compute_latest_events_task(
525 registered_rooms: Arc<RegisteredRooms>,
526 mut latest_event_queue_receiver: mpsc::UnboundedReceiver<LatestEventQueueUpdate>,
527) {
528 const BUFFER_SIZE: usize = 16;
529
530 let mut buffer = Vec::with_capacity(BUFFER_SIZE);
531
532 while latest_event_queue_receiver.recv_many(&mut buffer, BUFFER_SIZE).await > 0 {
533 compute_latest_events(®istered_rooms, &buffer).await;
534 buffer.clear();
535 }
536
537 warn!("`compute_latest_events_task` has stopped");
538}
539
540async fn compute_latest_events(
541 registered_rooms: &RegisteredRooms,
542 latest_event_queue_updates: &[LatestEventQueueUpdate],
543) {
544 for latest_event_queue_update in latest_event_queue_updates {
545 match latest_event_queue_update {
546 LatestEventQueueUpdate::EventCache { room_id } => {
547 let rooms = registered_rooms.rooms.read().await;
548
549 if let Some(room_latest_events) = rooms.get(room_id) {
550 let mut room_latest_events = room_latest_events.write().await;
551
552 drop(rooms);
555
556 room_latest_events.update_with_event_cache().await;
557 } else {
558 error!(?room_id, "Failed to find the room");
559
560 continue;
561 }
562 }
563
564 LatestEventQueueUpdate::SendQueue { room_id, update } => {
565 let rooms = registered_rooms.rooms.read().await;
566
567 if let Some(room_latest_events) = rooms.get(room_id) {
568 let mut room_latest_events = room_latest_events.write().await;
569
570 drop(rooms);
573
574 room_latest_events.update_with_send_queue(update).await;
575 } else {
576 error!(?room_id, "Failed to find the room");
577
578 continue;
579 }
580 }
581 }
582 }
583}
584
/// Tests of the latest events API: lazy registration of rooms and threads,
/// forwarding of event cache and send queue updates, and value computation.
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::{collections::HashMap, ops::Not};

    use assert_matches::assert_matches;
    use matrix_sdk_base::{
        RoomState,
        deserialized_responses::TimelineEventKind,
        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
        store::SerializableEventContent,
    };
    use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
    use ruma::{
        MilliSecondsSinceUnixEpoch, OwnedTransactionId, event_id,
        events::{
            AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
            SyncMessageLikeEvent, room::message::RoomMessageEventContent,
        },
        owned_room_id, room_id, user_id,
    };
    use stream_assert::assert_pending;
    use tokio::task::yield_now;

    use super::{
        LatestEventValue, LocalLatestEventValue, RegisteredRooms, RemoteLatestEventValue,
        RoomEventCacheGenericUpdate, RoomLatestEvents, RoomSendQueueUpdate, RwLock,
        SendQueueUpdate, WeakClient, WeakRoom, With, broadcast,
        listen_to_event_cache_and_send_queue_updates, mpsc,
    };
    use crate::{latest_events::LatestEventQueueUpdate, test_utils::mocks::MatrixMockServer};

    /// Build a local latest event value holding a plain text room message.
    fn local_room_message(body: &str) -> LocalLatestEventValue {
        LocalLatestEventValue {
            timestamp: MilliSecondsSinceUnixEpoch::now(),
            content: SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(
                RoomMessageEventContent::text_plain(body),
            ))
            .unwrap(),
        }
    }

    /// Rooms and threads are only registered when they are listened to.
    #[async_test]
    async fn test_latest_events_are_lazy() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");
        let room_id_2 = room_id!("!r2");
        let thread_id_1_0 = event_id!("$ev1.0");
        let thread_id_2_0 = event_id!("$ev2.0");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_2, RoomState::Joined);

        client.event_cache().subscribe().unwrap();

        let latest_events = client.latest_events().await;

        // Nothing is registered at first.
        assert!(latest_events.state.registered_rooms.rooms.read().await.is_empty());

        // Listen to two rooms.
        assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
        assert!(latest_events.listen_to_room(room_id_1).await.unwrap());

        {
            let rooms = latest_events.state.registered_rooms.rooms.read().await;
            // Two rooms are registered…
            assert_eq!(rooms.len(), 2);
            assert!(rooms.contains_key(room_id_0));
            assert!(rooms.contains_key(room_id_1));

            // … and none of them has a thread.
            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
            assert!(rooms.get(room_id_1).unwrap().read().await.per_thread().is_empty());
        }

        // Listen to a thread of a registered room, and of an unregistered one.
        assert!(latest_events.listen_to_thread(room_id_1, thread_id_1_0).await.unwrap());
        assert!(latest_events.listen_to_thread(room_id_2, thread_id_2_0).await.unwrap());

        {
            let rooms = latest_events.state.registered_rooms.rooms.read().await;
            // The third room has been registered along with its thread.
            assert_eq!(rooms.len(), 3);
            assert!(rooms.contains_key(room_id_0));
            assert!(rooms.contains_key(room_id_1));
            assert!(rooms.contains_key(room_id_2));

            // The first room still has no thread.
            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
            // The second room has exactly one thread.
            let room_1 = rooms.get(room_id_1).unwrap().read().await;
            assert_eq!(room_1.per_thread().len(), 1);
            assert!(room_1.per_thread().contains_key(thread_id_1_0));
            // So does the third room.
            let room_2 = rooms.get(room_id_2).unwrap().read().await;
            assert_eq!(room_2.per_thread().len(), 1);
            assert!(room_2.per_thread().contains_key(thread_id_2_0));
        }
    }

    /// Forgetting a room removes it from the registry.
    #[async_test]
    async fn test_forget_room() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);

        client.event_cache().subscribe().unwrap();

        let latest_events = client.latest_events().await;

        // Listen to a room.
        assert!(latest_events.listen_to_room(room_id_0).await.unwrap());

        {
            let rooms = latest_events.state.registered_rooms.rooms.read().await;
            // The room is registered…
            assert_eq!(rooms.len(), 1);
            assert!(rooms.contains_key(room_id_0));

            // … without any thread.
            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
        }

        // Forget the room.
        latest_events.forget_room(room_id_0).await;

        {
            let rooms = latest_events.state.registered_rooms.rooms.read().await;
            // The registry is empty again.
            assert!(rooms.is_empty());
        }
    }

    /// Forgetting a thread removes the thread but keeps its room registered.
    #[async_test]
    async fn test_forget_thread() {
        let room_id_0 = room_id!("!r0");
        let room_id_1 = room_id!("!r1");
        let thread_id_0_0 = event_id!("$ev0.0");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);

        client.event_cache().subscribe().unwrap();

        let latest_events = client.latest_events().await;

        // Listen to a thread.
        assert!(latest_events.listen_to_thread(room_id_0, thread_id_0_0).await.unwrap());

        {
            let rooms = latest_events.state.registered_rooms.rooms.read().await;
            // The room is registered…
            assert_eq!(rooms.len(), 1);
            assert!(rooms.contains_key(room_id_0));

            // … with exactly one thread.
            let room_0 = rooms.get(room_id_0).unwrap().read().await;
            assert_eq!(room_0.per_thread().len(), 1);
            assert!(room_0.per_thread().contains_key(thread_id_0_0));
        }

        // Forget the thread.
        latest_events.forget_thread(room_id_0, thread_id_0_0).await;

        {
            let rooms = latest_events.state.registered_rooms.rooms.read().await;
            // The room is still registered…
            assert_eq!(rooms.len(), 1);
            assert!(rooms.contains_key(room_id_0));

            // … but the thread is gone.
            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
        }
    }

    /// Event cache updates are forwarded only for registered rooms.
    #[async_test]
    async fn test_inputs_task_can_listen_to_room_event_cache() {
        let room_id = owned_room_id!("!r0");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        let weak_client = WeakClient::from_client(&client);
        let weak_room = WeakRoom::new(weak_client, room_id.clone());

        let event_cache = client.event_cache();

        let registered_rooms = RwLock::new(HashMap::new());
        let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
            broadcast::channel(1);
        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
            broadcast::channel(1);
        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

        // The room is not registered: the update is dropped.
        {
            room_event_cache_generic_update_sender
                .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
                .unwrap();

            assert!(
                listen_to_event_cache_and_send_queue_updates(
                    &registered_rooms,
                    &mut room_event_cache_generic_update_receiver,
                    &mut send_queue_generic_update_receiver,
                    &latest_event_queue_sender,
                )
                .await
                .is_continue()
            );

            // Nothing has been queued.
            assert!(latest_event_queue_receiver.is_empty());
        }

        // The room is registered: the update is forwarded.
        {
            registered_rooms.write().await.insert(
                room_id.clone(),
                With::inner(RoomLatestEvents::new(weak_room, event_cache)),
            );
            room_event_cache_generic_update_sender
                .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
                .unwrap();

            assert!(
                listen_to_event_cache_and_send_queue_updates(
                    &registered_rooms,
                    &mut room_event_cache_generic_update_receiver,
                    &mut send_queue_generic_update_receiver,
                    &latest_event_queue_sender,
                )
                .await
                .is_continue()
            );

            // Something has been queued.
            assert!(latest_event_queue_receiver.is_empty().not());
        }
    }

    /// Send queue updates are forwarded only for registered rooms.
    #[async_test]
    async fn test_inputs_task_can_listen_to_send_queue() {
        let room_id = owned_room_id!("!r0");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;
        let weak_client = WeakClient::from_client(&client);
        let weak_room = WeakRoom::new(weak_client, room_id.clone());

        let event_cache = client.event_cache();

        let registered_rooms = RwLock::new(HashMap::new());

        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
            broadcast::channel(1);
        let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
            broadcast::channel(1);
        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

        // The room is not registered: the update is dropped.
        {
            send_queue_generic_update_sender
                .send(SendQueueUpdate {
                    room_id: room_id.clone(),
                    update: RoomSendQueueUpdate::SentEvent {
                        transaction_id: OwnedTransactionId::from("txnid0"),
                        event_id: event_id!("$ev0").to_owned(),
                    },
                })
                .unwrap();

            assert!(
                listen_to_event_cache_and_send_queue_updates(
                    &registered_rooms,
                    &mut room_event_cache_generic_update_receiver,
                    &mut send_queue_generic_update_receiver,
                    &latest_event_queue_sender,
                )
                .await
                .is_continue()
            );

            // Nothing has been queued.
            assert!(latest_event_queue_receiver.is_empty());
        }

        // The room is registered: the update is forwarded.
        {
            registered_rooms.write().await.insert(
                room_id.clone(),
                With::inner(RoomLatestEvents::new(weak_room, event_cache)),
            );
            send_queue_generic_update_sender
                .send(SendQueueUpdate {
                    room_id: room_id.clone(),
                    update: RoomSendQueueUpdate::SentEvent {
                        transaction_id: OwnedTransactionId::from("txnid1"),
                        event_id: event_id!("$ev1").to_owned(),
                    },
                })
                .unwrap();

            assert!(
                listen_to_event_cache_and_send_queue_updates(
                    &registered_rooms,
                    &mut room_event_cache_generic_update_receiver,
                    &mut send_queue_generic_update_receiver,
                    &latest_event_queue_sender,
                )
                .await
                .is_continue()
            );

            // Something has been queued.
            assert!(latest_event_queue_receiver.is_empty().not());
        }
    }

    /// A closed event cache channel makes the listener ask to stop.
    #[async_test]
    async fn test_inputs_task_stops_when_event_cache_channel_is_closed() {
        let registered_rooms = RwLock::new(HashMap::new());
        let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
            broadcast::channel(1);
        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
            broadcast::channel(1);
        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

        // Dropping the only sender closes the channel.
        drop(room_event_cache_generic_update_sender);

        assert!(
            listen_to_event_cache_and_send_queue_updates(
                &registered_rooms,
                &mut room_event_cache_generic_update_receiver,
                &mut send_queue_generic_update_receiver,
                &latest_event_queue_sender,
            )
            .await
            // The listener asks to stop.
            .is_break()
        );

        assert!(latest_event_queue_receiver.is_empty());
    }

    /// A closed send queue channel makes the listener ask to stop.
    #[async_test]
    async fn test_inputs_task_stops_when_send_queue_channel_is_closed() {
        let registered_rooms = RwLock::new(HashMap::new());
        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
            broadcast::channel(1);
        let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
            broadcast::channel(1);
        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();

        // Dropping the only sender closes the channel.
        drop(send_queue_generic_update_sender);

        assert!(
            listen_to_event_cache_and_send_queue_updates(
                &registered_rooms,
                &mut room_event_cache_generic_update_receiver,
                &mut send_queue_generic_update_receiver,
                &latest_event_queue_sender,
            )
            .await
            // The listener asks to stop.
            .is_break()
        );

        assert!(latest_event_queue_receiver.is_empty());
    }

    /// A synced timeline event ends up as the room's latest event value.
    #[async_test]
    async fn test_latest_event_value_is_updated_via_event_cache() {
        let room_id = owned_room_id!("!r0");
        let user_id = user_id!("@mnt_io:matrix.org");
        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
        let event_id_2 = event_id!("$ev2");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        // Create the room.
        client.base_client().get_or_create_room(&room_id, RoomState::Joined);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let latest_events = client.latest_events().await;

        // Subscribe to the latest event value of the room.
        let mut latest_event_stream =
            latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();

        // No update yet.
        assert_pending!(latest_event_stream);

        // Sync one event in the room.
        server
            .sync_room(
                &client,
                JoinedRoomBuilder::new(&room_id)
                    .add_timeline_event(event_factory.text_msg("raclette !").event_id(event_id_2)),
            )
            .await;

        // The latest event value is now the synced event.
        assert_matches!(
            latest_event_stream.next().await,
            Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => {
                assert_matches!(
                    event.deserialize().unwrap(),
                    AnySyncTimelineEvent::MessageLike(
                        AnySyncMessageLikeEvent::RoomMessage(
                            SyncMessageLikeEvent::Original(message_content)
                        )
                    ) => {
                        assert_eq!(message_content.content.body(), "raclette !");
                    }
                );
            }
        );

        assert_pending!(latest_event_stream);
    }

    /// Registering a room whose event cache store already holds an event
    /// yields a remote latest event value without any sync.
    #[async_test]
    async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily() {
        let room_id = owned_room_id!("!r0");
        let user_id = user_id!("@mnt_io:matrix.org");
        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
        let event_id_0 = event_id!("$ev0");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        // Prelude: create the room and fill the event cache store.
        {
            client.base_client().get_or_create_room(&room_id, RoomState::Joined);

            // Store one chunk holding one event for the room.
            client
                .event_cache_store()
                .lock()
                .await
                .expect("Could not acquire the event cache lock")
                .as_clean()
                .expect("Could not acquire a clean event cache lock")
                .handle_linked_chunk_updates(
                    LinkedChunkId::Room(&room_id),
                    vec![
                        Update::NewItemsChunk {
                            previous: None,
                            new: ChunkIdentifier::new(0),
                            next: None,
                        },
                        Update::PushItems {
                            at: Position::new(ChunkIdentifier::new(0), 0),
                            items: vec![
                                event_factory.text_msg("hello").event_id(event_id_0).into(),
                            ],
                        },
                    ],
                )
                .await
                .unwrap();
        }

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let latest_events = client.latest_events().await;

        let mut latest_event_stream =
            latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();

        // Yield so that the background tasks get a chance to run —
        // presumably this is what lets the first computation happen before
        // the value is read below.
        yield_now().await;
        assert_matches!(latest_event_stream.next_now().await, LatestEventValue::Remote(_));

        assert_pending!(latest_event_stream);
    }

    /// A first computation is queued only when a freshly registered room has
    /// no latest event value yet.
    #[async_test]
    async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily_inner() {
        let room_id_0 = owned_room_id!("!r0");
        let room_id_1 = owned_room_id!("!r1");

        let server = MatrixMockServer::new().await;
        let client = server.client_builder().build().await;

        // Create the rooms.
        let room_0 = client.base_client().get_or_create_room(&room_id_0, RoomState::Joined);
        let room_1 = client.base_client().get_or_create_room(&room_id_1, RoomState::Joined);

        // `room_0` keeps its default latest event value; `room_1` gets a
        // local one.
        // NOTE(review): the info given to `room_1` is cloned from `room_0`,
        // not from `room_1` — confirm this is intended.
        let mut room_info_1 = room_0.clone_info();
        room_info_1.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo")));
        room_1.set_room_info(room_info_1, Default::default());

        let weak_client = WeakClient::from_client(&client);

        let event_cache = client.event_cache();
        event_cache.subscribe().unwrap();

        let (latest_event_queue_sender, mut latest_event_queue_receiver) =
            mpsc::unbounded_channel();

        let registered_rooms =
            RegisteredRooms::new(weak_client, event_cache, &latest_event_queue_sender);

        // Room 0 has no latest event value: a computation is queued.
        {
            let room_latest_events = registered_rooms.for_room(&room_id_0).await.unwrap().unwrap();
            assert_matches!(
                room_latest_events.read().await.for_room().get().await,
                LatestEventValue::None
            );
            assert_matches!(
                latest_event_queue_receiver.recv().await,
                Some(LatestEventQueueUpdate::EventCache { room_id }) => {
                    assert_eq!(room_id, room_id_0);
                }
            );
            assert!(latest_event_queue_receiver.is_empty());
        }

        // Room 1 already has a latest event value: nothing is queued.
        {
            let room_latest_events = registered_rooms.for_room(&room_id_1).await.unwrap().unwrap();
            assert_matches!(
                room_latest_events.read().await.for_room().get().await,
                LatestEventValue::LocalIsSending(_)
            );
            assert!(latest_event_queue_receiver.is_empty());
        }
    }
}