1mod error;
50mod latest_event;
51mod room_latest_events;
52
53use std::{
54 collections::HashMap,
55 ops::{ControlFlow, DerefMut, Not},
56 sync::Arc,
57};
58
59pub use error::LatestEventsError;
60use eyeball::{AsyncLock, Subscriber};
61use latest_event::{LatestEvent, With};
62pub use latest_event::{LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue};
63use matrix_sdk_base::timer;
64use matrix_sdk_common::executor::{AbortOnDrop, JoinHandleExt as _, spawn};
65use room_latest_events::RoomLatestEvents;
66use ruma::{EventId, OwnedRoomId, RoomId};
67use tokio::{
68 select,
69 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, broadcast, mpsc},
70};
71use tracing::{error, warn};
72
73use crate::{
74 client::WeakClient,
75 event_cache::{EventCache, RoomEventCacheGenericUpdate},
76 room::WeakRoom,
77 send_queue::{RoomSendQueueUpdate, SendQueue, SendQueueUpdate},
78};
79
80#[derive(Clone, Debug)]
82pub struct LatestEvents {
83 state: Arc<LatestEventsState>,
84}
85
86#[derive(Debug)]
88struct LatestEventsState {
89 registered_rooms: Arc<RegisteredRooms>,
91
92 _listen_task_handle: AbortOnDrop<()>,
95
96 _computation_task_handle: AbortOnDrop<()>,
98}
99
100impl LatestEvents {
101 pub(crate) fn new(
103 weak_client: WeakClient,
104 event_cache: EventCache,
105 send_queue: SendQueue,
106 ) -> Self {
107 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
108
109 let registered_rooms =
110 Arc::new(RegisteredRooms::new(weak_client, &event_cache, &latest_event_queue_sender));
111
112 let listen_task_handle = spawn(listen_to_event_cache_and_send_queue_updates_task(
114 registered_rooms.clone(),
115 event_cache,
116 send_queue,
117 latest_event_queue_sender,
118 ))
119 .abort_on_drop();
120
121 let computation_task_handle = spawn(compute_latest_events_task(
123 registered_rooms.clone(),
124 latest_event_queue_receiver,
125 ))
126 .abort_on_drop();
127
128 Self {
129 state: Arc::new(LatestEventsState {
130 registered_rooms,
131 _listen_task_handle: listen_task_handle,
132 _computation_task_handle: computation_task_handle,
133 }),
134 }
135 }
136
137 pub async fn listen_to_room(&self, room_id: &RoomId) -> Result<bool, LatestEventsError> {
141 Ok(self.state.registered_rooms.for_room(room_id).await?.is_some())
142 }
143
144 #[cfg(test)]
148 pub async fn is_listening_to_room(&self, room_id: &RoomId) -> bool {
149 self.state.registered_rooms.rooms.read().await.contains_key(room_id)
150 }
151
152 pub async fn listen_and_subscribe_to_room(
158 &self,
159 room_id: &RoomId,
160 ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
161 let Some(room_latest_events) = self.state.registered_rooms.for_room(room_id).await? else {
162 return Ok(None);
163 };
164
165 let room_latest_events = room_latest_events.read().await;
166 let latest_event = room_latest_events.for_room();
167
168 Ok(Some(latest_event.subscribe().await))
169 }
170
171 pub async fn listen_to_thread(
176 &self,
177 room_id: &RoomId,
178 thread_id: &EventId,
179 ) -> Result<bool, LatestEventsError> {
180 Ok(self.state.registered_rooms.for_thread(room_id, thread_id).await?.is_some())
181 }
182
183 pub async fn listen_and_subscribe_to_thread(
189 &self,
190 room_id: &RoomId,
191 thread_id: &EventId,
192 ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
193 let Some(room_latest_events) =
194 self.state.registered_rooms.for_thread(room_id, thread_id).await?
195 else {
196 return Ok(None);
197 };
198
199 let room_latest_events = room_latest_events.read().await;
200 let latest_event = room_latest_events
201 .for_thread(thread_id)
202 .expect("The `LatestEvent` for the thread must have been created");
203
204 Ok(Some(latest_event.subscribe().await))
205 }
206
207 pub async fn forget_room(&self, room_id: &RoomId) {
214 self.state.registered_rooms.forget_room(room_id).await;
215 }
216
217 pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
225 self.state.registered_rooms.forget_thread(room_id, thread_id).await;
226 }
227}
228
229#[derive(Debug)]
230struct RegisteredRooms {
231 rooms: RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
233
234 weak_client: WeakClient,
236
237 event_cache: EventCache,
239
240 latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
245}
246
247impl RegisteredRooms {
248 fn new(
249 weak_client: WeakClient,
250 event_cache: &EventCache,
251 latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
252 ) -> Self {
253 Self {
254 rooms: RwLock::new(HashMap::default()),
255 weak_client,
256 event_cache: event_cache.clone(),
257 latest_event_queue_sender: latest_event_queue_sender.clone(),
258 }
259 }
260
261 async fn room_latest_event(
268 &self,
269 room_id: &RoomId,
270 thread_id: Option<&EventId>,
271 ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
272 fn create_and_insert_room_latest_events(
273 room_id: &RoomId,
274 rooms: &mut HashMap<OwnedRoomId, RoomLatestEvents>,
275 weak_client: &WeakClient,
276 event_cache: &EventCache,
277 latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
278 ) {
279 let (room_latest_events, is_latest_event_value_none) =
280 With::unzip(RoomLatestEvents::new(
281 WeakRoom::new(weak_client.clone(), room_id.to_owned()),
282 event_cache,
283 ));
284
285 rooms.insert(room_id.to_owned(), room_latest_events);
287
288 if is_latest_event_value_none {
294 let _ = latest_event_queue_sender
295 .send(LatestEventQueueUpdate::EventCache { room_id: room_id.to_owned() });
296 }
297 }
298
299 Ok(match thread_id {
300 Some(thread_id) => {
306 let mut rooms = self.rooms.write().await;
307
308 if rooms.contains_key(room_id).not() {
310 create_and_insert_room_latest_events(
311 room_id,
312 rooms.deref_mut(),
313 &self.weak_client,
314 &self.event_cache,
315 &self.latest_event_queue_sender,
316 );
317 }
318
319 if let Some(room_latest_event) = rooms.get(room_id) {
320 let mut room_latest_event = room_latest_event.write().await;
321
322 if room_latest_event.has_thread(thread_id).not() {
325 room_latest_event.create_and_insert_latest_event_for_thread(thread_id);
326 }
327 }
328
329 RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
330 }
331
332 None => {
335 match RwLockReadGuard::try_map(self.rooms.read().await, |rooms| rooms.get(room_id))
336 .ok()
337 {
338 value @ Some(_) => value,
339 None => {
340 let _timer = timer!(
341 tracing::Level::INFO,
342 format!("Creating `RoomLatestEvents` for {room_id:?}"),
343 );
344
345 let mut rooms = self.rooms.write().await;
346
347 if rooms.contains_key(room_id).not() {
348 create_and_insert_room_latest_events(
349 room_id,
350 rooms.deref_mut(),
351 &self.weak_client,
352 &self.event_cache,
353 &self.latest_event_queue_sender,
354 );
355 }
356
357 RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
358 }
359 }
360 }
361 })
362 }
363
364 pub async fn for_room(
368 &self,
369 room_id: &RoomId,
370 ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
371 self.room_latest_event(room_id, None).await
372 }
373
374 pub async fn for_thread(
378 &self,
379 room_id: &RoomId,
380 thread_id: &EventId,
381 ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
382 self.room_latest_event(room_id, Some(thread_id)).await
383 }
384
385 pub async fn forget_room(&self, room_id: &RoomId) {
392 {
393 let mut rooms = self.rooms.write().await;
394
395 rooms.remove(room_id);
397 }
398 }
399
400 pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
408 let rooms = self.rooms.read().await;
409
410 if let Some(room_latest_event) = rooms.get(room_id) {
412 let mut room_latest_event = room_latest_event.write().await;
413
414 drop(rooms);
416
417 room_latest_event.forget_thread(thread_id);
418 }
419 }
420}
421
422#[derive(Debug)]
425enum LatestEventQueueUpdate {
426 EventCache {
428 room_id: OwnedRoomId,
430 },
431
432 SendQueue {
434 room_id: OwnedRoomId,
436
437 update: RoomSendQueueUpdate,
439 },
440}
441
442async fn listen_to_event_cache_and_send_queue_updates_task(
449 registered_rooms: Arc<RegisteredRooms>,
450 event_cache: EventCache,
451 send_queue: SendQueue,
452 latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
453) {
454 let mut event_cache_generic_updates_subscriber =
455 event_cache.subscribe_to_room_generic_updates();
456 let mut send_queue_generic_updates_subscriber = send_queue.subscribe();
457
458 loop {
459 if listen_to_event_cache_and_send_queue_updates(
460 ®istered_rooms.rooms,
461 &mut event_cache_generic_updates_subscriber,
462 &mut send_queue_generic_updates_subscriber,
463 &latest_event_queue_sender,
464 )
465 .await
466 .is_break()
467 {
468 warn!("`listen_to_event_cache_and_send_queue_updates_task` has stopped");
469
470 break;
471 }
472 }
473}
474
475async fn listen_to_event_cache_and_send_queue_updates(
480 registered_rooms: &RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
481 event_cache_generic_updates_subscriber: &mut broadcast::Receiver<RoomEventCacheGenericUpdate>,
482 send_queue_generic_updates_subscriber: &mut broadcast::Receiver<SendQueueUpdate>,
483 latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
484) -> ControlFlow<()> {
485 select! {
486 room_event_cache_generic_update = event_cache_generic_updates_subscriber.recv() => {
487 if let Ok(RoomEventCacheGenericUpdate { room_id }) = room_event_cache_generic_update {
488 if registered_rooms.read().await.contains_key(&room_id) {
489 let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache {
490 room_id
491 });
492 }
493 } else {
494 warn!("`event_cache_generic_updates` channel has been closed");
495
496 return ControlFlow::Break(());
497 }
498 }
499
500 send_queue_generic_update = send_queue_generic_updates_subscriber.recv() => {
501 if let Ok(SendQueueUpdate { room_id, update }) = send_queue_generic_update {
502 if registered_rooms.read().await.contains_key(&room_id) {
503 let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue {
504 room_id,
505 update
506 });
507 }
508 } else {
509 warn!("`send_queue_generic_updates` channel has been closed");
510
511 return ControlFlow::Break(());
512 }
513 }
514 }
515
516 ControlFlow::Continue(())
517}
518
519async fn compute_latest_events_task(
525 registered_rooms: Arc<RegisteredRooms>,
526 mut latest_event_queue_receiver: mpsc::UnboundedReceiver<LatestEventQueueUpdate>,
527) {
528 const BUFFER_SIZE: usize = 16;
529
530 let mut buffer = Vec::with_capacity(BUFFER_SIZE);
531
532 while latest_event_queue_receiver.recv_many(&mut buffer, BUFFER_SIZE).await > 0 {
533 compute_latest_events(®istered_rooms, &buffer).await;
534 buffer.clear();
535 }
536
537 warn!("`compute_latest_events_task` has stopped");
538}
539
540async fn compute_latest_events(
541 registered_rooms: &RegisteredRooms,
542 latest_event_queue_updates: &[LatestEventQueueUpdate],
543) {
544 for latest_event_queue_update in latest_event_queue_updates {
545 match latest_event_queue_update {
546 LatestEventQueueUpdate::EventCache { room_id } => {
547 let rooms = registered_rooms.rooms.read().await;
548
549 if let Some(room_latest_events) = rooms.get(room_id) {
550 let mut room_latest_events = room_latest_events.write().await;
551
552 drop(rooms);
555
556 room_latest_events.update_with_event_cache().await;
557 } else {
558 error!(?room_id, "Failed to find the room");
559
560 continue;
561 }
562 }
563
564 LatestEventQueueUpdate::SendQueue { room_id, update } => {
565 let rooms = registered_rooms.rooms.read().await;
566
567 if let Some(room_latest_events) = rooms.get(room_id) {
568 let mut room_latest_events = room_latest_events.write().await;
569
570 drop(rooms);
573
574 room_latest_events.update_with_send_queue(update).await;
575 } else {
576 error!(?room_id, "Failed to find the room");
577
578 continue;
579 }
580 }
581 }
582 }
583}
584
585#[cfg(test)]
586fn local_room_message(body: &str) -> LocalLatestEventValue {
587 use matrix_sdk_base::store::SerializableEventContent;
588 use ruma::{
589 MilliSecondsSinceUnixEpoch,
590 events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
591 };
592
593 LocalLatestEventValue {
594 timestamp: MilliSecondsSinceUnixEpoch::now(),
595 content: SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(
596 RoomMessageEventContent::text_plain(body),
597 ))
598 .unwrap(),
599 }
600}
601
602#[cfg(all(test, not(target_family = "wasm")))]
603mod tests {
604 use std::{collections::HashMap, ops::Not};
605
606 use assert_matches::assert_matches;
607 use matrix_sdk_base::{
608 RoomState,
609 deserialized_responses::TimelineEventKind,
610 linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
611 };
612 use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
613 use ruma::{
614 OwnedTransactionId, event_id,
615 events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncMessageLikeEvent},
616 owned_room_id, room_id, user_id,
617 };
618 use stream_assert::assert_pending;
619 use tokio::task::yield_now;
620
621 use super::{
622 LatestEventValue, RegisteredRooms, RemoteLatestEventValue, RoomEventCacheGenericUpdate,
623 RoomLatestEvents, RoomSendQueueUpdate, RwLock, SendQueueUpdate, WeakClient, WeakRoom, With,
624 broadcast, listen_to_event_cache_and_send_queue_updates, mpsc,
625 };
626 use crate::{
627 latest_events::{LatestEventQueueUpdate, local_room_message},
628 test_utils::mocks::MatrixMockServer,
629 };
630
631 #[async_test]
632 async fn test_latest_events_are_lazy() {
633 let room_id_0 = room_id!("!r0");
634 let room_id_1 = room_id!("!r1");
635 let room_id_2 = room_id!("!r2");
636 let thread_id_1_0 = event_id!("$ev1.0");
637 let thread_id_2_0 = event_id!("$ev2.0");
638
639 let server = MatrixMockServer::new().await;
640 let client = server.client_builder().build().await;
641
642 client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
643 client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
644 client.base_client().get_or_create_room(room_id_2, RoomState::Joined);
645
646 client.event_cache().subscribe().unwrap();
647
648 let latest_events = client.latest_events().await;
649
650 assert!(latest_events.state.registered_rooms.rooms.read().await.is_empty());
652
653 assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
655 assert!(latest_events.listen_to_room(room_id_1).await.unwrap());
656
657 {
658 let rooms = latest_events.state.registered_rooms.rooms.read().await;
659 assert_eq!(rooms.len(), 2);
661 assert!(rooms.contains_key(room_id_0));
663 assert!(rooms.contains_key(room_id_1));
664
665 assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
667 assert!(rooms.get(room_id_1).unwrap().read().await.per_thread().is_empty());
669 }
670
671 assert!(latest_events.listen_to_thread(room_id_1, thread_id_1_0).await.unwrap());
673 assert!(latest_events.listen_to_thread(room_id_2, thread_id_2_0).await.unwrap());
674
675 {
676 let rooms = latest_events.state.registered_rooms.rooms.read().await;
677 assert_eq!(rooms.len(), 3);
679 assert!(rooms.contains_key(room_id_0));
681 assert!(rooms.contains_key(room_id_1));
682 assert!(rooms.contains_key(room_id_2));
683
684 assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
686 let room_1 = rooms.get(room_id_1).unwrap().read().await;
688 assert_eq!(room_1.per_thread().len(), 1);
689 assert!(room_1.per_thread().contains_key(thread_id_1_0));
691 let room_2 = rooms.get(room_id_2).unwrap().read().await;
693 assert_eq!(room_2.per_thread().len(), 1);
694 assert!(room_2.per_thread().contains_key(thread_id_2_0));
696 }
697 }
698
699 #[async_test]
700 async fn test_forget_room() {
701 let room_id_0 = room_id!("!r0");
702 let room_id_1 = room_id!("!r1");
703
704 let server = MatrixMockServer::new().await;
705 let client = server.client_builder().build().await;
706
707 client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
708 client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
709
710 client.event_cache().subscribe().unwrap();
711
712 let latest_events = client.latest_events().await;
713
714 assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
716
717 {
718 let rooms = latest_events.state.registered_rooms.rooms.read().await;
719 assert_eq!(rooms.len(), 1);
721 assert!(rooms.contains_key(room_id_0));
723
724 assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
726 }
727
728 latest_events.forget_room(room_id_0).await;
730
731 {
732 let rooms = latest_events.state.registered_rooms.rooms.read().await;
733 assert!(rooms.is_empty());
735 }
736 }
737
738 #[async_test]
739 async fn test_forget_thread() {
740 let room_id_0 = room_id!("!r0");
741 let room_id_1 = room_id!("!r1");
742 let thread_id_0_0 = event_id!("$ev0.0");
743
744 let server = MatrixMockServer::new().await;
745 let client = server.client_builder().build().await;
746
747 client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
748 client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
749
750 client.event_cache().subscribe().unwrap();
751
752 let latest_events = client.latest_events().await;
753
754 assert!(latest_events.listen_to_thread(room_id_0, thread_id_0_0).await.unwrap());
756
757 {
758 let rooms = latest_events.state.registered_rooms.rooms.read().await;
759 assert_eq!(rooms.len(), 1);
761 assert!(rooms.contains_key(room_id_0));
763
764 let room_0 = rooms.get(room_id_0).unwrap().read().await;
766 assert_eq!(room_0.per_thread().len(), 1);
767 assert!(room_0.per_thread().contains_key(thread_id_0_0));
769 }
770
771 latest_events.forget_thread(room_id_0, thread_id_0_0).await;
773
774 {
775 let rooms = latest_events.state.registered_rooms.rooms.read().await;
776 assert_eq!(rooms.len(), 1);
778 assert!(rooms.contains_key(room_id_0));
780
781 assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
783 }
784 }
785
786 #[async_test]
787 async fn test_inputs_task_can_listen_to_room_event_cache() {
788 let room_id = owned_room_id!("!r0");
789
790 let server = MatrixMockServer::new().await;
791 let client = server.client_builder().build().await;
792 let weak_client = WeakClient::from_client(&client);
793 let weak_room = WeakRoom::new(weak_client, room_id.clone());
794
795 let event_cache = client.event_cache();
796
797 let registered_rooms = RwLock::new(HashMap::new());
798 let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
799 broadcast::channel(1);
800 let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
801 broadcast::channel(1);
802 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
803
804 {
806 room_event_cache_generic_update_sender
807 .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
808 .unwrap();
809
810 assert!(
812 listen_to_event_cache_and_send_queue_updates(
813 ®istered_rooms,
814 &mut room_event_cache_generic_update_receiver,
815 &mut send_queue_generic_update_receiver,
816 &latest_event_queue_sender,
817 )
818 .await
819 .is_continue()
820 );
821
822 assert!(latest_event_queue_receiver.is_empty());
824 }
825
826 {
828 registered_rooms.write().await.insert(
829 room_id.clone(),
830 With::inner(RoomLatestEvents::new(weak_room, event_cache)),
831 );
832 room_event_cache_generic_update_sender
833 .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
834 .unwrap();
835
836 assert!(
837 listen_to_event_cache_and_send_queue_updates(
838 ®istered_rooms,
839 &mut room_event_cache_generic_update_receiver,
840 &mut send_queue_generic_update_receiver,
841 &latest_event_queue_sender,
842 )
843 .await
844 .is_continue()
845 );
846
847 assert!(latest_event_queue_receiver.is_empty().not());
849 }
850 }
851
852 #[async_test]
853 async fn test_inputs_task_can_listen_to_send_queue() {
854 let room_id = owned_room_id!("!r0");
855
856 let server = MatrixMockServer::new().await;
857 let client = server.client_builder().build().await;
858 let weak_client = WeakClient::from_client(&client);
859 let weak_room = WeakRoom::new(weak_client, room_id.clone());
860
861 let event_cache = client.event_cache();
862
863 let registered_rooms = RwLock::new(HashMap::new());
864
865 let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
866 broadcast::channel(1);
867 let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
868 broadcast::channel(1);
869 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
870
871 {
873 send_queue_generic_update_sender
874 .send(SendQueueUpdate {
875 room_id: room_id.clone(),
876 update: RoomSendQueueUpdate::SentEvent {
877 transaction_id: OwnedTransactionId::from("txnid0"),
878 event_id: event_id!("$ev0").to_owned(),
879 },
880 })
881 .unwrap();
882
883 assert!(
885 listen_to_event_cache_and_send_queue_updates(
886 ®istered_rooms,
887 &mut room_event_cache_generic_update_receiver,
888 &mut send_queue_generic_update_receiver,
889 &latest_event_queue_sender,
890 )
891 .await
892 .is_continue()
893 );
894
895 assert!(latest_event_queue_receiver.is_empty());
897 }
898
899 {
901 registered_rooms.write().await.insert(
902 room_id.clone(),
903 With::inner(RoomLatestEvents::new(weak_room, event_cache)),
904 );
905 send_queue_generic_update_sender
906 .send(SendQueueUpdate {
907 room_id: room_id.clone(),
908 update: RoomSendQueueUpdate::SentEvent {
909 transaction_id: OwnedTransactionId::from("txnid1"),
910 event_id: event_id!("$ev1").to_owned(),
911 },
912 })
913 .unwrap();
914
915 assert!(
916 listen_to_event_cache_and_send_queue_updates(
917 ®istered_rooms,
918 &mut room_event_cache_generic_update_receiver,
919 &mut send_queue_generic_update_receiver,
920 &latest_event_queue_sender,
921 )
922 .await
923 .is_continue()
924 );
925
926 assert!(latest_event_queue_receiver.is_empty().not());
928 }
929 }
930
931 #[async_test]
932 async fn test_inputs_task_stops_when_event_cache_channel_is_closed() {
933 let registered_rooms = RwLock::new(HashMap::new());
934 let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
935 broadcast::channel(1);
936 let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
937 broadcast::channel(1);
938 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
939
940 drop(room_event_cache_generic_update_sender);
942
943 assert!(
945 listen_to_event_cache_and_send_queue_updates(
946 ®istered_rooms,
947 &mut room_event_cache_generic_update_receiver,
948 &mut send_queue_generic_update_receiver,
949 &latest_event_queue_sender,
950 )
951 .await
952 .is_break()
954 );
955
956 assert!(latest_event_queue_receiver.is_empty());
957 }
958
959 #[async_test]
960 async fn test_inputs_task_stops_when_send_queue_channel_is_closed() {
961 let registered_rooms = RwLock::new(HashMap::new());
962 let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
963 broadcast::channel(1);
964 let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
965 broadcast::channel(1);
966 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
967
968 drop(send_queue_generic_update_sender);
970
971 assert!(
973 listen_to_event_cache_and_send_queue_updates(
974 ®istered_rooms,
975 &mut room_event_cache_generic_update_receiver,
976 &mut send_queue_generic_update_receiver,
977 &latest_event_queue_sender,
978 )
979 .await
980 .is_break()
982 );
983
984 assert!(latest_event_queue_receiver.is_empty());
985 }
986
987 #[async_test]
988 async fn test_latest_event_value_is_updated_via_event_cache() {
989 let room_id = owned_room_id!("!r0");
990 let user_id = user_id!("@mnt_io:matrix.org");
991 let event_factory = EventFactory::new().sender(user_id).room(&room_id);
992 let event_id_2 = event_id!("$ev2");
993
994 let server = MatrixMockServer::new().await;
995 let client = server.client_builder().build().await;
996
997 client.base_client().get_or_create_room(&room_id, RoomState::Joined);
999
1000 let event_cache = client.event_cache();
1001 event_cache.subscribe().unwrap();
1002
1003 let latest_events = client.latest_events().await;
1004
1005 let mut latest_event_stream =
1007 latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
1008
1009 assert_pending!(latest_event_stream);
1011
1012 server
1014 .sync_room(
1015 &client,
1016 JoinedRoomBuilder::new(&room_id)
1017 .add_timeline_event(event_factory.text_msg("raclette !").event_id(event_id_2)),
1018 )
1019 .await;
1020
1021 assert_matches!(
1025 latest_event_stream.next().await,
1026 Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => {
1027 assert_matches!(
1028 event.deserialize().unwrap(),
1029 AnySyncTimelineEvent::MessageLike(
1030 AnySyncMessageLikeEvent::RoomMessage(
1031 SyncMessageLikeEvent::Original(message_content)
1032 )
1033 ) => {
1034 assert_eq!(message_content.content.body(), "raclette !");
1035 }
1036 );
1037 }
1038 );
1039
1040 assert_pending!(latest_event_stream);
1041 }
1042
1043 #[async_test]
1044 async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily() {
1045 let room_id = owned_room_id!("!r0");
1046 let user_id = user_id!("@mnt_io:matrix.org");
1047 let event_factory = EventFactory::new().sender(user_id).room(&room_id);
1048 let event_id_0 = event_id!("$ev0");
1049
1050 let server = MatrixMockServer::new().await;
1051 let client = server.client_builder().build().await;
1052
1053 {
1055 client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1057
1058 client
1060 .event_cache_store()
1061 .lock()
1062 .await
1063 .expect("Could not acquire the event cache lock")
1064 .as_clean()
1065 .expect("Could not acquire a clean event cache lock")
1066 .handle_linked_chunk_updates(
1067 LinkedChunkId::Room(&room_id),
1068 vec![
1069 Update::NewItemsChunk {
1070 previous: None,
1071 new: ChunkIdentifier::new(0),
1072 next: None,
1073 },
1074 Update::PushItems {
1075 at: Position::new(ChunkIdentifier::new(0), 0),
1076 items: vec![
1077 event_factory.text_msg("hello").event_id(event_id_0).into(),
1078 ],
1079 },
1080 ],
1081 )
1082 .await
1083 .unwrap();
1084 }
1085
1086 let event_cache = client.event_cache();
1087 event_cache.subscribe().unwrap();
1088
1089 let latest_events = client.latest_events().await;
1090
1091 let mut latest_event_stream =
1092 latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
1093
1094 yield_now().await;
1100 assert_matches!(latest_event_stream.next_now().await, LatestEventValue::Remote(_));
1101
1102 assert_pending!(latest_event_stream);
1103 }
1104
1105 #[async_test]
1113 async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily_inner() {
1114 let room_id_0 = owned_room_id!("!r0");
1115 let room_id_1 = owned_room_id!("!r1");
1116
1117 let server = MatrixMockServer::new().await;
1118 let client = server.client_builder().build().await;
1119
1120 let room_0 = client.base_client().get_or_create_room(&room_id_0, RoomState::Joined);
1122 let room_1 = client.base_client().get_or_create_room(&room_id_1, RoomState::Joined);
1123
1124 let mut room_info_1 = room_0.clone_info();
1127 room_info_1.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo")));
1128 room_1.set_room_info(room_info_1, Default::default());
1129
1130 let weak_client = WeakClient::from_client(&client);
1131
1132 let event_cache = client.event_cache();
1133 event_cache.subscribe().unwrap();
1134
1135 let (latest_event_queue_sender, mut latest_event_queue_receiver) =
1136 mpsc::unbounded_channel();
1137
1138 let registered_rooms =
1139 RegisteredRooms::new(weak_client, event_cache, &latest_event_queue_sender);
1140
1141 {
1144 let room_latest_events = registered_rooms.for_room(&room_id_0).await.unwrap().unwrap();
1145 assert_matches!(
1146 room_latest_events.read().await.for_room().get().await,
1147 LatestEventValue::None
1148 );
1149 assert_matches!(
1150 latest_event_queue_receiver.recv().await,
1151 Some(LatestEventQueueUpdate::EventCache { room_id }) => {
1152 assert_eq!(room_id, room_id_0);
1153 }
1154 );
1155 assert!(latest_event_queue_receiver.is_empty());
1156 }
1157
1158 {
1161 let room_latest_events = registered_rooms.for_room(&room_id_1).await.unwrap().unwrap();
1162 assert_matches!(
1163 room_latest_events.read().await.for_room().get().await,
1164 LatestEventValue::LocalIsSending(_)
1165 );
1166 assert!(latest_event_queue_receiver.is_empty());
1167 }
1168 }
1169}