1#[cfg(any(feature = "anyhow", feature = "eyre"))]
35use std::any::TypeId;
36use std::{
37 borrow::Cow,
38 fmt,
39 future::Future,
40 pin::Pin,
41 sync::{
42 atomic::{AtomicU64, Ordering::SeqCst},
43 Arc, RwLock, Weak,
44 },
45 task::{Context, Poll},
46};
47
48use anymap2::any::CloneAnySendSync;
49use eyeball::{SharedObservable, Subscriber};
50use futures_core::Stream;
51use futures_util::stream::{FuturesUnordered, StreamExt};
52use matrix_sdk_base::{
53 deserialized_responses::{EncryptionInfo, TimelineEvent},
54 SendOutsideWasm, SyncOutsideWasm,
55};
56use pin_project_lite::pin_project;
57use ruma::{events::AnySyncStateEvent, push::Action, serde::Raw, OwnedRoomId};
58use serde::{de::DeserializeOwned, Deserialize};
59use serde_json::value::RawValue as RawJsonValue;
60use tracing::{debug, error, field::debug, instrument, warn};
61
62use self::maps::EventHandlerMaps;
63use crate::{Client, Room};
64
65mod context;
66mod maps;
67mod static_events;
68
69pub use self::context::{Ctx, EventHandlerContext, RawEvent};
70
/// Boxed, pinned future produced by a type-erased event handler; it resolves
/// to `()`. On targets other than wasm32 the future must also be `Send`.
#[cfg(not(target_arch = "wasm32"))]
type EventHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// wasm32 variant of the handler future alias, without the `Send` bound.
#[cfg(target_arch = "wasm32")]
type EventHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
75
/// Type-erased event handler: a callable that receives the
/// [`EventHandlerData`] for one event and returns an [`EventHandlerFut`].
/// On targets other than wasm32 the callable must be `Send + Sync`.
#[cfg(not(target_arch = "wasm32"))]
type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut + Send + Sync;
/// wasm32 variant of the handler function alias, without thread-safety
/// bounds.
#[cfg(target_arch = "wasm32")]
type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut;
80
/// Map used to store event handler context values; entries must be
/// clonable, `Send` and `Sync`.
type AnyMap = anymap2::Map<dyn CloneAnySendSync + Send + Sync>;
82
/// Storage for the registered event handlers and the context values that
/// can be handed to them.
#[derive(Default)]
pub(crate) struct EventHandlerStore {
    /// The registered handlers.
    handlers: RwLock<EventHandlerMaps>,
    /// Values registered through [`EventHandlerStore::add_context`].
    context: RwLock<AnyMap>,
    /// Monotonic counter used to hand out handler IDs.
    counter: AtomicU64,
}
89
90impl EventHandlerStore {
91 pub fn add_handler(&self, handle: EventHandlerHandle, handler_fn: Box<EventHandlerFn>) {
92 self.handlers.write().unwrap().add(handle, handler_fn);
93 }
94
95 pub fn add_context<T>(&self, ctx: T)
96 where
97 T: Clone + Send + Sync + 'static,
98 {
99 self.context.write().unwrap().insert(ctx);
100 }
101
102 pub fn remove(&self, handle: EventHandlerHandle) {
103 self.handlers.write().unwrap().remove(handle);
104 }
105
106 #[cfg(test)]
107 fn len(&self) -> usize {
108 self.handlers.read().unwrap().len()
109 }
110}
111
/// The category of event an event handler is registered for.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum HandlerKind {
    GlobalAccountData,
    RoomAccountData,
    EphemeralRoomData,
    /// Dispatched for every timeline event, in addition to the more specific
    /// kinds.
    Timeline,
    /// A timeline event without a `state_key`, redacted or not.
    MessageLike,
    OriginalMessageLike,
    RedactedMessageLike,
    /// A state event, redacted or not.
    State,
    OriginalState,
    RedactedState,
    StrippedState,
    ToDevice,
    Presence,
}

impl HandlerKind {
    /// Picks the message-like kind that matches the redaction status.
    fn message_like_redacted(redacted: bool) -> Self {
        match redacted {
            true => Self::RedactedMessageLike,
            false => Self::OriginalMessageLike,
        }
    }

    /// Picks the state kind that matches the redaction status.
    fn state_redacted(redacted: bool) -> Self {
        match redacted {
            true => Self::RedactedState,
            false => Self::OriginalState,
        }
    }
}
147
/// Static description of an event type that event handlers can be
/// registered for.
pub trait SyncEvent {
    /// The handler kind this event type is dispatched under.
    #[doc(hidden)]
    const KIND: HandlerKind;
    /// The event `type` string, if there is a single one for this Rust type.
    #[doc(hidden)]
    const TYPE: Option<&'static str>;
}
155
/// A type-erased handler function paired with its numeric handler ID.
pub(crate) struct EventHandlerWrapper {
    /// The type-erased handler function.
    handler_fn: Box<EventHandlerFn>,
    /// ID of the handler.
    pub handler_id: u64,
}
160
/// Handle identifying a registered event handler, e.g. for removing it
/// again.
#[derive(Clone, Debug)]
pub struct EventHandlerHandle {
    /// Kind of event the handler was registered for.
    pub(crate) ev_kind: HandlerKind,
    /// Event `type` string the handler was registered for, if any.
    pub(crate) ev_type: Option<&'static str>,
    /// Room the handler is restricted to, if any.
    pub(crate) room_id: Option<OwnedRoomId>,
    /// ID taken from the store's handler counter at registration time.
    pub(crate) handler_id: u64,
}
170
/// Interface for event handlers.
///
/// In this file it is implemented for clonable functions and closures that
/// take the event `Ev` followed by zero to eight context arguments and
/// return a future.
pub trait EventHandler<Ev, Ctx>: Clone + SendOutsideWasm + SyncOutsideWasm + 'static {
    /// The future returned by `handle_event`.
    #[doc(hidden)]
    type Future: EventHandlerFuture;

    /// Builds the handler future for `ev`.
    ///
    /// Returns `None` when one of the context arguments cannot be obtained
    /// from `data`.
    #[doc(hidden)]
    fn handle_event(self, ev: Ev, data: EventHandlerData<'_>) -> Option<Self::Future>;
}
218
/// A `'static` future whose output implements [`EventHandlerResult`].
///
/// Exists so the output bound can be named through a single trait.
#[doc(hidden)]
pub trait EventHandlerFuture:
    Future<Output = <Self as EventHandlerFuture>::Output> + SendOutsideWasm + 'static
{
    /// The future's output type.
    type Output: EventHandlerResult;
}
225
/// Blanket implementation: every suitable future whose output implements
/// [`EventHandlerResult`] is an [`EventHandlerFuture`].
impl<T> EventHandlerFuture for T
where
    T: Future + SendOutsideWasm + 'static,
    <T as Future>::Output: EventHandlerResult,
{
    type Output = <T as Future>::Output;
}
233
/// Everything passed to an event handler invocation for one event.
#[doc(hidden)]
#[derive(Debug)]
pub struct EventHandlerData<'a> {
    /// The client dispatching the event.
    client: Client,
    /// The room the event belongs to, if any.
    room: Option<Room>,
    /// The raw JSON of the event.
    raw: &'a RawJsonValue,
    /// Encryption info of the event, if available.
    encryption_info: Option<&'a EncryptionInfo>,
    /// Push actions associated with the event.
    push_actions: &'a [Action],
    /// Handle of the event handler being invoked.
    handle: EventHandlerHandle,
}
244
/// Return types supported for event handlers.
///
/// In this file it is implemented for `()` and for `Result<(), E>`.
pub trait EventHandlerResult: Sized {
    /// Logs the result if it represents a failure.
    ///
    /// `event_type` is the event type of the handler, if known; it is used
    /// to make the log message more specific.
    #[doc(hidden)]
    fn print_error(&self, event_type: Option<&str>);
}
252
/// Handlers returning `()` cannot fail, so there is never anything to log.
impl EventHandlerResult for () {
    fn print_error(&self, _event_type: Option<&str>) {}
}
256
257impl<E: fmt::Debug + fmt::Display + 'static> EventHandlerResult for Result<(), E> {
258 fn print_error(&self, event_type: Option<&str>) {
259 let msg_fragment = match event_type {
260 Some(event_type) => format!(" for `{event_type}`"),
261 None => "".to_owned(),
262 };
263
264 match self {
265 #[cfg(feature = "anyhow")]
266 Err(e) if TypeId::of::<E>() == TypeId::of::<anyhow::Error>() => {
267 error!("Event handler{msg_fragment} failed: {e:?}");
268 }
269 #[cfg(feature = "eyre")]
270 Err(e) if TypeId::of::<E>() == TypeId::of::<eyre::Report>() => {
271 error!("Event handler{msg_fragment} failed: {e:?}");
272 }
273 Err(e) => {
274 error!("Event handler{msg_fragment} failed: {e}");
275 }
276 Ok(_) => {}
277 }
278 }
279}
280
/// Minimal view of an event's `unsigned` object, used only to detect
/// whether the event has been redacted.
#[derive(Deserialize)]
struct UnsignedDetails {
    /// Present when the event carries a `redacted_because` field; its
    /// content is ignored.
    redacted_because: Option<serde::de::IgnoredAny>,
}
285
286impl Client {
288 pub(crate) fn add_event_handler_impl<Ev, Ctx, H>(
289 &self,
290 handler: H,
291 room_id: Option<OwnedRoomId>,
292 ) -> EventHandlerHandle
293 where
294 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
295 H: EventHandler<Ev, Ctx>,
296 {
297 let handler_fn: Box<EventHandlerFn> = Box::new(move |data| {
298 let maybe_fut = serde_json::from_str(data.raw.get())
299 .map(|ev| handler.clone().handle_event(ev, data));
300
301 Box::pin(async move {
302 match maybe_fut {
303 Ok(Some(fut)) => {
304 fut.await.print_error(Ev::TYPE);
305 }
306 Ok(None) => {
307 error!(
308 event_type = Ev::TYPE, event_kind = ?Ev::KIND,
309 "Event handler has an invalid context argument",
310 );
311 }
312 Err(e) => {
313 warn!(
314 event_type = Ev::TYPE, event_kind = ?Ev::KIND,
315 "Failed to deserialize event, skipping event handler.\n
316 Deserialization error: {e}",
317 );
318 }
319 }
320 })
321 });
322
323 let handler_id = self.inner.event_handlers.counter.fetch_add(1, SeqCst);
324 let handle =
325 EventHandlerHandle { ev_kind: Ev::KIND, ev_type: Ev::TYPE, room_id, handler_id };
326
327 self.inner.event_handlers.add_handler(handle.clone(), handler_fn);
328
329 handle
330 }
331
332 pub(crate) async fn handle_sync_events<T>(
333 &self,
334 kind: HandlerKind,
335 room: Option<&Room>,
336 events: &[Raw<T>],
337 ) -> serde_json::Result<()> {
338 #[derive(Deserialize)]
339 struct ExtractType<'a> {
340 #[serde(borrow, rename = "type")]
341 event_type: Cow<'a, str>,
342 }
343
344 for raw_event in events {
345 let event_type = raw_event.deserialize_as::<ExtractType<'_>>()?.event_type;
346 self.call_event_handlers(room, raw_event.json(), kind, &event_type, None, &[]).await;
347 }
348
349 Ok(())
350 }
351
352 pub(crate) async fn handle_sync_state_events(
353 &self,
354 room: Option<&Room>,
355 state_events: &[Raw<AnySyncStateEvent>],
356 ) -> serde_json::Result<()> {
357 #[derive(Deserialize)]
358 struct StateEventDetails<'a> {
359 #[serde(borrow, rename = "type")]
360 event_type: Cow<'a, str>,
361 unsigned: Option<UnsignedDetails>,
362 }
363
364 self.handle_sync_events(HandlerKind::State, room, state_events).await?;
366
367 for raw_event in state_events {
369 let StateEventDetails { event_type, unsigned } = raw_event.deserialize_as()?;
370 let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
371 let handler_kind = HandlerKind::state_redacted(redacted);
372
373 self.call_event_handlers(room, raw_event.json(), handler_kind, &event_type, None, &[])
374 .await;
375 }
376
377 Ok(())
378 }
379
380 pub(crate) async fn handle_sync_timeline_events(
381 &self,
382 room: Option<&Room>,
383 timeline_events: &[TimelineEvent],
384 ) -> serde_json::Result<()> {
385 #[derive(Deserialize)]
386 struct TimelineEventDetails<'a> {
387 #[serde(borrow, rename = "type")]
388 event_type: Cow<'a, str>,
389 state_key: Option<serde::de::IgnoredAny>,
390 unsigned: Option<UnsignedDetails>,
391 }
392
393 for item in timeline_events {
394 let TimelineEventDetails { event_type, state_key, unsigned } =
395 item.raw().deserialize_as()?;
396
397 let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
398 let (handler_kind_g, handler_kind_r) = match state_key {
399 Some(_) => (HandlerKind::State, HandlerKind::state_redacted(redacted)),
400 None => (HandlerKind::MessageLike, HandlerKind::message_like_redacted(redacted)),
401 };
402
403 let raw_event = item.raw().json();
404 let encryption_info = item.encryption_info();
405 let push_actions = item.push_actions.as_deref().unwrap_or(&[]);
406
407 self.call_event_handlers(
409 room,
410 raw_event,
411 handler_kind_g,
412 &event_type,
413 encryption_info,
414 push_actions,
415 )
416 .await;
417
418 self.call_event_handlers(
420 room,
421 raw_event,
422 handler_kind_r,
423 &event_type,
424 encryption_info,
425 push_actions,
426 )
427 .await;
428
429 let kind = HandlerKind::Timeline;
431 self.call_event_handlers(
432 room,
433 raw_event,
434 kind,
435 &event_type,
436 encryption_info,
437 push_actions,
438 )
439 .await;
440 }
441
442 Ok(())
443 }
444
445 #[instrument(skip_all, fields(?event_kind, ?event_type, room_id))]
446 async fn call_event_handlers(
447 &self,
448 room: Option<&Room>,
449 raw: &RawJsonValue,
450 event_kind: HandlerKind,
451 event_type: &str,
452 encryption_info: Option<&EncryptionInfo>,
453 push_actions: &[Action],
454 ) {
455 let room_id = room.map(|r| r.room_id());
456 if let Some(room_id) = room_id {
457 tracing::Span::current().record("room_id", debug(room_id));
458 }
459
460 let mut futures: FuturesUnordered<_> = self
462 .inner
463 .event_handlers
464 .handlers
465 .read()
466 .unwrap()
467 .get_handlers(event_kind, event_type, room_id)
468 .map(|(handle, handler_fn)| {
469 let data = EventHandlerData {
470 client: self.clone(),
471 room: room.cloned(),
472 raw,
473 encryption_info,
474 push_actions,
475 handle,
476 };
477
478 (handler_fn)(data)
479 })
480 .collect();
481
482 if !futures.is_empty() {
483 debug!(amount = futures.len(), "Calling event handlers");
484
485 while let Some(()) = futures.next().await {}
488 }
489 }
490}
491
/// Guard that removes its event handler from the client when dropped.
#[derive(Debug)]
pub struct EventHandlerDropGuard {
    /// Handle of the event handler to remove on drop.
    handle: EventHandlerHandle,
    /// Client the event handler is registered on.
    client: Client,
}
501
impl EventHandlerDropGuard {
    /// Creates a guard that removes the handler identified by `handle` from
    /// `client` when it is dropped.
    pub(crate) fn new(handle: EventHandlerHandle, client: Client) -> Self {
        Self { handle, client }
    }
}
507
508impl Drop for EventHandlerDropGuard {
509 fn drop(&mut self) {
510 self.client.remove_event_handler(self.handle.clone());
511 }
512}
513
/// Implements [`EventHandler`] for functions and closures that take the
/// event followed by one context argument per listed type parameter.
macro_rules! impl_event_handler {
    ($($ty:ident),* $(,)?) => {
        impl<Ev, Fun, Fut, $($ty),*> EventHandler<Ev, ($($ty,)*)> for Fun
        where
            Ev: SyncEvent,
            Fun: FnOnce(Ev, $($ty),*) -> Fut + Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
            Fut: EventHandlerFuture,
            $($ty: EventHandlerContext),*
        {
            type Future = Fut;

            fn handle_event(self, ev: Ev, _d: EventHandlerData<'_>) -> Option<Self::Future> {
                // Each context argument is extracted from the handler data;
                // if any extraction yields `None`, so does the whole call.
                Some((self)(ev, $($ty::from_data(&_d)?),*))
            }
        }
    };
}

// Implementations for handlers with zero to eight context arguments.
impl_event_handler!();
impl_event_handler!(A);
impl_event_handler!(A, B);
impl_event_handler!(A, B, C);
impl_event_handler!(A, B, C, D);
impl_event_handler!(A, B, C, D, E);
impl_event_handler!(A, B, C, D, E, F);
impl_event_handler!(A, B, C, D, E, F, G);
impl_event_handler!(A, B, C, D, E, F, G, H);
541
/// An event handler whose values can be observed through subscribers
/// created with [`ObservableEventHandler::subscribe`].
#[derive(Debug)]
pub struct ObservableEventHandler<T> {
    /// Observable holding the latest value, or `None` when there is none.
    shared_observable: SharedObservable<Option<T>>,

    /// Drop guard of the underlying event handler. Subscribers only hold a
    /// weak reference to it.
    event_handler_guard: Arc<EventHandlerDropGuard>,
}
565
566impl<T> ObservableEventHandler<T> {
567 pub(crate) fn new(
568 shared_observable: SharedObservable<Option<T>>,
569 event_handler_guard: EventHandlerDropGuard,
570 ) -> Self {
571 Self { shared_observable, event_handler_guard: Arc::new(event_handler_guard) }
572 }
573
574 pub fn subscribe(&self) -> EventHandlerSubscriber<T> {
579 EventHandlerSubscriber::new(
580 self.shared_observable.subscribe(),
581 Arc::downgrade(&self.event_handler_guard),
584 )
585 }
586}
587
pin_project! {
    /// Stream of the values produced by an [`ObservableEventHandler`].
    ///
    /// The stream ends once the drop guard it refers to no longer exists.
    #[derive(Debug)]
    pub struct EventHandlerSubscriber<T> {
        // The wrapped subscriber; `None` values it yields are skipped.
        #[pin]
        subscriber: Subscriber<Option<T>>,

        // Weak reference to the drop guard owned by the
        // `ObservableEventHandler`.
        event_handler_guard: Weak<EventHandlerDropGuard>,
    }
}
615
impl<T> EventHandlerSubscriber<T> {
    /// Creates a subscriber from the inner `subscriber` and a weak reference
    /// to the event handler's drop guard.
    fn new(
        subscriber: Subscriber<Option<T>>,
        event_handler_handle: Weak<EventHandlerDropGuard>,
    ) -> Self {
        Self { subscriber, event_handler_guard: event_handler_handle }
    }
}
624
625impl<T> Stream for EventHandlerSubscriber<T>
626where
627 T: Clone,
628{
629 type Item = T;
630
631 fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
632 let mut this = self.project();
633
634 let Some(_) = this.event_handler_guard.upgrade() else {
635 return Poll::Ready(None);
639 };
640
641 loop {
651 match this.subscriber.as_mut().poll_next(context) {
652 Poll::Ready(None) => return Poll::Ready(None),
654
655 Poll::Ready(Some(None)) => {
658 continue;
660 }
661
662 Poll::Ready(Some(Some(value))) => return Poll::Ready(Some(value)),
664
665 Poll::Pending => return Poll::Pending,
667 }
668 }
669 }
670}
671
#[cfg(test)]
mod tests {
    use matrix_sdk_test::{
        async_test,
        event_factory::{EventFactory, PreviousMembership},
        InvitedRoomBuilder, JoinedRoomBuilder, DEFAULT_TEST_ROOM_ID,
    };
    use stream_assert::{assert_closed, assert_pending, assert_ready};
    #[cfg(target_arch = "wasm32")]
    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
    use std::{
        future,
        sync::{
            atomic::{AtomicU8, Ordering::SeqCst},
            Arc,
        },
    };

    use matrix_sdk_test::{StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder};
    use once_cell::sync::Lazy;
    use ruma::{
        event_id,
        events::{
            room::{
                member::{MembershipState, OriginalSyncRoomMemberEvent, StrippedRoomMemberEvent},
                name::OriginalSyncRoomNameEvent,
                power_levels::OriginalSyncRoomPowerLevelsEvent,
            },
            typing::SyncTypingEvent,
            AnySyncStateEvent, AnySyncTimelineEvent,
        },
        room_id,
        serde::Raw,
        user_id,
    };
    use serde_json::json;

    use crate::{
        event_handler::Ctx,
        test_utils::{logged_in_client, no_retry_test_client},
        Client, Room,
    };

    // A `join` membership event (previous membership: `invite`), shared by
    // several tests as a timeline event.
    static MEMBER_EVENT: Lazy<Raw<AnySyncTimelineEvent>> = Lazy::new(|| {
        EventFactory::new()
            .member(user_id!("@example:localhost"))
            .membership(MembershipState::Join)
            .display_name("example")
            .event_id(event_id!("$151800140517rfvjc:localhost"))
            .previous(PreviousMembership::new(MembershipState::Invite).display_name("example"))
            .into()
    });

    /// Handlers for member, typing, power-levels and stripped member events
    /// are each called exactly once for a sync response containing one event
    /// of each kind.
    #[async_test]
    async fn test_add_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let member_count = Arc::new(AtomicU8::new(0));
        let typing_count = Arc::new(AtomicU8::new(0));
        let power_levels_count = Arc::new(AtomicU8::new(0));
        let invited_member_count = Arc::new(AtomicU8::new(0));

        client.add_event_handler({
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| async move {
                member_count.fetch_add(1, SeqCst);
            }
        });
        client.add_event_handler({
            let typing_count = typing_count.clone();
            move |_ev: SyncTypingEvent| async move {
                typing_count.fetch_add(1, SeqCst);
            }
        });
        client.add_event_handler({
            let power_levels_count = power_levels_count.clone();
            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| async move {
                power_levels_count.fetch_add(1, SeqCst);
            }
        });
        client.add_event_handler({
            let invited_member_count = invited_member_count.clone();
            move |_ev: StrippedRoomMemberEvent| async move {
                invited_member_count.fetch_add(1, SeqCst);
            }
        });

        let f = EventFactory::new();
        let response = SyncResponseBuilder::default()
            .add_joined_room(
                JoinedRoomBuilder::default()
                    .add_timeline_event(MEMBER_EVENT.clone())
                    .add_typing(
                        f.typing(vec![user_id!("@alice:matrix.org"), user_id!("@bob:example.com")]),
                    )
                    .add_state_event(StateTestEvent::PowerLevels),
            )
            .add_invited_room(
                InvitedRoomBuilder::new(room_id!("!test_invited:example.org")).add_state_event(
                    StrippedStateTestEvent::Custom(json!({
                        "content": {
                            "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
                            "displayname": "Alice",
                            "membership": "invite",
                        },
                        "event_id": "$143273582443PhrSn:example.org",
                        "origin_server_ts": 1432735824653u64,
                        "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
                        "sender": "@example:example.org",
                        "state_key": "@alice:example.org",
                        "type": "m.room.member",
                        "unsigned": {
                            "age": 1234,
                            "invite_room_state": [
                                {
                                    "content": {
                                        "name": "Example Room"
                                    },
                                    "sender": "@bob:example.org",
                                    "state_key": "",
                                    "type": "m.room.name"
                                },
                                {
                                    "content": {
                                        "join_rule": "invite"
                                    },
                                    "sender": "@bob:example.org",
                                    "state_key": "",
                                    "type": "m.room.join_rules"
                                }
                            ]
                        }
                    })),
                ),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        assert_eq!(member_count.load(SeqCst), 1);
        assert_eq!(typing_count.load(SeqCst), 1);
        assert_eq!(power_levels_count.load(SeqCst), 1);
        assert_eq!(invited_member_count.load(SeqCst), 1);

        Ok(())
    }

    /// Room-specific handlers are only called for events of the room they
    /// were registered for.
    #[async_test]
    #[allow(dependency_on_unit_never_type_fallback)]
    async fn test_add_room_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let room_id_a = room_id!("!foo:example.org");
        let room_id_b = room_id!("!bar:matrix.org");

        let member_count = Arc::new(AtomicU8::new(0));
        let power_levels_count = Arc::new(AtomicU8::new(0));

        // Member event handlers for room A and for room B.
        client.add_room_event_handler(room_id_a, {
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
                member_count.fetch_add(1, SeqCst);
                future::ready(())
            }
        });
        client.add_room_event_handler(room_id_b, {
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
                member_count.fetch_add(1, SeqCst);
                future::ready(())
            }
        });

        // Power levels event handler for room A only.
        client.add_room_event_handler(room_id_a, {
            let power_levels_count = power_levels_count.clone();
            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| {
                power_levels_count.fetch_add(1, SeqCst);
                future::ready(())
            }
        });

        // Room name event handler for room B, which must never be called.
        client.add_room_event_handler(room_id_b, move |_ev: OriginalSyncRoomNameEvent| async {
            unreachable!("No room event in room B")
        });

        let response = SyncResponseBuilder::default()
            .add_joined_room(
                JoinedRoomBuilder::new(room_id_a)
                    .add_timeline_event(MEMBER_EVENT.clone())
                    .add_state_event(StateTestEvent::PowerLevels)
                    .add_state_event(StateTestEvent::RoomName),
            )
            .add_joined_room(
                JoinedRoomBuilder::new(room_id_b)
                    .add_timeline_event(MEMBER_EVENT.clone())
                    .add_state_event(StateTestEvent::PowerLevels),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        assert_eq!(member_count.load(SeqCst), 2);
        assert_eq!(power_levels_count.load(SeqCst), 1);

        Ok(())
    }

    /// A handler may take a tuple of contexts as a single context argument.
    #[async_test]
    #[allow(dependency_on_unit_never_type_fallback)]
    async fn test_add_event_handler_with_tuples() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        client.add_event_handler(
            |_ev: OriginalSyncRoomMemberEvent, (_room, _client): (Room, Client)| future::ready(()),
        );

        // This is a compile-time check only; nothing is asserted at runtime.
        Ok(())
    }

    /// Handlers removed before a sync is processed are not called, while the
    /// remaining handlers still are.
    #[async_test]
    #[allow(dependency_on_unit_never_type_fallback)]
    async fn test_remove_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let member_count = Arc::new(AtomicU8::new(0));

        client.add_event_handler({
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent| async move {
                member_count.fetch_add(1, SeqCst);
            }
        });

        // These two handlers panic if they are ever invoked.
        let handle_a = client.add_event_handler(move |_ev: OriginalSyncRoomMemberEvent| async {
            panic!("handler should have been removed");
        });
        let handle_b = client.add_room_event_handler(
            #[allow(unknown_lints, clippy::explicit_auto_deref)]
            *DEFAULT_TEST_ROOM_ID,
            move |_ev: OriginalSyncRoomMemberEvent| async {
                panic!("handler should have been removed");
            },
        );

        client.add_event_handler({
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent| async move {
                member_count.fetch_add(1, SeqCst);
            }
        });

        let response = SyncResponseBuilder::default()
            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
            .build_sync_response();

        client.remove_event_handler(handle_a);
        client.remove_event_handler(handle_b);

        client.process_sync(response).await?;

        assert_eq!(member_count.load(SeqCst), 2);

        Ok(())
    }

    /// Dropping the guard returned by `event_handler_drop_guard` removes the
    /// event handler.
    #[async_test]
    async fn test_event_handler_drop_guard() {
        let client = no_retry_test_client(None).await;

        let handle = client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent| async {});
        assert_eq!(client.inner.event_handlers.len(), 1);

        {
            let _guard = client.event_handler_drop_guard(handle);
            assert_eq!(client.inner.event_handlers.len(), 1);
            // `_guard` is dropped at the end of this scope.
        }

        assert_eq!(client.inner.event_handlers.len(), 0);
    }

    /// A handler that awaits a client request and returns an `anyhow`
    /// result can be registered (compile-time check only).
    #[async_test]
    async fn test_use_client_in_handler() {
        let client = no_retry_test_client(None).await;

        client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent, client: Client| async move {
            let _caps = client.get_capabilities().await?;
            anyhow::Ok(())
        });
    }

    /// A handler can take the event as `Raw<_>`, together with a `Ctx`
    /// context argument.
    #[async_test]
    async fn test_raw_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;
        let counter = Arc::new(AtomicU8::new(0));
        client.add_event_handler_context(counter.clone());
        client.add_event_handler(
            |_ev: Raw<OriginalSyncRoomMemberEvent>, counter: Ctx<Arc<AtomicU8>>| async move {
                counter.fetch_add(1, SeqCst);
            },
        );

        let response = SyncResponseBuilder::default()
            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
            .build_sync_response();
        client.process_sync(response).await?;

        assert_eq!(counter.load(SeqCst), 1);
        Ok(())
    }

    /// A handler can take the event as the `AnySyncStateEvent` enum.
    #[async_test]
    async fn test_enum_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;
        let counter = Arc::new(AtomicU8::new(0));
        client.add_event_handler_context(counter.clone());
        client.add_event_handler(
            |_ev: AnySyncStateEvent, counter: Ctx<Arc<AtomicU8>>| async move {
                counter.fetch_add(1, SeqCst);
            },
        );

        let response = SyncResponseBuilder::default()
            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
            .build_sync_response();
        client.process_sync(response).await?;

        assert_eq!(counter.load(SeqCst), 1);
        Ok(())
    }

    /// A subscriber obtained from `observe_events` yields the events of all
    /// rooms and is closed once the observable is dropped.
    #[async_test]
    #[allow(dependency_on_unit_never_type_fallback)]
    async fn test_observe_events() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let room_id_0 = room_id!("!r0.matrix.org");
        let room_id_1 = room_id!("!r1.matrix.org");

        let observable = client.observe_events::<OriginalSyncRoomNameEvent, Room>();

        let mut subscriber = observable.subscribe();

        // Nothing has been synced yet.
        assert_pending!(subscriber);

        let mut response_builder = SyncResponseBuilder::new();
        let response = response_builder
            .add_joined_room(JoinedRoomBuilder::new(room_id_0).add_state_event(
                StateTestEvent::Custom(json!({
                    "content": {
                        "name": "Name 0"
                    },
                    "event_id": "$ev0",
                    "origin_server_ts": 1,
                    "sender": "@mnt_io:matrix.org",
                    "state_key": "",
                    "type": "m.room.name",
                    "unsigned": {
                        "age": 1,
                    }
                })),
            ))
            .build_sync_response();
        client.process_sync(response).await?;

        let (room_name, room) = assert_ready!(subscriber);

        assert_eq!(room_name.event_id.as_str(), "$ev0");
        assert_eq!(room.room_id(), room_id_0);
        assert_eq!(room.name().unwrap(), "Name 0");

        assert_pending!(subscriber);

        let response = response_builder
            .add_joined_room(JoinedRoomBuilder::new(room_id_1).add_state_event(
                StateTestEvent::Custom(json!({
                    "content": {
                        "name": "Name 1"
                    },
                    "event_id": "$ev1",
                    "origin_server_ts": 2,
                    "sender": "@mnt_io:matrix.org",
                    "state_key": "",
                    "type": "m.room.name",
                    "unsigned": {
                        "age": 2,
                    }
                })),
            ))
            .build_sync_response();
        client.process_sync(response).await?;

        let (room_name, room) = assert_ready!(subscriber);

        assert_eq!(room_name.event_id.as_str(), "$ev1");
        assert_eq!(room.room_id(), room_id_1);
        assert_eq!(room.name().unwrap(), "Name 1");

        assert_pending!(subscriber);

        // Dropping the observable closes the subscriber.
        drop(observable);
        assert_closed!(subscriber);

        Ok(())
    }

    /// A subscriber obtained from `observe_room_events` yields the events of
    /// the observed room and is closed once the observable is dropped.
    #[async_test]
    #[allow(dependency_on_unit_never_type_fallback)]
    async fn test_observe_room_events() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let room_id = room_id!("!r0.matrix.org");

        let observable_for_room =
            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);

        let mut subscriber_for_room = observable_for_room.subscribe();

        // Nothing has been synced yet.
        assert_pending!(subscriber_for_room);

        let mut response_builder = SyncResponseBuilder::new();
        let response = response_builder
            .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
                StateTestEvent::Custom(json!({
                    "content": {
                        "name": "Name 0"
                    },
                    "event_id": "$ev0",
                    "origin_server_ts": 1,
                    "sender": "@mnt_io:matrix.org",
                    "state_key": "",
                    "type": "m.room.name",
                    "unsigned": {
                        "age": 1,
                    }
                })),
            ))
            .build_sync_response();
        client.process_sync(response).await?;

        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);

        assert_eq!(room_name.event_id.as_str(), "$ev0");
        assert_eq!(room.name().unwrap(), "Name 0");

        assert_pending!(subscriber_for_room);

        let response = response_builder
            .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
                StateTestEvent::Custom(json!({
                    "content": {
                        "name": "Name 1"
                    },
                    "event_id": "$ev1",
                    "origin_server_ts": 2,
                    "sender": "@mnt_io:matrix.org",
                    "state_key": "",
                    "type": "m.room.name",
                    "unsigned": {
                        "age": 2,
                    }
                })),
            ))
            .build_sync_response();
        client.process_sync(response).await?;

        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);

        assert_eq!(room_name.event_id.as_str(), "$ev1");
        assert_eq!(room.name().unwrap(), "Name 1");

        assert_pending!(subscriber_for_room);

        // Dropping the observable closes the subscriber.
        drop(observable_for_room);
        assert_closed!(subscriber_for_room);

        Ok(())
    }

    /// When a single sync contains several matching events, the subscriber
    /// only yields the last one.
    #[async_test]
    async fn test_observe_several_room_events() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let room_id = room_id!("!r0.matrix.org");

        let observable_for_room =
            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);

        let mut subscriber_for_room = observable_for_room.subscribe();

        // Nothing has been synced yet.
        assert_pending!(subscriber_for_room);

        let mut response_builder = SyncResponseBuilder::new();
        let response = response_builder
            .add_joined_room(
                JoinedRoomBuilder::new(room_id)
                    .add_state_event(StateTestEvent::Custom(json!({
                        "content": {
                            "name": "Name 0"
                        },
                        "event_id": "$ev0",
                        "origin_server_ts": 1,
                        "sender": "@mnt_io:matrix.org",
                        "state_key": "",
                        "type": "m.room.name",
                        "unsigned": {
                            "age": 1,
                        }
                    })))
                    .add_state_event(StateTestEvent::Custom(json!({
                        "content": {
                            "name": "Name 1"
                        },
                        "event_id": "$ev1",
                        "origin_server_ts": 2,
                        "sender": "@mnt_io:matrix.org",
                        "state_key": "",
                        "type": "m.room.name",
                        "unsigned": {
                            "age": 1,
                        }
                    })))
                    .add_state_event(StateTestEvent::Custom(json!({
                        "content": {
                            "name": "Name 2"
                        },
                        "event_id": "$ev2",
                        "origin_server_ts": 3,
                        "sender": "@mnt_io:matrix.org",
                        "state_key": "",
                        "type": "m.room.name",
                        "unsigned": {
                            "age": 1,
                        }
                    }))),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);

        // Only the last of the three events is observed.
        assert_eq!(room_name.event_id.as_str(), "$ev2");
        assert_eq!(room.name().unwrap(), "Name 2");

        assert_pending!(subscriber_for_room);

        // Dropping the observable closes the subscriber.
        drop(observable_for_room);
        assert_closed!(subscriber_for_room);

        Ok(())
    }
}