1#[cfg(any(feature = "anyhow", feature = "eyre"))]
35use std::any::TypeId;
36use std::{
37 borrow::Cow,
38 fmt,
39 future::Future,
40 pin::Pin,
41 sync::{
42 atomic::{AtomicU64, Ordering::SeqCst},
43 Arc, RwLock, Weak,
44 },
45 task::{Context, Poll},
46};
47
48use anymap2::any::CloneAnySendSync;
49use eyeball::{SharedObservable, Subscriber};
50use futures_core::Stream;
51use futures_util::stream::{FuturesUnordered, StreamExt};
52use matrix_sdk_base::{
53 deserialized_responses::{EncryptionInfo, TimelineEvent},
54 SendOutsideWasm, SyncOutsideWasm,
55};
56use pin_project_lite::pin_project;
57use ruma::{events::AnySyncStateEvent, push::Action, serde::Raw, OwnedRoomId};
58use serde::{de::DeserializeOwned, Deserialize};
59use serde_json::value::RawValue as RawJsonValue;
60use tracing::{debug, error, field::debug, instrument, warn};
61
62use self::maps::EventHandlerMaps;
63use crate::{Client, Room};
64
65mod context;
66mod maps;
67mod static_events;
68
69pub use self::context::{Ctx, EventHandlerContext, RawEvent};
70
71#[cfg(not(target_arch = "wasm32"))]
72type EventHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
73#[cfg(target_arch = "wasm32")]
74type EventHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
75
76#[cfg(not(target_arch = "wasm32"))]
77type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut + Send + Sync;
78#[cfg(target_arch = "wasm32")]
79type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut;
80
81type AnyMap = anymap2::Map<dyn CloneAnySendSync + Send + Sync>;
82
83#[derive(Default)]
84pub(crate) struct EventHandlerStore {
85 handlers: RwLock<EventHandlerMaps>,
86 context: RwLock<AnyMap>,
87 counter: AtomicU64,
88}
89
90impl EventHandlerStore {
91 pub fn add_handler(&self, handle: EventHandlerHandle, handler_fn: Box<EventHandlerFn>) {
92 self.handlers.write().unwrap().add(handle, handler_fn);
93 }
94
95 pub fn add_context<T>(&self, ctx: T)
96 where
97 T: Clone + Send + Sync + 'static,
98 {
99 self.context.write().unwrap().insert(ctx);
100 }
101
102 pub fn remove(&self, handle: EventHandlerHandle) {
103 self.handlers.write().unwrap().remove(handle);
104 }
105
106 #[cfg(test)]
107 fn len(&self) -> usize {
108 self.handlers.read().unwrap().len()
109 }
110}
111
112#[doc(hidden)]
113#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
114pub enum HandlerKind {
115 GlobalAccountData,
116 RoomAccountData,
117 EphemeralRoomData,
118 Timeline,
119 MessageLike,
120 OriginalMessageLike,
121 RedactedMessageLike,
122 State,
123 OriginalState,
124 RedactedState,
125 StrippedState,
126 ToDevice,
127 Presence,
128}
129
130impl HandlerKind {
131 fn message_like_redacted(redacted: bool) -> Self {
132 if redacted {
133 Self::RedactedMessageLike
134 } else {
135 Self::OriginalMessageLike
136 }
137 }
138
139 fn state_redacted(redacted: bool) -> Self {
140 if redacted {
141 Self::RedactedState
142 } else {
143 Self::OriginalState
144 }
145 }
146}
147
148pub trait SyncEvent {
150 #[doc(hidden)]
151 const KIND: HandlerKind;
152 #[doc(hidden)]
153 const TYPE: Option<&'static str>;
154}
155
156pub(crate) struct EventHandlerWrapper {
157 handler_fn: Box<EventHandlerFn>,
158 pub handler_id: u64,
159}
160
161#[derive(Clone, Debug)]
164pub struct EventHandlerHandle {
165 pub(crate) ev_kind: HandlerKind,
166 pub(crate) ev_type: Option<&'static str>,
167 pub(crate) room_id: Option<OwnedRoomId>,
168 pub(crate) handler_id: u64,
169}
170
171pub trait EventHandler<Ev, Ctx>: Clone + SendOutsideWasm + SyncOutsideWasm + 'static {
205 #[doc(hidden)]
207 type Future: EventHandlerFuture;
208
209 #[doc(hidden)]
216 fn handle_event(self, ev: Ev, data: EventHandlerData<'_>) -> Option<Self::Future>;
217}
218
219#[doc(hidden)]
220pub trait EventHandlerFuture:
221 Future<Output = <Self as EventHandlerFuture>::Output> + SendOutsideWasm + 'static
222{
223 type Output: EventHandlerResult;
224}
225
226impl<T> EventHandlerFuture for T
227where
228 T: Future + SendOutsideWasm + 'static,
229 <T as Future>::Output: EventHandlerResult,
230{
231 type Output = <T as Future>::Output;
232}
233
234#[doc(hidden)]
235#[derive(Debug)]
236pub struct EventHandlerData<'a> {
237 client: Client,
238 room: Option<Room>,
239 raw: &'a RawJsonValue,
240 encryption_info: Option<&'a EncryptionInfo>,
241 push_actions: &'a [Action],
242 handle: EventHandlerHandle,
243}
244
245pub trait EventHandlerResult: Sized {
249 #[doc(hidden)]
250 fn print_error(&self, event_type: Option<&str>);
251}
252
253impl EventHandlerResult for () {
254 fn print_error(&self, _event_type: Option<&str>) {}
255}
256
257impl<E: fmt::Debug + fmt::Display + 'static> EventHandlerResult for Result<(), E> {
258 fn print_error(&self, event_type: Option<&str>) {
259 let msg_fragment = match event_type {
260 Some(event_type) => format!(" for `{event_type}`"),
261 None => "".to_owned(),
262 };
263
264 match self {
265 #[cfg(feature = "anyhow")]
266 Err(e) if TypeId::of::<E>() == TypeId::of::<anyhow::Error>() => {
267 error!("Event handler{msg_fragment} failed: {e:?}");
268 }
269 #[cfg(feature = "eyre")]
270 Err(e) if TypeId::of::<E>() == TypeId::of::<eyre::Report>() => {
271 error!("Event handler{msg_fragment} failed: {e:?}");
272 }
273 Err(e) => {
274 error!("Event handler{msg_fragment} failed: {e}");
275 }
276 Ok(_) => {}
277 }
278 }
279}
280
281#[derive(Deserialize)]
282struct UnsignedDetails {
283 redacted_because: Option<serde::de::IgnoredAny>,
284}
285
286impl Client {
288 pub(crate) fn add_event_handler_impl<Ev, Ctx, H>(
289 &self,
290 handler: H,
291 room_id: Option<OwnedRoomId>,
292 ) -> EventHandlerHandle
293 where
294 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
295 H: EventHandler<Ev, Ctx>,
296 {
297 let handler_fn: Box<EventHandlerFn> = Box::new(move |data| {
298 let maybe_fut = serde_json::from_str(data.raw.get())
299 .map(|ev| handler.clone().handle_event(ev, data));
300
301 Box::pin(async move {
302 match maybe_fut {
303 Ok(Some(fut)) => {
304 fut.await.print_error(Ev::TYPE);
305 }
306 Ok(None) => {
307 error!(
308 event_type = Ev::TYPE, event_kind = ?Ev::KIND,
309 "Event handler has an invalid context argument",
310 );
311 }
312 Err(e) => {
313 warn!(
314 event_type = Ev::TYPE, event_kind = ?Ev::KIND,
315 "Failed to deserialize event, skipping event handler.\n
316 Deserialization error: {e}",
317 );
318 }
319 }
320 })
321 });
322
323 let handler_id = self.inner.event_handlers.counter.fetch_add(1, SeqCst);
324 let handle =
325 EventHandlerHandle { ev_kind: Ev::KIND, ev_type: Ev::TYPE, room_id, handler_id };
326
327 self.inner.event_handlers.add_handler(handle.clone(), handler_fn);
328
329 handle
330 }
331
332 pub(crate) async fn handle_sync_events<T>(
333 &self,
334 kind: HandlerKind,
335 room: Option<&Room>,
336 events: &[Raw<T>],
337 ) -> serde_json::Result<()> {
338 #[derive(Deserialize)]
339 struct ExtractType<'a> {
340 #[serde(borrow, rename = "type")]
341 event_type: Cow<'a, str>,
342 }
343
344 for raw_event in events {
345 let event_type = raw_event.deserialize_as::<ExtractType<'_>>()?.event_type;
346 self.call_event_handlers(room, raw_event.json(), kind, &event_type, None, &[]).await;
347 }
348
349 Ok(())
350 }
351
352 pub(crate) async fn handle_sync_state_events(
353 &self,
354 room: Option<&Room>,
355 state_events: &[Raw<AnySyncStateEvent>],
356 ) -> serde_json::Result<()> {
357 #[derive(Deserialize)]
358 struct StateEventDetails<'a> {
359 #[serde(borrow, rename = "type")]
360 event_type: Cow<'a, str>,
361 unsigned: Option<UnsignedDetails>,
362 }
363
364 self.handle_sync_events(HandlerKind::State, room, state_events).await?;
366
367 for raw_event in state_events {
369 let StateEventDetails { event_type, unsigned } = raw_event.deserialize_as()?;
370 let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
371 let handler_kind = HandlerKind::state_redacted(redacted);
372
373 self.call_event_handlers(room, raw_event.json(), handler_kind, &event_type, None, &[])
374 .await;
375 }
376
377 Ok(())
378 }
379
380 pub(crate) async fn handle_sync_timeline_events(
381 &self,
382 room: Option<&Room>,
383 timeline_events: &[TimelineEvent],
384 ) -> serde_json::Result<()> {
385 #[derive(Deserialize)]
386 struct TimelineEventDetails<'a> {
387 #[serde(borrow, rename = "type")]
388 event_type: Cow<'a, str>,
389 state_key: Option<serde::de::IgnoredAny>,
390 unsigned: Option<UnsignedDetails>,
391 }
392
393 for item in timeline_events {
394 let TimelineEventDetails { event_type, state_key, unsigned } =
395 item.raw().deserialize_as()?;
396
397 let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
398 let (handler_kind_g, handler_kind_r) = match state_key {
399 Some(_) => (HandlerKind::State, HandlerKind::state_redacted(redacted)),
400 None => (HandlerKind::MessageLike, HandlerKind::message_like_redacted(redacted)),
401 };
402
403 let raw_event = item.raw().json();
404 let encryption_info = item.encryption_info();
405 let push_actions = item.push_actions.as_deref().unwrap_or(&[]);
406
407 self.call_event_handlers(
409 room,
410 raw_event,
411 handler_kind_g,
412 &event_type,
413 encryption_info,
414 push_actions,
415 )
416 .await;
417
418 self.call_event_handlers(
420 room,
421 raw_event,
422 handler_kind_r,
423 &event_type,
424 encryption_info,
425 push_actions,
426 )
427 .await;
428
429 let kind = HandlerKind::Timeline;
431 self.call_event_handlers(
432 room,
433 raw_event,
434 kind,
435 &event_type,
436 encryption_info,
437 push_actions,
438 )
439 .await;
440 }
441
442 Ok(())
443 }
444
445 #[instrument(skip_all, fields(?event_kind, ?event_type, room_id))]
446 async fn call_event_handlers(
447 &self,
448 room: Option<&Room>,
449 raw: &RawJsonValue,
450 event_kind: HandlerKind,
451 event_type: &str,
452 encryption_info: Option<&EncryptionInfo>,
453 push_actions: &[Action],
454 ) {
455 let room_id = room.map(|r| r.room_id());
456 if let Some(room_id) = room_id {
457 tracing::Span::current().record("room_id", debug(room_id));
458 }
459
460 let mut futures: FuturesUnordered<_> = self
462 .inner
463 .event_handlers
464 .handlers
465 .read()
466 .unwrap()
467 .get_handlers(event_kind, event_type, room_id)
468 .map(|(handle, handler_fn)| {
469 let data = EventHandlerData {
470 client: self.clone(),
471 room: room.cloned(),
472 raw,
473 encryption_info,
474 push_actions,
475 handle,
476 };
477
478 (handler_fn)(data)
479 })
480 .collect();
481
482 if !futures.is_empty() {
483 debug!(amount = futures.len(), "Calling event handlers");
484
485 while let Some(()) = futures.next().await {}
488 }
489 }
490}
491
492#[derive(Debug)]
497pub struct EventHandlerDropGuard {
498 handle: EventHandlerHandle,
499 client: Client,
500}
501
502impl EventHandlerDropGuard {
503 pub(crate) fn new(handle: EventHandlerHandle, client: Client) -> Self {
504 Self { handle, client }
505 }
506}
507
508impl Drop for EventHandlerDropGuard {
509 fn drop(&mut self) {
510 self.client.remove_event_handler(self.handle.clone());
511 }
512}
513
514macro_rules! impl_event_handler {
515 ($($ty:ident),* $(,)?) => {
516 impl<Ev, Fun, Fut, $($ty),*> EventHandler<Ev, ($($ty,)*)> for Fun
517 where
518 Ev: SyncEvent,
519 Fun: FnOnce(Ev, $($ty),*) -> Fut + Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
520 Fut: EventHandlerFuture,
521 $($ty: EventHandlerContext),*
522 {
523 type Future = Fut;
524
525 fn handle_event(self, ev: Ev, _d: EventHandlerData<'_>) -> Option<Self::Future> {
526 Some((self)(ev, $($ty::from_data(&_d)?),*))
527 }
528 }
529 };
530}
531
532impl_event_handler!();
533impl_event_handler!(A);
534impl_event_handler!(A, B);
535impl_event_handler!(A, B, C);
536impl_event_handler!(A, B, C, D);
537impl_event_handler!(A, B, C, D, E);
538impl_event_handler!(A, B, C, D, E, F);
539impl_event_handler!(A, B, C, D, E, F, G);
540impl_event_handler!(A, B, C, D, E, F, G, H);
541
542#[derive(Debug)]
550pub struct ObservableEventHandler<T> {
551 shared_observable: SharedObservable<Option<T>>,
556
557 event_handler_guard: Arc<EventHandlerDropGuard>,
564}
565
566impl<T> ObservableEventHandler<T> {
567 pub(crate) fn new(
568 shared_observable: SharedObservable<Option<T>>,
569 event_handler_guard: EventHandlerDropGuard,
570 ) -> Self {
571 Self { shared_observable, event_handler_guard: Arc::new(event_handler_guard) }
572 }
573
574 pub fn subscribe(&self) -> EventHandlerSubscriber<T> {
579 EventHandlerSubscriber::new(
580 self.shared_observable.subscribe(),
581 Arc::downgrade(&self.event_handler_guard),
584 )
585 }
586}
587
588pin_project! {
589 #[derive(Debug)]
598 pub struct EventHandlerSubscriber<T> {
599 #[pin]
605 subscriber: Subscriber<Option<T>>,
606
607 event_handler_guard: Weak<EventHandlerDropGuard>,
613 }
614}
615
616impl<T> EventHandlerSubscriber<T> {
617 fn new(
618 subscriber: Subscriber<Option<T>>,
619 event_handler_handle: Weak<EventHandlerDropGuard>,
620 ) -> Self {
621 Self { subscriber, event_handler_guard: event_handler_handle }
622 }
623}
624
625impl<T> Stream for EventHandlerSubscriber<T>
626where
627 T: Clone,
628{
629 type Item = T;
630
631 fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
632 let mut this = self.project();
633
634 let Some(_) = this.event_handler_guard.upgrade() else {
635 return Poll::Ready(None);
639 };
640
641 loop {
651 match this.subscriber.as_mut().poll_next(context) {
652 Poll::Ready(None) => return Poll::Ready(None),
654
655 Poll::Ready(Some(None)) => {
658 continue;
660 }
661
662 Poll::Ready(Some(Some(value))) => return Poll::Ready(Some(value)),
664
665 Poll::Pending => return Poll::Pending,
667 }
668 }
669 }
670}
671
672#[cfg(test)]
673mod tests {
674 use matrix_sdk_test::{
675 async_test,
676 event_factory::{EventFactory, PreviousMembership},
677 InvitedRoomBuilder, JoinedRoomBuilder, DEFAULT_TEST_ROOM_ID,
678 };
679 use stream_assert::{assert_closed, assert_pending, assert_ready};
680 #[cfg(target_arch = "wasm32")]
681 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
682 use std::{
683 future,
684 sync::{
685 atomic::{AtomicU8, Ordering::SeqCst},
686 Arc,
687 },
688 };
689
690 use matrix_sdk_test::{
691 EphemeralTestEvent, StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder,
692 };
693 use once_cell::sync::Lazy;
694 use ruma::{
695 event_id,
696 events::{
697 room::{
698 member::{MembershipState, OriginalSyncRoomMemberEvent, StrippedRoomMemberEvent},
699 name::OriginalSyncRoomNameEvent,
700 power_levels::OriginalSyncRoomPowerLevelsEvent,
701 },
702 typing::SyncTypingEvent,
703 AnySyncStateEvent, AnySyncTimelineEvent,
704 },
705 room_id,
706 serde::Raw,
707 user_id,
708 };
709 use serde_json::json;
710
711 use crate::{
712 event_handler::Ctx,
713 test_utils::{logged_in_client, no_retry_test_client},
714 Client, Room,
715 };
716
717 static MEMBER_EVENT: Lazy<Raw<AnySyncTimelineEvent>> = Lazy::new(|| {
718 EventFactory::new()
719 .member(user_id!("@example:localhost"))
720 .membership(MembershipState::Join)
721 .display_name("example")
722 .event_id(event_id!("$151800140517rfvjc:localhost"))
723 .previous(PreviousMembership::new(MembershipState::Invite).display_name("example"))
724 .into()
725 });
726
727 #[async_test]
728 async fn test_add_event_handler() -> crate::Result<()> {
729 let client = logged_in_client(None).await;
730
731 let member_count = Arc::new(AtomicU8::new(0));
732 let typing_count = Arc::new(AtomicU8::new(0));
733 let power_levels_count = Arc::new(AtomicU8::new(0));
734 let invited_member_count = Arc::new(AtomicU8::new(0));
735
736 client.add_event_handler({
737 let member_count = member_count.clone();
738 move |_ev: OriginalSyncRoomMemberEvent, _room: Room| async move {
739 member_count.fetch_add(1, SeqCst);
740 }
741 });
742 client.add_event_handler({
743 let typing_count = typing_count.clone();
744 move |_ev: SyncTypingEvent| async move {
745 typing_count.fetch_add(1, SeqCst);
746 }
747 });
748 client.add_event_handler({
749 let power_levels_count = power_levels_count.clone();
750 move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| async move {
751 power_levels_count.fetch_add(1, SeqCst);
752 }
753 });
754 client.add_event_handler({
755 let invited_member_count = invited_member_count.clone();
756 move |_ev: StrippedRoomMemberEvent| async move {
757 invited_member_count.fetch_add(1, SeqCst);
758 }
759 });
760
761 let response = SyncResponseBuilder::default()
762 .add_joined_room(
763 JoinedRoomBuilder::default()
764 .add_timeline_event(MEMBER_EVENT.clone())
765 .add_ephemeral_event(EphemeralTestEvent::Typing)
766 .add_state_event(StateTestEvent::PowerLevels),
767 )
768 .add_invited_room(
769 InvitedRoomBuilder::new(room_id!("!test_invited:example.org")).add_state_event(
770 StrippedStateTestEvent::Custom(json!({
771 "content": {
772 "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
773 "displayname": "Alice",
774 "membership": "invite",
775 },
776 "event_id": "$143273582443PhrSn:example.org",
777 "origin_server_ts": 1432735824653u64,
778 "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
779 "sender": "@example:example.org",
780 "state_key": "@alice:example.org",
781 "type": "m.room.member",
782 "unsigned": {
783 "age": 1234,
784 "invite_room_state": [
785 {
786 "content": {
787 "name": "Example Room"
788 },
789 "sender": "@bob:example.org",
790 "state_key": "",
791 "type": "m.room.name"
792 },
793 {
794 "content": {
795 "join_rule": "invite"
796 },
797 "sender": "@bob:example.org",
798 "state_key": "",
799 "type": "m.room.join_rules"
800 }
801 ]
802 }
803 })),
804 ),
805 )
806 .build_sync_response();
807 client.process_sync(response).await?;
808
809 assert_eq!(member_count.load(SeqCst), 1);
810 assert_eq!(typing_count.load(SeqCst), 1);
811 assert_eq!(power_levels_count.load(SeqCst), 1);
812 assert_eq!(invited_member_count.load(SeqCst), 1);
813
814 Ok(())
815 }
816
817 #[async_test]
818 #[allow(dependency_on_unit_never_type_fallback)]
819 async fn test_add_room_event_handler() -> crate::Result<()> {
820 let client = logged_in_client(None).await;
821
822 let room_id_a = room_id!("!foo:example.org");
823 let room_id_b = room_id!("!bar:matrix.org");
824
825 let member_count = Arc::new(AtomicU8::new(0));
826 let power_levels_count = Arc::new(AtomicU8::new(0));
827
828 client.add_room_event_handler(room_id_a, {
830 let member_count = member_count.clone();
831 move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
832 member_count.fetch_add(1, SeqCst);
833 future::ready(())
834 }
835 });
836 client.add_room_event_handler(room_id_b, {
837 let member_count = member_count.clone();
838 move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
839 member_count.fetch_add(1, SeqCst);
840 future::ready(())
841 }
842 });
843
844 client.add_room_event_handler(room_id_a, {
846 let power_levels_count = power_levels_count.clone();
847 move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| {
848 power_levels_count.fetch_add(1, SeqCst);
849 future::ready(())
850 }
851 });
852
853 client.add_room_event_handler(room_id_b, move |_ev: OriginalSyncRoomNameEvent| async {
855 unreachable!("No room event in room B")
856 });
857
858 let response = SyncResponseBuilder::default()
859 .add_joined_room(
860 JoinedRoomBuilder::new(room_id_a)
861 .add_timeline_event(MEMBER_EVENT.clone())
862 .add_state_event(StateTestEvent::PowerLevels)
863 .add_state_event(StateTestEvent::RoomName),
864 )
865 .add_joined_room(
866 JoinedRoomBuilder::new(room_id_b)
867 .add_timeline_event(MEMBER_EVENT.clone())
868 .add_state_event(StateTestEvent::PowerLevels),
869 )
870 .build_sync_response();
871 client.process_sync(response).await?;
872
873 assert_eq!(member_count.load(SeqCst), 2);
874 assert_eq!(power_levels_count.load(SeqCst), 1);
875
876 Ok(())
877 }
878
879 #[async_test]
880 #[allow(dependency_on_unit_never_type_fallback)]
881 async fn test_add_event_handler_with_tuples() -> crate::Result<()> {
882 let client = logged_in_client(None).await;
883
884 client.add_event_handler(
885 |_ev: OriginalSyncRoomMemberEvent, (_room, _client): (Room, Client)| future::ready(()),
886 );
887
888 Ok(())
891 }
892
893 #[async_test]
894 #[allow(dependency_on_unit_never_type_fallback)]
895 async fn test_remove_event_handler() -> crate::Result<()> {
896 let client = logged_in_client(None).await;
897
898 let member_count = Arc::new(AtomicU8::new(0));
899
900 client.add_event_handler({
901 let member_count = member_count.clone();
902 move |_ev: OriginalSyncRoomMemberEvent| async move {
903 member_count.fetch_add(1, SeqCst);
904 }
905 });
906
907 let handle_a = client.add_event_handler(move |_ev: OriginalSyncRoomMemberEvent| async {
908 panic!("handler should have been removed");
909 });
910 let handle_b = client.add_room_event_handler(
911 #[allow(unknown_lints, clippy::explicit_auto_deref)] *DEFAULT_TEST_ROOM_ID,
913 move |_ev: OriginalSyncRoomMemberEvent| async {
914 panic!("handler should have been removed");
915 },
916 );
917
918 client.add_event_handler({
919 let member_count = member_count.clone();
920 move |_ev: OriginalSyncRoomMemberEvent| async move {
921 member_count.fetch_add(1, SeqCst);
922 }
923 });
924
925 let response = SyncResponseBuilder::default()
926 .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
927 .build_sync_response();
928
929 client.remove_event_handler(handle_a);
930 client.remove_event_handler(handle_b);
931
932 client.process_sync(response).await?;
933
934 assert_eq!(member_count.load(SeqCst), 2);
935
936 Ok(())
937 }
938
939 #[async_test]
940 async fn test_event_handler_drop_guard() {
941 let client = no_retry_test_client(None).await;
942
943 let handle = client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent| async {});
944 assert_eq!(client.inner.event_handlers.len(), 1);
945
946 {
947 let _guard = client.event_handler_drop_guard(handle);
948 assert_eq!(client.inner.event_handlers.len(), 1);
949 }
951
952 assert_eq!(client.inner.event_handlers.len(), 0);
953 }
954
955 #[async_test]
956 async fn test_use_client_in_handler() {
957 let client = no_retry_test_client(None).await;
961
962 client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent, client: Client| async move {
963 let _caps = client.get_capabilities().await?;
967 anyhow::Ok(())
968 });
969 }
970
971 #[async_test]
972 async fn test_raw_event_handler() -> crate::Result<()> {
973 let client = logged_in_client(None).await;
974 let counter = Arc::new(AtomicU8::new(0));
975 client.add_event_handler_context(counter.clone());
976 client.add_event_handler(
977 |_ev: Raw<OriginalSyncRoomMemberEvent>, counter: Ctx<Arc<AtomicU8>>| async move {
978 counter.fetch_add(1, SeqCst);
979 },
980 );
981
982 let response = SyncResponseBuilder::default()
983 .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
984 .build_sync_response();
985 client.process_sync(response).await?;
986
987 assert_eq!(counter.load(SeqCst), 1);
988 Ok(())
989 }
990
991 #[async_test]
992 async fn test_enum_event_handler() -> crate::Result<()> {
993 let client = logged_in_client(None).await;
994 let counter = Arc::new(AtomicU8::new(0));
995 client.add_event_handler_context(counter.clone());
996 client.add_event_handler(
997 |_ev: AnySyncStateEvent, counter: Ctx<Arc<AtomicU8>>| async move {
998 counter.fetch_add(1, SeqCst);
999 },
1000 );
1001
1002 let response = SyncResponseBuilder::default()
1003 .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1004 .build_sync_response();
1005 client.process_sync(response).await?;
1006
1007 assert_eq!(counter.load(SeqCst), 1);
1008 Ok(())
1009 }
1010
1011 #[async_test]
1012 #[allow(dependency_on_unit_never_type_fallback)]
1013 async fn test_observe_events() -> crate::Result<()> {
1014 let client = logged_in_client(None).await;
1015
1016 let room_id_0 = room_id!("!r0.matrix.org");
1017 let room_id_1 = room_id!("!r1.matrix.org");
1018
1019 let observable = client.observe_events::<OriginalSyncRoomNameEvent, Room>();
1020
1021 let mut subscriber = observable.subscribe();
1022
1023 assert_pending!(subscriber);
1024
1025 let mut response_builder = SyncResponseBuilder::new();
1026 let response = response_builder
1027 .add_joined_room(JoinedRoomBuilder::new(room_id_0).add_state_event(
1028 StateTestEvent::Custom(json!({
1029 "content": {
1030 "name": "Name 0"
1031 },
1032 "event_id": "$ev0",
1033 "origin_server_ts": 1,
1034 "sender": "@mnt_io:matrix.org",
1035 "state_key": "",
1036 "type": "m.room.name",
1037 "unsigned": {
1038 "age": 1,
1039 }
1040 })),
1041 ))
1042 .build_sync_response();
1043 client.process_sync(response).await?;
1044
1045 let (room_name, room) = assert_ready!(subscriber);
1046
1047 assert_eq!(room_name.event_id.as_str(), "$ev0");
1048 assert_eq!(room.room_id(), room_id_0);
1049 assert_eq!(room.name().unwrap(), "Name 0");
1050
1051 assert_pending!(subscriber);
1052
1053 let response = response_builder
1054 .add_joined_room(JoinedRoomBuilder::new(room_id_1).add_state_event(
1055 StateTestEvent::Custom(json!({
1056 "content": {
1057 "name": "Name 1"
1058 },
1059 "event_id": "$ev1",
1060 "origin_server_ts": 2,
1061 "sender": "@mnt_io:matrix.org",
1062 "state_key": "",
1063 "type": "m.room.name",
1064 "unsigned": {
1065 "age": 2,
1066 }
1067 })),
1068 ))
1069 .build_sync_response();
1070 client.process_sync(response).await?;
1071
1072 let (room_name, room) = assert_ready!(subscriber);
1073
1074 assert_eq!(room_name.event_id.as_str(), "$ev1");
1075 assert_eq!(room.room_id(), room_id_1);
1076 assert_eq!(room.name().unwrap(), "Name 1");
1077
1078 assert_pending!(subscriber);
1079
1080 drop(observable);
1081 assert_closed!(subscriber);
1082
1083 Ok(())
1084 }
1085
1086 #[async_test]
1087 #[allow(dependency_on_unit_never_type_fallback)]
1088 async fn test_observe_room_events() -> crate::Result<()> {
1089 let client = logged_in_client(None).await;
1090
1091 let room_id = room_id!("!r0.matrix.org");
1092
1093 let observable_for_room =
1094 client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1095
1096 let mut subscriber_for_room = observable_for_room.subscribe();
1097
1098 assert_pending!(subscriber_for_room);
1099
1100 let mut response_builder = SyncResponseBuilder::new();
1101 let response = response_builder
1102 .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1103 StateTestEvent::Custom(json!({
1104 "content": {
1105 "name": "Name 0"
1106 },
1107 "event_id": "$ev0",
1108 "origin_server_ts": 1,
1109 "sender": "@mnt_io:matrix.org",
1110 "state_key": "",
1111 "type": "m.room.name",
1112 "unsigned": {
1113 "age": 1,
1114 }
1115 })),
1116 ))
1117 .build_sync_response();
1118 client.process_sync(response).await?;
1119
1120 let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1121
1122 assert_eq!(room_name.event_id.as_str(), "$ev0");
1123 assert_eq!(room.name().unwrap(), "Name 0");
1124
1125 assert_pending!(subscriber_for_room);
1126
1127 let response = response_builder
1128 .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1129 StateTestEvent::Custom(json!({
1130 "content": {
1131 "name": "Name 1"
1132 },
1133 "event_id": "$ev1",
1134 "origin_server_ts": 2,
1135 "sender": "@mnt_io:matrix.org",
1136 "state_key": "",
1137 "type": "m.room.name",
1138 "unsigned": {
1139 "age": 2,
1140 }
1141 })),
1142 ))
1143 .build_sync_response();
1144 client.process_sync(response).await?;
1145
1146 let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1147
1148 assert_eq!(room_name.event_id.as_str(), "$ev1");
1149 assert_eq!(room.name().unwrap(), "Name 1");
1150
1151 assert_pending!(subscriber_for_room);
1152
1153 drop(observable_for_room);
1154 assert_closed!(subscriber_for_room);
1155
1156 Ok(())
1157 }
1158
1159 #[async_test]
1160 async fn test_observe_several_room_events() -> crate::Result<()> {
1161 let client = logged_in_client(None).await;
1162
1163 let room_id = room_id!("!r0.matrix.org");
1164
1165 let observable_for_room =
1166 client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1167
1168 let mut subscriber_for_room = observable_for_room.subscribe();
1169
1170 assert_pending!(subscriber_for_room);
1171
1172 let mut response_builder = SyncResponseBuilder::new();
1173 let response = response_builder
1174 .add_joined_room(
1175 JoinedRoomBuilder::new(room_id)
1176 .add_state_event(StateTestEvent::Custom(json!({
1177 "content": {
1178 "name": "Name 0"
1179 },
1180 "event_id": "$ev0",
1181 "origin_server_ts": 1,
1182 "sender": "@mnt_io:matrix.org",
1183 "state_key": "",
1184 "type": "m.room.name",
1185 "unsigned": {
1186 "age": 1,
1187 }
1188 })))
1189 .add_state_event(StateTestEvent::Custom(json!({
1190 "content": {
1191 "name": "Name 1"
1192 },
1193 "event_id": "$ev1",
1194 "origin_server_ts": 2,
1195 "sender": "@mnt_io:matrix.org",
1196 "state_key": "",
1197 "type": "m.room.name",
1198 "unsigned": {
1199 "age": 1,
1200 }
1201 })))
1202 .add_state_event(StateTestEvent::Custom(json!({
1203 "content": {
1204 "name": "Name 2"
1205 },
1206 "event_id": "$ev2",
1207 "origin_server_ts": 3,
1208 "sender": "@mnt_io:matrix.org",
1209 "state_key": "",
1210 "type": "m.room.name",
1211 "unsigned": {
1212 "age": 1,
1213 }
1214 }))),
1215 )
1216 .build_sync_response();
1217 client.process_sync(response).await?;
1218
1219 let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1220
1221 assert_eq!(room_name.event_id.as_str(), "$ev2");
1223 assert_eq!(room.name().unwrap(), "Name 2");
1224
1225 assert_pending!(subscriber_for_room);
1226
1227 drop(observable_for_room);
1228 assert_closed!(subscriber_for_room);
1229
1230 Ok(())
1231 }
1232}