1mod error;
50mod latest_event;
51mod room_latest_events;
52
53use std::{
54 collections::HashMap,
55 ops::{ControlFlow, DerefMut, Not},
56 sync::Arc,
57};
58
59pub use error::LatestEventsError;
60use eyeball::{AsyncLock, Subscriber};
61use latest_event::{LatestEvent, With};
62pub use latest_event::{LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue};
63use matrix_sdk_base::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, timer};
64use matrix_sdk_common::executor::{AbortOnDrop, JoinHandleExt as _, spawn};
65use room_latest_events::{RoomLatestEvents, RoomLatestEventsWriteGuard};
66use ruma::{EventId, OwnedRoomId, RoomId};
67use tokio::{
68 select,
69 sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, broadcast, mpsc},
70};
71use tracing::{info, warn};
72
73use crate::{
74 client::WeakClient,
75 event_cache::{EventCache, RoomEventCacheGenericUpdate},
76 room::WeakRoom,
77 send_queue::{RoomSendQueueUpdate, SendQueue, SendQueueUpdate},
78};
79
80#[derive(Clone, Debug)]
82pub struct LatestEvents {
83 state: Arc<LatestEventsState>,
84}
85
86#[derive(Debug)]
88struct LatestEventsState {
89 registered_rooms: Arc<RegisteredRooms>,
91
92 _listen_task_handle: AbortOnDrop<()>,
94
95 _computation_task_handle: AbortOnDrop<()>,
97}
98
99impl LatestEvents {
100 pub(crate) fn new(
102 weak_client: WeakClient,
103 event_cache: EventCache,
104 send_queue: SendQueue,
105 room_info_updates: broadcast::Receiver<RoomInfoNotableUpdate>,
106 ) -> Self {
107 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
108
109 let registered_rooms =
110 Arc::new(RegisteredRooms::new(weak_client, &event_cache, &latest_event_queue_sender));
111
112 let listen_task_handle = spawn(listen_to_updates_task(
115 registered_rooms.clone(),
116 event_cache,
117 send_queue,
118 room_info_updates,
119 latest_event_queue_sender,
120 ))
121 .abort_on_drop();
122
123 let computation_task_handle = spawn(compute_latest_events_task(
125 registered_rooms.clone(),
126 latest_event_queue_receiver,
127 ))
128 .abort_on_drop();
129
130 Self {
131 state: Arc::new(LatestEventsState {
132 registered_rooms,
133 _listen_task_handle: listen_task_handle,
134 _computation_task_handle: computation_task_handle,
135 }),
136 }
137 }
138
139 pub async fn listen_to_room(&self, room_id: &RoomId) -> Result<bool, LatestEventsError> {
143 Ok(self.state.registered_rooms.for_room(room_id).await?.is_some())
144 }
145
146 #[cfg(test)]
150 pub async fn is_listening_to_room(&self, room_id: &RoomId) -> bool {
151 self.state.registered_rooms.rooms.read().await.contains_key(room_id)
152 }
153
154 pub async fn listen_and_subscribe_to_room(
160 &self,
161 room_id: &RoomId,
162 ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
163 let Some(room_latest_events) = self.state.registered_rooms.for_room(room_id).await? else {
164 return Ok(None);
165 };
166
167 let room_latest_events = room_latest_events.read().await;
168 let latest_event = room_latest_events.for_room();
169
170 Ok(Some(latest_event.subscribe().await))
171 }
172
173 pub async fn listen_to_thread(
178 &self,
179 room_id: &RoomId,
180 thread_id: &EventId,
181 ) -> Result<bool, LatestEventsError> {
182 Ok(self.state.registered_rooms.for_thread(room_id, thread_id).await?.is_some())
183 }
184
185 pub async fn listen_and_subscribe_to_thread(
191 &self,
192 room_id: &RoomId,
193 thread_id: &EventId,
194 ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
195 let Some(room_latest_events) =
196 self.state.registered_rooms.for_thread(room_id, thread_id).await?
197 else {
198 return Ok(None);
199 };
200
201 let room_latest_events = room_latest_events.read().await;
202 let latest_event = room_latest_events
203 .for_thread(thread_id)
204 .expect("The `LatestEvent` for the thread must have been created");
205
206 Ok(Some(latest_event.subscribe().await))
207 }
208
209 pub async fn forget_room(&self, room_id: &RoomId) {
216 self.state.registered_rooms.forget_room(room_id).await;
217 }
218
219 pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
227 self.state.registered_rooms.forget_thread(room_id, thread_id).await;
228 }
229}
230
231#[derive(Debug)]
232struct RegisteredRooms {
233 rooms: RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
235
236 weak_client: WeakClient,
238
239 event_cache: EventCache,
241
242 latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
247}
248
249impl RegisteredRooms {
250 fn new(
251 weak_client: WeakClient,
252 event_cache: &EventCache,
253 latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
254 ) -> Self {
255 Self {
256 rooms: RwLock::new(HashMap::default()),
257 weak_client,
258 event_cache: event_cache.clone(),
259 latest_event_queue_sender: latest_event_queue_sender.clone(),
260 }
261 }
262
263 async fn room_latest_event(
270 &self,
271 room_id: &RoomId,
272 thread_id: Option<&EventId>,
273 ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
274 fn create_and_insert_room_latest_events(
275 room_id: &RoomId,
276 rooms: &mut HashMap<OwnedRoomId, RoomLatestEvents>,
277 weak_client: &WeakClient,
278 event_cache: &EventCache,
279 latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
280 ) {
281 let (room_latest_events, is_latest_event_value_none) =
282 With::unzip(RoomLatestEvents::new(
283 WeakRoom::new(weak_client.clone(), room_id.to_owned()),
284 event_cache,
285 ));
286
287 rooms.insert(room_id.to_owned(), room_latest_events);
289
290 if is_latest_event_value_none {
296 let _ = latest_event_queue_sender
297 .send(LatestEventQueueUpdate::EventCache { room_id: room_id.to_owned() });
298 }
299 }
300
301 Ok(match thread_id {
302 Some(thread_id) => {
308 let mut rooms = self.rooms.write().await;
309
310 if rooms.contains_key(room_id).not() {
312 create_and_insert_room_latest_events(
313 room_id,
314 rooms.deref_mut(),
315 &self.weak_client,
316 &self.event_cache,
317 &self.latest_event_queue_sender,
318 );
319 }
320
321 if let Some(room_latest_event) = rooms.get(room_id) {
322 let mut room_latest_event = room_latest_event.write().await;
323
324 if room_latest_event.has_thread(thread_id).not() {
327 room_latest_event.create_and_insert_latest_event_for_thread(thread_id);
328 }
329 }
330
331 RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
332 }
333
334 None => {
337 match RwLockReadGuard::try_map(self.rooms.read().await, |rooms| rooms.get(room_id))
338 .ok()
339 {
340 value @ Some(_) => value,
341 None => {
342 let _timer = timer!(
343 tracing::Level::INFO,
344 format!("Creating `RoomLatestEvents` for {room_id:?}"),
345 );
346
347 let mut rooms = self.rooms.write().await;
348
349 if rooms.contains_key(room_id).not() {
350 create_and_insert_room_latest_events(
351 room_id,
352 rooms.deref_mut(),
353 &self.weak_client,
354 &self.event_cache,
355 &self.latest_event_queue_sender,
356 );
357 }
358
359 RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
360 }
361 }
362 }
363 })
364 }
365
366 pub async fn for_room(
370 &self,
371 room_id: &RoomId,
372 ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
373 self.room_latest_event(room_id, None).await
374 }
375
376 pub async fn for_thread(
380 &self,
381 room_id: &RoomId,
382 thread_id: &EventId,
383 ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
384 self.room_latest_event(room_id, Some(thread_id)).await
385 }
386
387 pub async fn forget_room(&self, room_id: &RoomId) {
394 {
395 let mut rooms = self.rooms.write().await;
396
397 rooms.remove(room_id);
399 }
400 }
401
402 pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
410 let rooms = self.rooms.read().await;
411
412 if let Some(room_latest_event) = rooms.get(room_id) {
414 let mut room_latest_event = room_latest_event.write().await;
415
416 drop(rooms);
418
419 room_latest_event.forget_thread(thread_id);
420 }
421 }
422}
423
424#[derive(Debug)]
427enum LatestEventQueueUpdate {
428 EventCache {
430 room_id: OwnedRoomId,
432 },
433
434 SendQueue {
436 room_id: OwnedRoomId,
438
439 update: RoomSendQueueUpdate,
441 },
442
443 RoomInfo {
447 room_id: OwnedRoomId,
449
450 reasons: RoomInfoNotableUpdateReasons,
452 },
453}
454
455async fn listen_to_updates_task(
464 registered_rooms: Arc<RegisteredRooms>,
465 event_cache: EventCache,
466 send_queue: SendQueue,
467 room_info_updates: broadcast::Receiver<RoomInfoNotableUpdate>,
468 latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
469) {
470 let mut event_cache_generic_updates_subscriber =
471 event_cache.subscribe_to_room_generic_updates();
472 let mut send_queue_generic_updates_subscriber = send_queue.subscribe();
473 let mut room_info_updates_subscriber = room_info_updates.resubscribe();
474
475 loop {
476 if listen_to_updates(
477 ®istered_rooms.rooms,
478 &mut event_cache_generic_updates_subscriber,
479 &mut send_queue_generic_updates_subscriber,
480 &mut room_info_updates_subscriber,
481 &latest_event_queue_sender,
482 )
483 .await
484 .is_break()
485 {
486 warn!("`listen_to_updates_task` has stopped");
487
488 break;
489 }
490 }
491}
492
493async fn listen_to_updates(
498 registered_rooms: &RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
499 event_cache_generic_updates_subscriber: &mut broadcast::Receiver<RoomEventCacheGenericUpdate>,
500 send_queue_generic_updates_subscriber: &mut broadcast::Receiver<SendQueueUpdate>,
501 room_info_updates_subscriber: &mut broadcast::Receiver<RoomInfoNotableUpdate>,
502 latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
503) -> ControlFlow<()> {
504 select! {
505 room_event_cache_generic_update = event_cache_generic_updates_subscriber.recv() => {
506 if let Ok(RoomEventCacheGenericUpdate { room_id }) = room_event_cache_generic_update {
507 if registered_rooms.read().await.contains_key(&room_id) {
508 let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache {
509 room_id
510 });
511 }
512 } else {
513 warn!("`event_cache_generic_updates` channel has been closed");
514
515 return ControlFlow::Break(());
516 }
517 }
518
519 send_queue_generic_update = send_queue_generic_updates_subscriber.recv() => {
520 if let Ok(SendQueueUpdate { room_id, update }) = send_queue_generic_update {
521 if registered_rooms.read().await.contains_key(&room_id) {
522 let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue {
523 room_id,
524 update
525 });
526 }
527 } else {
528 warn!("`send_queue_generic_updates` channel has been closed");
529
530 return ControlFlow::Break(());
531 }
532 }
533
534 room_info_update = room_info_updates_subscriber.recv() => {
535 if let Ok(RoomInfoNotableUpdate { room_id, reasons }) = room_info_update {
536 if
538 reasons == RoomInfoNotableUpdateReasons::LATEST_EVENT ||
541
542 !reasons.contains(RoomInfoNotableUpdateReasons::MEMBERSHIP)
546 {
547 return ControlFlow::Continue(())
548 }
549
550 if registered_rooms.read().await.contains_key(&room_id) {
551 let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::RoomInfo {
552 room_id,
553 reasons,
554 });
555 }
556 } else {
557 warn!("`room_info_updates` channel has been closed");
558
559 return ControlFlow::Break(());
560 }
561 }
562 }
563
564 ControlFlow::Continue(())
565}
566
567async fn compute_latest_events_task(
572 registered_rooms: Arc<RegisteredRooms>,
573 mut latest_event_queue_receiver: mpsc::UnboundedReceiver<LatestEventQueueUpdate>,
574) {
575 const BUFFER_SIZE: usize = 16;
576
577 let mut buffer = Vec::with_capacity(BUFFER_SIZE);
578
579 while latest_event_queue_receiver.recv_many(&mut buffer, BUFFER_SIZE).await > 0 {
580 compute_latest_events(®istered_rooms, &buffer).await;
581 buffer.clear();
582 }
583
584 warn!("`compute_latest_events_task` has stopped");
585}
586
587async fn compute_latest_events(
588 registered_rooms: &RegisteredRooms,
589 latest_event_queue_updates: &[LatestEventQueueUpdate],
590) {
591 async fn room_latest_events_write_guard(
592 registered_rooms: &RegisteredRooms,
593 room_id: &OwnedRoomId,
594 ) -> ControlFlow<RoomLatestEventsWriteGuard, ()> {
595 let rooms = registered_rooms.rooms.read().await;
596
597 if let Some(room_latest_events) = rooms.get(room_id) {
598 let room_latest_events = room_latest_events.write().await;
599
600 drop(rooms);
603
604 ControlFlow::Break(room_latest_events)
605 } else {
606 info!(?room_id, "Failed to find the room");
607
608 ControlFlow::Continue(())
609 }
610 }
611
612 for latest_event_queue_update in latest_event_queue_updates {
613 match latest_event_queue_update {
614 LatestEventQueueUpdate::EventCache { room_id } => {
615 let ControlFlow::Break(mut room_latest_events) =
616 room_latest_events_write_guard(registered_rooms, room_id).await
617 else {
618 continue;
619 };
620
621 room_latest_events.update_with_event_cache().await;
622 }
623
624 LatestEventQueueUpdate::SendQueue { room_id, update } => {
625 let ControlFlow::Break(mut room_latest_events) =
626 room_latest_events_write_guard(registered_rooms, room_id).await
627 else {
628 continue;
629 };
630
631 room_latest_events.update_with_send_queue(update).await;
632 }
633
634 LatestEventQueueUpdate::RoomInfo { room_id, reasons } => {
635 let ControlFlow::Break(mut room_latest_events) =
636 room_latest_events_write_guard(registered_rooms, room_id).await
637 else {
638 continue;
639 };
640
641 room_latest_events.update_with_room_info(*reasons).await;
642 }
643 }
644 }
645}
646
647#[cfg(test)]
648fn local_room_message(body: &str) -> LocalLatestEventValue {
649 use matrix_sdk_base::store::SerializableEventContent;
650 use ruma::{
651 MilliSecondsSinceUnixEpoch,
652 events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
653 };
654
655 LocalLatestEventValue {
656 timestamp: MilliSecondsSinceUnixEpoch::now(),
657 content: SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(
658 RoomMessageEventContent::text_plain(body),
659 ))
660 .unwrap(),
661 }
662}
663
664#[cfg(all(test, not(target_family = "wasm")))]
665mod tests {
666 use std::{collections::HashMap, ops::Not, time::Duration};
667
668 use assert_matches::assert_matches;
669 use matrix_sdk_base::{
670 RoomState,
671 deserialized_responses::TimelineEventKind,
672 linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
673 };
674 use matrix_sdk_test::{
675 InvitedRoomBuilder, JoinedRoomBuilder, async_test, event_factory::EventFactory,
676 };
677 use ruma::{
678 MilliSecondsSinceUnixEpoch, OwnedTransactionId, event_id,
679 events::{
680 AnySyncMessageLikeEvent, AnySyncStateEvent, AnySyncTimelineEvent, SyncMessageLikeEvent,
681 room::member::{MembershipState, SyncRoomMemberEvent},
682 },
683 owned_event_id, owned_room_id, room_id, user_id,
684 };
685 use stream_assert::assert_pending;
686 use tokio::{task::yield_now, time::timeout};
687
688 use super::{
689 LatestEventValue, RegisteredRooms, RemoteLatestEventValue, RoomEventCacheGenericUpdate,
690 RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomLatestEvents, RoomSendQueueUpdate,
691 RwLock, SendQueueUpdate, WeakClient, WeakRoom, With, broadcast, listen_to_updates, mpsc,
692 };
693 use crate::{
694 latest_events::{LatestEventQueueUpdate, local_room_message},
695 test_utils::mocks::MatrixMockServer,
696 };
697
698 #[async_test]
699 async fn test_latest_events_are_lazy() {
700 let room_id_0 = room_id!("!r0");
701 let room_id_1 = room_id!("!r1");
702 let room_id_2 = room_id!("!r2");
703 let thread_id_1_0 = event_id!("$ev1.0");
704 let thread_id_2_0 = event_id!("$ev2.0");
705
706 let server = MatrixMockServer::new().await;
707 let client = server.client_builder().build().await;
708
709 client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
710 client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
711 client.base_client().get_or_create_room(room_id_2, RoomState::Joined);
712
713 client.event_cache().subscribe().unwrap();
714
715 let latest_events = client.latest_events().await;
716
717 assert!(latest_events.state.registered_rooms.rooms.read().await.is_empty());
719
720 assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
722 assert!(latest_events.listen_to_room(room_id_1).await.unwrap());
723
724 {
725 let rooms = latest_events.state.registered_rooms.rooms.read().await;
726 assert_eq!(rooms.len(), 2);
728 assert!(rooms.contains_key(room_id_0));
730 assert!(rooms.contains_key(room_id_1));
731
732 assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
734 assert!(rooms.get(room_id_1).unwrap().read().await.per_thread().is_empty());
736 }
737
738 assert!(latest_events.listen_to_thread(room_id_1, thread_id_1_0).await.unwrap());
740 assert!(latest_events.listen_to_thread(room_id_2, thread_id_2_0).await.unwrap());
741
742 {
743 let rooms = latest_events.state.registered_rooms.rooms.read().await;
744 assert_eq!(rooms.len(), 3);
746 assert!(rooms.contains_key(room_id_0));
748 assert!(rooms.contains_key(room_id_1));
749 assert!(rooms.contains_key(room_id_2));
750
751 assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
753 let room_1 = rooms.get(room_id_1).unwrap().read().await;
755 assert_eq!(room_1.per_thread().len(), 1);
756 assert!(room_1.per_thread().contains_key(thread_id_1_0));
758 let room_2 = rooms.get(room_id_2).unwrap().read().await;
760 assert_eq!(room_2.per_thread().len(), 1);
761 assert!(room_2.per_thread().contains_key(thread_id_2_0));
763 }
764 }
765
766 #[async_test]
767 async fn test_forget_room() {
768 let room_id_0 = room_id!("!r0");
769 let room_id_1 = room_id!("!r1");
770
771 let server = MatrixMockServer::new().await;
772 let client = server.client_builder().build().await;
773
774 client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
775 client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
776
777 client.event_cache().subscribe().unwrap();
778
779 let latest_events = client.latest_events().await;
780
781 assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
783
784 {
785 let rooms = latest_events.state.registered_rooms.rooms.read().await;
786 assert_eq!(rooms.len(), 1);
788 assert!(rooms.contains_key(room_id_0));
790
791 assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
793 }
794
795 latest_events.forget_room(room_id_0).await;
797
798 {
799 let rooms = latest_events.state.registered_rooms.rooms.read().await;
800 assert!(rooms.is_empty());
802 }
803 }
804
805 #[async_test]
806 async fn test_forget_thread() {
807 let room_id_0 = room_id!("!r0");
808 let room_id_1 = room_id!("!r1");
809 let thread_id_0_0 = event_id!("$ev0.0");
810
811 let server = MatrixMockServer::new().await;
812 let client = server.client_builder().build().await;
813
814 client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
815 client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
816
817 client.event_cache().subscribe().unwrap();
818
819 let latest_events = client.latest_events().await;
820
821 assert!(latest_events.listen_to_thread(room_id_0, thread_id_0_0).await.unwrap());
823
824 {
825 let rooms = latest_events.state.registered_rooms.rooms.read().await;
826 assert_eq!(rooms.len(), 1);
828 assert!(rooms.contains_key(room_id_0));
830
831 let room_0 = rooms.get(room_id_0).unwrap().read().await;
833 assert_eq!(room_0.per_thread().len(), 1);
834 assert!(room_0.per_thread().contains_key(thread_id_0_0));
836 }
837
838 latest_events.forget_thread(room_id_0, thread_id_0_0).await;
840
841 {
842 let rooms = latest_events.state.registered_rooms.rooms.read().await;
843 assert_eq!(rooms.len(), 1);
845 assert!(rooms.contains_key(room_id_0));
847
848 assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
850 }
851 }
852
853 #[async_test]
854 async fn test_inputs_task_can_listen_to_room_event_cache() {
855 let room_id = owned_room_id!("!r0");
856
857 let server = MatrixMockServer::new().await;
858 let client = server.client_builder().build().await;
859 let weak_client = WeakClient::from_client(&client);
860 let weak_room = WeakRoom::new(weak_client, room_id.clone());
861
862 let event_cache = client.event_cache();
863
864 let registered_rooms = RwLock::new(HashMap::new());
865 let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
866 broadcast::channel(1);
867 let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
868 broadcast::channel(1);
869 let (_room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
870 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
871
872 {
874 room_event_cache_generic_update_sender
875 .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
876 .unwrap();
877
878 assert!(
880 listen_to_updates(
881 ®istered_rooms,
882 &mut room_event_cache_generic_update_receiver,
883 &mut send_queue_generic_update_receiver,
884 &mut room_info_update_receiver,
885 &latest_event_queue_sender,
886 )
887 .await
888 .is_continue()
889 );
890
891 assert!(latest_event_queue_receiver.is_empty());
893 }
894
895 {
897 registered_rooms.write().await.insert(
898 room_id.clone(),
899 With::inner(RoomLatestEvents::new(weak_room, event_cache)),
900 );
901 room_event_cache_generic_update_sender
902 .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
903 .unwrap();
904
905 assert!(
906 listen_to_updates(
907 ®istered_rooms,
908 &mut room_event_cache_generic_update_receiver,
909 &mut send_queue_generic_update_receiver,
910 &mut room_info_update_receiver,
911 &latest_event_queue_sender,
912 )
913 .await
914 .is_continue()
915 );
916
917 assert!(latest_event_queue_receiver.is_empty().not());
919 }
920 }
921
922 #[async_test]
923 async fn test_inputs_task_can_listen_to_send_queue() {
924 let room_id = owned_room_id!("!r0");
925
926 let server = MatrixMockServer::new().await;
927 let client = server.client_builder().build().await;
928 let weak_client = WeakClient::from_client(&client);
929 let weak_room = WeakRoom::new(weak_client, room_id.clone());
930
931 let event_cache = client.event_cache();
932
933 let registered_rooms = RwLock::new(HashMap::new());
934
935 let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
936 broadcast::channel(1);
937 let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
938 broadcast::channel(1);
939 let (_room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
940 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
941
942 {
944 send_queue_generic_update_sender
945 .send(SendQueueUpdate {
946 room_id: room_id.clone(),
947 update: RoomSendQueueUpdate::SentEvent {
948 transaction_id: OwnedTransactionId::from("txnid0"),
949 event_id: owned_event_id!("$ev0"),
950 },
951 })
952 .unwrap();
953
954 assert!(
956 listen_to_updates(
957 ®istered_rooms,
958 &mut room_event_cache_generic_update_receiver,
959 &mut send_queue_generic_update_receiver,
960 &mut room_info_update_receiver,
961 &latest_event_queue_sender,
962 )
963 .await
964 .is_continue()
965 );
966
967 assert!(latest_event_queue_receiver.is_empty());
969 }
970
971 {
973 registered_rooms.write().await.insert(
974 room_id.clone(),
975 With::inner(RoomLatestEvents::new(weak_room, event_cache)),
976 );
977 send_queue_generic_update_sender
978 .send(SendQueueUpdate {
979 room_id: room_id.clone(),
980 update: RoomSendQueueUpdate::SentEvent {
981 transaction_id: OwnedTransactionId::from("txnid1"),
982 event_id: owned_event_id!("$ev1"),
983 },
984 })
985 .unwrap();
986
987 assert!(
988 listen_to_updates(
989 ®istered_rooms,
990 &mut room_event_cache_generic_update_receiver,
991 &mut send_queue_generic_update_receiver,
992 &mut room_info_update_receiver,
993 &latest_event_queue_sender,
994 )
995 .await
996 .is_continue()
997 );
998
999 assert!(latest_event_queue_receiver.is_empty().not());
1001 }
1002 }
1003
1004 #[async_test]
1005 async fn test_inputs_task_can_listen_to_room_info() {
1006 let room_id = owned_room_id!("!r0");
1007
1008 let server = MatrixMockServer::new().await;
1009 let client = server.client_builder().build().await;
1010 let weak_client = WeakClient::from_client(&client);
1011 let weak_room = WeakRoom::new(weak_client, room_id.clone());
1012
1013 let event_cache = client.event_cache();
1014
1015 let registered_rooms = RwLock::new(HashMap::new());
1016
1017 let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1018 broadcast::channel(1);
1019 let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1020 broadcast::channel(1);
1021 let (room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
1022 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1023
1024 {
1026 room_info_update_sender
1027 .send(RoomInfoNotableUpdate {
1028 room_id: room_id.clone(),
1029 reasons: RoomInfoNotableUpdateReasons::MEMBERSHIP,
1030 })
1031 .unwrap();
1032
1033 assert!(
1035 listen_to_updates(
1036 ®istered_rooms,
1037 &mut room_event_cache_generic_update_receiver,
1038 &mut send_queue_generic_update_receiver,
1039 &mut room_info_update_receiver,
1040 &latest_event_queue_sender,
1041 )
1042 .await
1043 .is_continue()
1044 );
1045
1046 assert!(latest_event_queue_receiver.is_empty());
1048 }
1049
1050 {
1052 registered_rooms.write().await.insert(
1053 room_id.clone(),
1054 With::inner(RoomLatestEvents::new(weak_room, event_cache)),
1055 );
1056 room_info_update_sender
1057 .send(RoomInfoNotableUpdate {
1058 room_id: room_id.clone(),
1059 reasons: RoomInfoNotableUpdateReasons::MEMBERSHIP,
1060 })
1061 .unwrap();
1062
1063 assert!(
1064 listen_to_updates(
1065 ®istered_rooms,
1066 &mut room_event_cache_generic_update_receiver,
1067 &mut send_queue_generic_update_receiver,
1068 &mut room_info_update_receiver,
1069 &latest_event_queue_sender,
1070 )
1071 .await
1072 .is_continue()
1073 );
1074
1075 assert!(latest_event_queue_receiver.is_empty().not());
1077 }
1078 }
1079
1080 #[async_test]
1081 async fn test_inputs_task_can_listen_to_specific_room_info_update_reasons() {
1082 let room_id = owned_room_id!("!r0");
1083
1084 let server = MatrixMockServer::new().await;
1085 let client = server.client_builder().build().await;
1086 let weak_client = WeakClient::from_client(&client);
1087 let weak_room = WeakRoom::new(weak_client, room_id.clone());
1088
1089 let event_cache = client.event_cache();
1090
1091 let registered_rooms = RwLock::new(HashMap::new());
1092
1093 let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1094 broadcast::channel(1);
1095 let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1096 broadcast::channel(1);
1097 let (room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
1098 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1099
1100 registered_rooms
1101 .write()
1102 .await
1103 .insert(room_id.clone(), With::inner(RoomLatestEvents::new(weak_room, event_cache)));
1104
1105 for reason in {
1110 let mut all = RoomInfoNotableUpdateReasons::all();
1111 all.remove(RoomInfoNotableUpdateReasons::MEMBERSHIP);
1112
1113 all.iter()
1114 } {
1115 room_info_update_sender
1116 .send(RoomInfoNotableUpdate { room_id: room_id.clone(), reasons: reason })
1117 .unwrap();
1118
1119 assert!(
1120 listen_to_updates(
1121 ®istered_rooms,
1122 &mut room_event_cache_generic_update_receiver,
1123 &mut send_queue_generic_update_receiver,
1124 &mut room_info_update_receiver,
1125 &latest_event_queue_sender,
1126 )
1127 .await
1128 .is_continue()
1129 );
1130
1131 assert!(latest_event_queue_receiver.is_empty());
1133 }
1134
1135 {
1137 room_info_update_sender
1138 .send(RoomInfoNotableUpdate {
1139 room_id: room_id.clone(),
1140 reasons: RoomInfoNotableUpdateReasons::MEMBERSHIP,
1141 })
1142 .unwrap();
1143
1144 assert!(
1145 listen_to_updates(
1146 ®istered_rooms,
1147 &mut room_event_cache_generic_update_receiver,
1148 &mut send_queue_generic_update_receiver,
1149 &mut room_info_update_receiver,
1150 &latest_event_queue_sender,
1151 )
1152 .await
1153 .is_continue()
1154 );
1155
1156 assert!(latest_event_queue_receiver.is_empty().not());
1158 }
1159 }
1160
1161 #[async_test]
1162 async fn test_inputs_task_stops_when_event_cache_channel_is_closed() {
1163 let registered_rooms = RwLock::new(HashMap::new());
1164 let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1165 broadcast::channel(1);
1166 let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1167 broadcast::channel(1);
1168 let (_room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
1169 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1170
1171 drop(room_event_cache_generic_update_sender);
1173
1174 assert!(
1176 listen_to_updates(
1177 ®istered_rooms,
1178 &mut room_event_cache_generic_update_receiver,
1179 &mut send_queue_generic_update_receiver,
1180 &mut room_info_update_receiver,
1181 &latest_event_queue_sender,
1182 )
1183 .await
1184 .is_break()
1186 );
1187
1188 assert!(latest_event_queue_receiver.is_empty());
1189 }
1190
1191 #[async_test]
1192 async fn test_inputs_task_stops_when_send_queue_channel_is_closed() {
1193 let registered_rooms = RwLock::new(HashMap::new());
1194 let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1195 broadcast::channel(1);
1196 let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1197 broadcast::channel(1);
1198 let (_room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
1199 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1200
1201 drop(send_queue_generic_update_sender);
1203
1204 assert!(
1206 listen_to_updates(
1207 ®istered_rooms,
1208 &mut room_event_cache_generic_update_receiver,
1209 &mut send_queue_generic_update_receiver,
1210 &mut room_info_update_receiver,
1211 &latest_event_queue_sender,
1212 )
1213 .await
1214 .is_break()
1216 );
1217
1218 assert!(latest_event_queue_receiver.is_empty());
1219 }
1220
1221 #[async_test]
1222 async fn test_inputs_task_stops_when_room_info_updates_are_closed() {
1223 let registered_rooms = RwLock::new(HashMap::new());
1224 let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1225 broadcast::channel(1);
1226 let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1227 broadcast::channel(1);
1228 let (room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
1229 let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1230
1231 drop(room_info_update_sender);
1233
1234 assert!(
1236 listen_to_updates(
1237 ®istered_rooms,
1238 &mut room_event_cache_generic_update_receiver,
1239 &mut send_queue_generic_update_receiver,
1240 &mut room_info_update_receiver,
1241 &latest_event_queue_sender,
1242 )
1243 .await
1244 .is_break()
1246 );
1247
1248 assert!(latest_event_queue_receiver.is_empty());
1249 }
1250
1251 #[async_test]
1252 async fn test_latest_event_value_is_updated_via_event_cache() {
1253 let room_id = owned_room_id!("!r0");
1254 let user_id = user_id!("@mnt_io:matrix.org");
1255 let event_factory = EventFactory::new().sender(user_id).room(&room_id);
1256 let event_id_0 = event_id!("$ev0");
1257
1258 let server = MatrixMockServer::new().await;
1259 let client = server.client_builder().build().await;
1260
1261 client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1263
1264 let event_cache = client.event_cache();
1265 event_cache.subscribe().unwrap();
1266
1267 let latest_events = client.latest_events().await;
1268
1269 let mut latest_event_stream =
1271 latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
1272
1273 assert_pending!(latest_event_stream);
1275
1276 server
1278 .sync_room(
1279 &client,
1280 JoinedRoomBuilder::new(&room_id)
1281 .add_timeline_event(event_factory.text_msg("raclette !").event_id(event_id_0)),
1282 )
1283 .await;
1284
1285 assert_matches!(
1289 latest_event_stream.next().await,
1290 Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => {
1291 assert_matches!(
1292 event.deserialize().unwrap(),
1293 AnySyncTimelineEvent::MessageLike(
1294 AnySyncMessageLikeEvent::RoomMessage(
1295 SyncMessageLikeEvent::Original(message_content)
1296 )
1297 ) => {
1298 assert_eq!(message_content.content.body(), "raclette !");
1299 }
1300 );
1301 }
1302 );
1303
1304 assert_pending!(latest_event_stream);
1305 }
1306
1307 #[async_test]
1308 async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily() {
1309 let room_id = owned_room_id!("!r0");
1310 let user_id = user_id!("@mnt_io:matrix.org");
1311 let event_factory = EventFactory::new().sender(user_id).room(&room_id);
1312 let event_id_0 = event_id!("$ev0");
1313
1314 let server = MatrixMockServer::new().await;
1315 let client = server.client_builder().build().await;
1316
1317 {
1319 client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1321
1322 client
1324 .event_cache_store()
1325 .lock()
1326 .await
1327 .expect("Could not acquire the event cache lock")
1328 .as_clean()
1329 .expect("Could not acquire a clean event cache lock")
1330 .handle_linked_chunk_updates(
1331 LinkedChunkId::Room(&room_id),
1332 vec![
1333 Update::NewItemsChunk {
1334 previous: None,
1335 new: ChunkIdentifier::new(0),
1336 next: None,
1337 },
1338 Update::PushItems {
1339 at: Position::new(ChunkIdentifier::new(0), 0),
1340 items: vec![
1341 event_factory.text_msg("hello").event_id(event_id_0).into(),
1342 ],
1343 },
1344 ],
1345 )
1346 .await
1347 .unwrap();
1348 }
1349
1350 let event_cache = client.event_cache();
1351 event_cache.subscribe().unwrap();
1352
1353 let latest_events = client.latest_events().await;
1354
1355 let mut latest_event_stream =
1356 latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
1357
1358 yield_now().await;
1364 assert_matches!(latest_event_stream.next_now().await, LatestEventValue::Remote(_));
1365
1366 assert_pending!(latest_event_stream);
1367 }
1368
1369 #[async_test]
1377 async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily_inner() {
1378 let room_id_0 = owned_room_id!("!r0");
1379 let room_id_1 = owned_room_id!("!r1");
1380
1381 let server = MatrixMockServer::new().await;
1382 let client = server.client_builder().build().await;
1383
1384 let room_0 = client.base_client().get_or_create_room(&room_id_0, RoomState::Joined);
1386 let room_1 = client.base_client().get_or_create_room(&room_id_1, RoomState::Joined);
1387
1388 let mut room_info_1 = room_0.clone_info();
1391 room_info_1.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo")));
1392 room_1.set_room_info(room_info_1, Default::default());
1393
1394 let weak_client = WeakClient::from_client(&client);
1395
1396 let event_cache = client.event_cache();
1397 event_cache.subscribe().unwrap();
1398
1399 let (latest_event_queue_sender, mut latest_event_queue_receiver) =
1400 mpsc::unbounded_channel();
1401
1402 let registered_rooms =
1403 RegisteredRooms::new(weak_client, event_cache, &latest_event_queue_sender);
1404
1405 {
1408 let room_latest_events = registered_rooms.for_room(&room_id_0).await.unwrap().unwrap();
1409 assert_matches!(
1410 room_latest_events.read().await.for_room().get().await,
1411 LatestEventValue::None
1412 );
1413 assert_matches!(
1414 latest_event_queue_receiver.recv().await,
1415 Some(LatestEventQueueUpdate::EventCache { room_id }) => {
1416 assert_eq!(room_id, room_id_0);
1417 }
1418 );
1419 assert!(latest_event_queue_receiver.is_empty());
1420 }
1421
1422 {
1425 let room_latest_events = registered_rooms.for_room(&room_id_1).await.unwrap().unwrap();
1426 assert_matches!(
1427 room_latest_events.read().await.for_room().get().await,
1428 LatestEventValue::LocalIsSending(_)
1429 );
1430 assert!(latest_event_queue_receiver.is_empty());
1431 }
1432 }
1433
1434 #[async_test]
1435 async fn test_latest_event_value_is_updated_via_room_infos_for_invites() {
1436 let room_id = owned_room_id!("!r0");
1437 let event_factory = EventFactory::new().room(&room_id);
1438 let event_id_0 = event_id!("$ev0");
1439 let event_id_1 = event_id!("$ev1");
1440
1441 let server = MatrixMockServer::new().await;
1442 let client = server.client_builder().build().await;
1443 let own_user_id = client.user_id().unwrap();
1444 let other_user_id = user_id!("@other:servername");
1445
1446 let event_cache = client.event_cache();
1447 event_cache.subscribe().unwrap();
1448
1449 let latest_events = client.latest_events().await;
1450
1451 let mut latest_event_stream =
1453 latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
1454
1455 assert_pending!(latest_event_stream);
1457
1458 let now = MilliSecondsSinceUnixEpoch::now().get();
1459
1460 {
1462 server
1463 .sync_room(
1464 &client,
1465 InvitedRoomBuilder::new(&room_id).add_state_event(
1466 event_factory
1467 .member(other_user_id)
1468 .invited(own_user_id)
1469 .event_id(event_id_0),
1470 ),
1471 )
1472 .await;
1473
1474 assert_matches!(
1478 latest_event_stream.next().await,
1479 Some(LatestEventValue::RemoteInvite { event_id, timestamp, inviter }) => {
1480 assert!(event_id.is_none());
1482 assert!(timestamp.get() >= now);
1484 assert_eq!(inviter.as_deref(), Some(other_user_id));
1485 }
1486 );
1487
1488 assert_pending!(latest_event_stream);
1489 };
1490
1491 {
1493 server
1494 .sync_room(
1495 &client,
1496 InvitedRoomBuilder::new(&room_id).add_state_event(
1497 event_factory
1498 .member(other_user_id)
1499 .invited(own_user_id)
1500 .event_id(event_id_0),
1501 ),
1502 )
1503 .await;
1504
1505 assert!(timeout(Duration::from_secs(1), latest_event_stream.next()).await.is_err());
1510
1511 assert_pending!(latest_event_stream);
1512 }
1513
1514 {
1516 let now = u64::from(now) + 10; server
1518 .sync_room(
1519 &client,
1520 JoinedRoomBuilder::new(&room_id).add_timeline_event(
1521 event_factory
1522 .member(own_user_id)
1523 .membership(MembershipState::Join)
1524 .event_id(event_id_1)
1525 .server_ts(now),
1526 ),
1527 )
1528 .await;
1529
1530 assert_matches!(
1534 latest_event_stream.next().await,
1535 Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => {
1536 assert_matches!(
1537 event.deserialize().unwrap(),
1538 AnySyncTimelineEvent::State(
1539 AnySyncStateEvent::RoomMember(
1540 SyncRoomMemberEvent::Original(event)
1541 )
1542 ) => {
1543 assert_eq!(event.event_id, event_id_1);
1544 assert_eq!(event.content.membership, MembershipState::Join);
1545 assert_eq!(event.sender, own_user_id);
1546 assert_eq!(event.state_key, own_user_id);
1547 assert_eq!(u64::from(event.origin_server_ts.get()), now);
1548 }
1549 );
1550 }
1551 );
1552
1553 assert_pending!(latest_event_stream);
1554 }
1555 }
1556}