1#[cfg(any(feature = "anyhow", feature = "eyre"))]
35use std::any::TypeId;
36use std::{
37 borrow::Cow,
38 fmt,
39 future::Future,
40 pin::Pin,
41 sync::{
42 Arc, RwLock, Weak,
43 atomic::{AtomicU64, Ordering::SeqCst},
44 },
45 task::{Context, Poll},
46};
47
48#[cfg(target_family = "wasm")]
49use anymap2::any::CloneAny;
50#[cfg(not(target_family = "wasm"))]
51use anymap2::any::CloneAnySendSync;
52use eyeball::{SharedObservable, Subscriber};
53use futures_core::Stream;
54use futures_util::stream::{FuturesUnordered, StreamExt};
55use matrix_sdk_base::{
56 SendOutsideWasm, SyncOutsideWasm,
57 deserialized_responses::{EncryptionInfo, TimelineEvent},
58 sync::State,
59};
60use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
61use pin_project_lite::pin_project;
62use ruma::{OwnedRoomId, events::BooleanType, push::Action, serde::Raw};
63use serde::{Deserialize, de::DeserializeOwned};
64use serde_json::value::RawValue as RawJsonValue;
65use tracing::{debug, error, field::debug, instrument, warn};
66
67use self::maps::EventHandlerMaps;
68use crate::{Client, Room};
69
70mod context;
71mod maps;
72mod static_events;
73
74pub use self::context::{Ctx, EventHandlerContext, RawEvent};
75
/// Boxed, type-erased future returned by a stored event handler function.
///
/// On non-wasm targets the future additionally has to be `Send`.
#[cfg(not(target_family = "wasm"))]
type EventHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, type-erased future returned by a stored event handler function
/// (wasm flavour, without the `Send` bound).
#[cfg(target_family = "wasm")]
type EventHandlerFut = Pin<Box<dyn Future<Output = ()>>>;

/// Type-erased event handler: takes the per-event [`EventHandlerData`] and
/// returns the future to run.
///
/// On non-wasm targets the function has to be `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut + Send + Sync;
/// Type-erased event handler (wasm flavour, without `Send + Sync`).
#[cfg(target_family = "wasm")]
type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut;

/// Type-keyed map of cloneable values, used as the storage for event handler
/// context values.
#[cfg(not(target_family = "wasm"))]
type AnyMap = anymap2::Map<dyn CloneAnySendSync + Send + Sync>;
/// Type-keyed map of cloneable values (wasm flavour, without `Send + Sync`).
#[cfg(target_family = "wasm")]
type AnyMap = anymap2::Map<dyn CloneAny>;
90
/// Storage for everything related to event handlers: the handlers themselves,
/// the context values and the id counter.
#[derive(Default)]
pub(crate) struct EventHandlerStore {
    /// The registered event handlers.
    handlers: RwLock<EventHandlerMaps>,
    /// Context values added through `add_context`, keyed by their type.
    context: RwLock<AnyMap>,
    /// Counter incremented for every added handler; its previous value
    /// becomes the new handler's `handler_id`.
    counter: AtomicU64,
}
97
98impl EventHandlerStore {
99 pub fn add_handler(&self, handle: EventHandlerHandle, handler_fn: Box<EventHandlerFn>) {
100 self.handlers.write().unwrap().add(handle, handler_fn);
101 }
102
103 pub fn add_context<T>(&self, ctx: T)
104 where
105 T: Clone + Send + Sync + 'static,
106 {
107 self.context.write().unwrap().insert(ctx);
108 }
109
110 pub fn remove(&self, handle: EventHandlerHandle) {
111 self.handlers.write().unwrap().remove(handle);
112 }
113
114 #[cfg(test)]
115 fn len(&self) -> usize {
116 self.handlers.read().unwrap().len()
117 }
118}
119
/// The category an event handler is registered under and an event is
/// dispatched as.
///
/// The variant order is significant because the enum derives `PartialOrd` and
/// `Ord`.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum HandlerKind {
    GlobalAccountData,
    RoomAccountData,
    EphemeralRoomData,
    /// Used for every timeline event, state or message-like.
    Timeline,
    /// Message-like timeline event, redacted or not.
    MessageLike,
    OriginalMessageLike,
    RedactedMessageLike,
    /// State event, redacted or not.
    State,
    OriginalState,
    RedactedState,
    StrippedState,
    ToDevice,
    Presence,
}

impl HandlerKind {
    /// Picks the message-like kind matching the redaction status.
    fn message_like_redacted(redacted: bool) -> Self {
        match redacted {
            true => Self::RedactedMessageLike,
            false => Self::OriginalMessageLike,
        }
    }

    /// Picks the state kind matching the redaction status.
    fn state_redacted(redacted: bool) -> Self {
        match redacted {
            true => Self::RedactedState,
            false => Self::OriginalState,
        }
    }
}
147
/// Static description of an event type that event handlers can be registered
/// for.
pub trait SyncEvent {
    /// The handler kind a handler for this event type is registered under.
    #[doc(hidden)]
    const KIND: HandlerKind;
    /// The static event type string, if there is one.
    #[doc(hidden)]
    const TYPE: Option<&'static str>;
    /// Whether [`Self::TYPE`] is only a prefix of the event type rather than
    /// the full type.
    #[doc(hidden)]
    type IsPrefix: BooleanType;
}
157
/// A type-erased handler function paired with its handler id.
// NOTE(review): not constructed in this file; presumably used by the `maps`
// submodule.
pub(crate) struct EventHandlerWrapper {
    /// The type-erased handler function.
    handler_fn: Box<EventHandlerFn>,
    /// The id of the handler.
    pub handler_id: u64,
}
162
/// Handle to a registered event handler.
///
/// It is returned when a handler is added and is what has to be passed back
/// to remove the handler again.
#[derive(Clone, Debug)]
pub struct EventHandlerHandle {
    /// The kind of event the handler was registered for.
    pub(crate) ev_kind: HandlerKind,
    /// The static (full or prefix) event type the handler was registered
    /// for, if the event type has one.
    pub(crate) ev_type: Option<StaticEventTypePart>,
    /// The room the handler was registered for, if any.
    pub(crate) room_id: Option<OwnedRoomId>,
    /// Id taken from the store's counter when the handler was added.
    pub(crate) handler_id: u64,
}
172
/// The statically known part of an event type.
#[derive(Clone, Copy, Debug)]
pub(crate) enum StaticEventTypePart {
    /// The full event type is known statically.
    Full(&'static str),
    /// Only a prefix of the event type is known statically.
    Prefix(&'static str),
}
181
/// Interface for event handlers.
///
/// `Ev` is the event type the handler receives and `Ctx` the tuple of context
/// argument types; the implementations are generated by the
/// `impl_event_handler!` macro for closures and functions taking the event
/// followed by up to eight context arguments.
pub trait EventHandler<Ev, Ctx>: Clone + SendOutsideWasm + SyncOutsideWasm + 'static {
    /// The future returned by `handle_event`.
    #[doc(hidden)]
    type Future: EventHandlerFuture;

    /// Creates the future that handles `ev`.
    ///
    /// Returns `None` when one of the context arguments cannot be obtained
    /// from `data`.
    #[doc(hidden)]
    fn handle_event(self, ev: Ev, data: EventHandlerData<'_>) -> Option<Self::Future>;
}
229
/// A future whose output is an [`EventHandlerResult`].
///
/// This is a trait alias of sorts: it is implemented for every future that
/// satisfies the bounds and is not meant to be implemented manually.
#[doc(hidden)]
pub trait EventHandlerFuture:
    Future<Output = <Self as EventHandlerFuture>::Output> + SendOutsideWasm + 'static
{
    /// The output of the future.
    type Output: EventHandlerResult;
}

// Blanket implementation: any suitable future is an `EventHandlerFuture`.
impl<T> EventHandlerFuture for T
where
    T: Future + SendOutsideWasm + 'static,
    <T as Future>::Output: EventHandlerResult,
{
    type Output = <T as Future>::Output;
}
244
/// Everything an event handler invocation can draw its arguments from.
///
/// One value is built per matching handler for every dispatched event.
#[doc(hidden)]
#[derive(Debug)]
pub struct EventHandlerData<'a> {
    /// The client the event is dispatched on.
    client: Client,
    /// The room the event belongs to, if any.
    room: Option<Room>,
    /// The raw JSON of the event.
    raw: &'a RawJsonValue,
    /// The encryption info of the event, if any was provided.
    encryption_info: Option<&'a EncryptionInfo>,
    /// The push actions of the event (empty if none were provided).
    push_actions: &'a [Action],
    /// The handle of the handler being invoked.
    handle: EventHandlerHandle,
}
255
/// Return types supported for event handlers.
///
/// Implemented in this file for `()` and for `Result<(), E>` where `E`
/// implements `Debug` and `Display`.
pub trait EventHandlerResult: Sized {
    /// Logs the error carried by this result, if there is one.
    ///
    /// `event_type` is the static event type of the handler, if known.
    #[doc(hidden)]
    fn print_error(&self, event_type: Option<&str>);
}

impl EventHandlerResult for () {
    // `()` cannot carry an error, so there is nothing to print.
    fn print_error(&self, _event_type: Option<&str>) {}
}
267
268impl<E: fmt::Debug + fmt::Display + 'static> EventHandlerResult for Result<(), E> {
269 fn print_error(&self, event_type: Option<&str>) {
270 let msg_fragment = match event_type {
271 Some(event_type) => format!(" for `{event_type}`"),
272 None => "".to_owned(),
273 };
274
275 match self {
276 #[cfg(feature = "anyhow")]
277 Err(e) if TypeId::of::<E>() == TypeId::of::<anyhow::Error>() => {
278 error!("Event handler{msg_fragment} failed: {e:?}");
279 }
280 #[cfg(feature = "eyre")]
281 Err(e) if TypeId::of::<E>() == TypeId::of::<eyre::Report>() => {
282 error!("Event handler{msg_fragment} failed: {e:?}");
283 }
284 Err(e) => {
285 error!("Event handler{msg_fragment} failed: {e}");
286 }
287 Ok(_) => {}
288 }
289 }
290}
291
/// Minimal view of an event's `unsigned` object, used to detect redaction.
#[derive(Deserialize)]
struct UnsignedDetails {
    /// Present when the event has a `redacted_because` field; its contents
    /// are ignored.
    redacted_because: Option<serde::de::IgnoredAny>,
}
296
297impl Client {
299 pub(crate) fn add_event_handler_impl<Ev, Ctx, H>(
300 &self,
301 handler: H,
302 room_id: Option<OwnedRoomId>,
303 ) -> EventHandlerHandle
304 where
305 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
306 H: EventHandler<Ev, Ctx>,
307 {
308 let handler_fn: Box<EventHandlerFn> = Box::new(move |data| {
309 let maybe_fut = serde_json::from_str(data.raw.get())
310 .map(|ev| handler.clone().handle_event(ev, data));
311
312 Box::pin(async move {
313 match maybe_fut {
314 Ok(Some(fut)) => {
315 fut.await.print_error(Ev::TYPE);
316 }
317 Ok(None) => {
318 error!(
319 event_type = Ev::TYPE, event_kind = ?Ev::KIND,
320 "Event handler has an invalid context argument",
321 );
322 }
323 Err(e) => {
324 warn!(
325 event_type = Ev::TYPE, event_kind = ?Ev::KIND,
326 "Failed to deserialize event, skipping event handler.\n
327 Deserialization error: {e}",
328 );
329 }
330 }
331 })
332 });
333
334 let handler_id = self.inner.event_handlers.counter.fetch_add(1, SeqCst);
335 let ev_type = Ev::TYPE.map(|ev_type| {
336 if Ev::IsPrefix::as_bool() {
337 StaticEventTypePart::Prefix(ev_type)
338 } else {
339 StaticEventTypePart::Full(ev_type)
340 }
341 });
342 let handle = EventHandlerHandle { ev_kind: Ev::KIND, ev_type, room_id, handler_id };
343
344 self.inner.event_handlers.add_handler(handle.clone(), handler_fn);
345
346 handle
347 }
348
349 pub(crate) async fn handle_sync_events<T>(
350 &self,
351 kind: HandlerKind,
352 room: Option<&Room>,
353 events: &[Raw<T>],
354 ) -> serde_json::Result<()> {
355 #[derive(Deserialize)]
356 struct ExtractType<'a> {
357 #[serde(borrow, rename = "type")]
358 event_type: Cow<'a, str>,
359 }
360
361 for raw_event in events {
362 let event_type = raw_event.deserialize_as_unchecked::<ExtractType<'_>>()?.event_type;
363 self.call_event_handlers(room, raw_event.json(), kind, &event_type, None, &[]).await;
364 }
365
366 Ok(())
367 }
368
369 pub(crate) async fn handle_sync_to_device_events(
370 &self,
371 events: &[ProcessedToDeviceEvent],
372 ) -> serde_json::Result<()> {
373 #[derive(Deserialize)]
374 struct ExtractType<'a> {
375 #[serde(borrow, rename = "type")]
376 event_type: Cow<'a, str>,
377 }
378
379 for processed_to_device in events {
380 let (raw_event, encryption_info) = match processed_to_device {
381 ProcessedToDeviceEvent::Decrypted { raw, encryption_info } => {
382 (raw, Some(encryption_info))
383 }
384 other => (&other.to_raw(), None),
385 };
386 let event_type = raw_event.deserialize_as_unchecked::<ExtractType<'_>>()?.event_type;
387 self.call_event_handlers(
388 None,
389 raw_event.json(),
390 HandlerKind::ToDevice,
391 &event_type,
392 encryption_info,
393 &[],
394 )
395 .await;
396 }
397
398 Ok(())
399 }
400
401 pub(crate) async fn handle_sync_state_events(
402 &self,
403 room: Option<&Room>,
404 state: &State,
405 ) -> serde_json::Result<()> {
406 #[derive(Deserialize)]
407 struct StateEventDetails<'a> {
408 #[serde(borrow, rename = "type")]
409 event_type: Cow<'a, str>,
410 unsigned: Option<UnsignedDetails>,
411 }
412
413 let state_events = match state {
414 State::Before(events) => events,
415 State::After(events) => events,
416 };
417
418 self.handle_sync_events(HandlerKind::State, room, state_events).await?;
420
421 for raw_event in state_events {
423 let StateEventDetails { event_type, unsigned } =
424 raw_event.deserialize_as_unchecked()?;
425 let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
426 let handler_kind = HandlerKind::state_redacted(redacted);
427
428 self.call_event_handlers(room, raw_event.json(), handler_kind, &event_type, None, &[])
429 .await;
430 }
431
432 Ok(())
433 }
434
435 pub(crate) async fn handle_sync_timeline_events(
436 &self,
437 room: Option<&Room>,
438 timeline_events: &[TimelineEvent],
439 ) -> serde_json::Result<()> {
440 #[derive(Deserialize)]
441 struct TimelineEventDetails<'a> {
442 #[serde(borrow, rename = "type")]
443 event_type: Cow<'a, str>,
444 state_key: Option<serde::de::IgnoredAny>,
445 unsigned: Option<UnsignedDetails>,
446 }
447
448 for item in timeline_events {
449 let TimelineEventDetails { event_type, state_key, unsigned } =
450 item.raw().deserialize_as_unchecked()?;
451
452 let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
453 let (handler_kind_g, handler_kind_r) = match state_key {
454 Some(_) => (HandlerKind::State, HandlerKind::state_redacted(redacted)),
455 None => (HandlerKind::MessageLike, HandlerKind::message_like_redacted(redacted)),
456 };
457
458 let raw_event = item.raw().json();
459 let encryption_info = item.encryption_info().map(|i| &**i);
460 let push_actions = item.push_actions().unwrap_or(&[]);
461
462 self.call_event_handlers(
464 room,
465 raw_event,
466 handler_kind_g,
467 &event_type,
468 encryption_info,
469 push_actions,
470 )
471 .await;
472
473 self.call_event_handlers(
475 room,
476 raw_event,
477 handler_kind_r,
478 &event_type,
479 encryption_info,
480 push_actions,
481 )
482 .await;
483
484 let kind = HandlerKind::Timeline;
486 self.call_event_handlers(
487 room,
488 raw_event,
489 kind,
490 &event_type,
491 encryption_info,
492 push_actions,
493 )
494 .await;
495 }
496
497 Ok(())
498 }
499
    /// Calls all the event handlers registered for the given event kind,
    /// event type and (optional) room with the given raw event, and waits for
    /// all of them to complete.
    ///
    /// The handler futures are polled concurrently through a
    /// `FuturesUnordered`.
    #[instrument(skip_all, fields(?event_kind, ?event_type, room_id))]
    async fn call_event_handlers(
        &self,
        room: Option<&Room>,
        raw: &RawJsonValue,
        event_kind: HandlerKind,
        event_type: &str,
        encryption_info: Option<&EncryptionInfo>,
        push_actions: &[Action],
    ) {
        let room_id = room.map(|r| r.room_id());
        if let Some(room_id) = room_id {
            // `room_id` is declared without a value in the `instrument`
            // attribute; record it now that it is known.
            tracing::Span::current().record("room_id", debug(room_id));
        }

        // Construct the event handler futures. The read guard on the handlers
        // is a temporary of this statement, so it is released once the
        // futures have been collected.
        let mut futures: FuturesUnordered<_> = self
            .inner
            .event_handlers
            .handlers
            .read()
            .unwrap()
            .get_handlers(event_kind, event_type, room_id)
            .map(|(handle, handler_fn)| {
                let data = EventHandlerData {
                    client: self.clone(),
                    room: room.cloned(),
                    raw,
                    encryption_info,
                    push_actions,
                    handle,
                };

                (handler_fn)(data)
            })
            .collect();

        if !futures.is_empty() {
            debug!(amount = futures.len(), "Calling event handlers");

            // Drive the handler futures to completion; the handlers lock is
            // no longer held at this point.
            while let Some(()) = futures.next().await {}
        }
    }
545}
546
547#[derive(Debug)]
552pub struct EventHandlerDropGuard {
553 handle: EventHandlerHandle,
554 client: Client,
555}
556
557impl EventHandlerDropGuard {
558 pub(crate) fn new(handle: EventHandlerHandle, client: Client) -> Self {
559 Self { handle, client }
560 }
561}
562
563impl Drop for EventHandlerDropGuard {
564 fn drop(&mut self) {
565 self.client.remove_event_handler(self.handle.clone());
566 }
567}
568
// Implements `EventHandler<Ev, (A, B, ...)>` for every cloneable
// `FnOnce(Ev, A, B, ...) -> Fut` whose extra arguments all implement
// `EventHandlerContext`.
//
// `handle_event` builds each context argument with `from_data`; if any of
// them yields `None`, the `?` makes `handle_event` return `None` and the
// handler is not called.
macro_rules! impl_event_handler {
    ($($ty:ident),* $(,)?) => {
        impl<Ev, Fun, Fut, $($ty),*> EventHandler<Ev, ($($ty,)*)> for Fun
        where
            Ev: SyncEvent,
            Fun: FnOnce(Ev, $($ty),*) -> Fut + Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
            Fut: EventHandlerFuture,
            $($ty: EventHandlerContext),*
        {
            type Future = Fut;

            // `_d` carries a leading underscore because the zero-argument
            // expansion does not use it.
            fn handle_event(self, ev: Ev, _d: EventHandlerData<'_>) -> Option<Self::Future> {
                Some((self)(ev, $($ty::from_data(&_d)?),*))
            }
        }
    };
}

// Handlers taking the event plus zero to eight context arguments.
impl_event_handler!();
impl_event_handler!(A);
impl_event_handler!(A, B);
impl_event_handler!(A, B, C);
impl_event_handler!(A, B, C, D);
impl_event_handler!(A, B, C, D, E);
impl_event_handler!(A, B, C, D, E, F);
impl_event_handler!(A, B, C, D, E, F, G);
impl_event_handler!(A, B, C, D, E, F, G, H);
596
/// An observable over the values produced by an event handler.
///
/// Call [`ObservableEventHandler::subscribe`] to get a stream of the observed
/// values. Streams obtained this way close once this value has been dropped.
#[derive(Debug)]
pub struct ObservableEventHandler<T> {
    /// The observable holding the latest observed value. It is an `Option`
    /// and subscribers skip `None` values.
    shared_observable: SharedObservable<Option<T>>,

    /// Drop guard of the underlying event handler. Subscribers only hold a
    /// weak reference to it, so dropping this value drops the guard, which in
    /// turn removes the event handler.
    event_handler_guard: Arc<EventHandlerDropGuard>,
}
620
621impl<T> ObservableEventHandler<T> {
622 pub(crate) fn new(
623 shared_observable: SharedObservable<Option<T>>,
624 event_handler_guard: EventHandlerDropGuard,
625 ) -> Self {
626 Self { shared_observable, event_handler_guard: Arc::new(event_handler_guard) }
627 }
628
629 pub fn subscribe(&self) -> EventHandlerSubscriber<T> {
634 EventHandlerSubscriber::new(
635 self.shared_observable.subscribe(),
636 Arc::downgrade(&self.event_handler_guard),
639 )
640 }
641}
642
pin_project! {
    /// The subscriber of an [`ObservableEventHandler`], created with
    /// [`ObservableEventHandler::subscribe`].
    ///
    /// It implements [`Stream`]: it yields the observed values and closes
    /// once the drop guard of the event handler is gone.
    #[derive(Debug)]
    pub struct EventHandlerSubscriber<T> {
        // The subscriber to the shared observable. Its items are `Option<T>`;
        // the `Stream` implementation skips the `None` ones.
        #[pin]
        subscriber: Subscriber<Option<T>>,

        // Weak reference to the drop guard of the event handler. When it can
        // no longer be upgraded, the stream is closed.
        event_handler_guard: Weak<EventHandlerDropGuard>,
    }
}
670
671impl<T> EventHandlerSubscriber<T> {
672 fn new(
673 subscriber: Subscriber<Option<T>>,
674 event_handler_handle: Weak<EventHandlerDropGuard>,
675 ) -> Self {
676 Self { subscriber, event_handler_guard: event_handler_handle }
677 }
678}
679
680impl<T> Stream for EventHandlerSubscriber<T>
681where
682 T: Clone,
683{
684 type Item = T;
685
686 fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
687 let mut this = self.project();
688
689 let Some(_) = this.event_handler_guard.upgrade() else {
690 return Poll::Ready(None);
694 };
695
696 loop {
706 match this.subscriber.as_mut().poll_next(context) {
707 Poll::Ready(None) => return Poll::Ready(None),
709
710 Poll::Ready(Some(None)) => {
713 continue;
715 }
716
717 Poll::Ready(Some(Some(value))) => return Poll::Ready(Some(value)),
719
720 Poll::Pending => return Poll::Pending,
722 }
723 }
724 }
725}
726
727#[cfg(test)]
728mod tests {
729 use matrix_sdk_test::{
730 DEFAULT_TEST_ROOM_ID, InvitedRoomBuilder, JoinedRoomBuilder, async_test,
731 event_factory::{EventFactory, PreviousMembership},
732 };
733 use serde::Serialize;
734 use stream_assert::{assert_closed, assert_pending, assert_ready};
735 #[cfg(target_family = "wasm")]
736 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
737 use std::{
738 future,
739 sync::{
740 Arc, LazyLock,
741 atomic::{AtomicU8, Ordering::SeqCst},
742 },
743 };
744
745 use assert_matches2::assert_let;
746 use matrix_sdk_common::{deserialized_responses::EncryptionInfo, locks::Mutex};
747 use matrix_sdk_test::SyncResponseBuilder;
748 use ruma::{
749 event_id,
750 events::{
751 AnySyncStateEvent, AnySyncTimelineEvent, AnyToDeviceEvent,
752 macros::EventContent,
753 room::{
754 member::{MembershipState, OriginalSyncRoomMemberEvent, StrippedRoomMemberEvent},
755 name::OriginalSyncRoomNameEvent,
756 power_levels::OriginalSyncRoomPowerLevelsEvent,
757 },
758 secret_storage::key::SecretStorageKeyEvent,
759 typing::SyncTypingEvent,
760 },
761 mxc_uri,
762 room::JoinRule,
763 room_id,
764 serde::Raw,
765 user_id,
766 };
767 use serde_json::json;
768
769 use crate::{
770 Client, Room,
771 event_handler::Ctx,
772 test_utils::{logged_in_client, no_retry_test_client},
773 };
774
    /// A room member event (`@example:localhost`, membership `join`, previous
    /// membership `invite`) shared by several tests as a timeline event.
    static MEMBER_EVENT: LazyLock<Raw<AnySyncTimelineEvent>> = LazyLock::new(|| {
        EventFactory::new()
            .member(user_id!("@example:localhost"))
            .membership(MembershipState::Join)
            .display_name("example")
            .event_id(event_id!("$151800140517rfvjc:localhost"))
            .previous(PreviousMembership::new(MembershipState::Invite).display_name("example"))
            .into()
    });
784
    /// Registers four global handlers (member, typing, power levels, stripped
    /// member) and checks that one sync response calls each of them exactly
    /// once.
    #[async_test]
    async fn test_add_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        // One counter per handler.
        let member_count = Arc::new(AtomicU8::new(0));
        let typing_count = Arc::new(AtomicU8::new(0));
        let power_levels_count = Arc::new(AtomicU8::new(0));
        let invited_member_count = Arc::new(AtomicU8::new(0));

        client.add_event_handler({
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| async move {
                member_count.fetch_add(1, SeqCst);
            }
        });
        client.add_event_handler({
            let typing_count = typing_count.clone();
            move |_ev: SyncTypingEvent| async move {
                typing_count.fetch_add(1, SeqCst);
            }
        });
        client.add_event_handler({
            let power_levels_count = power_levels_count.clone();
            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| async move {
                power_levels_count.fetch_add(1, SeqCst);
            }
        });
        client.add_event_handler({
            let invited_member_count = invited_member_count.clone();
            move |_ev: StrippedRoomMemberEvent| async move {
                invited_member_count.fetch_add(1, SeqCst);
            }
        });

        // A joined room with a member timeline event, a typing notification
        // and a power levels state event, plus an invited room with a member
        // state event.
        let f = EventFactory::new().sender(user_id!("@example:localhost"));
        let response = SyncResponseBuilder::default()
            .add_joined_room(
                JoinedRoomBuilder::default()
                    .add_timeline_event(MEMBER_EVENT.clone())
                    .add_typing(
                        f.typing(vec![user_id!("@alice:matrix.org"), user_id!("@bob:example.com")]),
                    )
                    .add_state_event(f.default_power_levels()),
            )
            .add_invited_room(
                InvitedRoomBuilder::new(room_id!("!test_invited:example.org")).add_state_event({
                    let bob = user_id!("@bob:example.org");
                    EventFactory::new()
                        .sender(user_id!("@example:example.org"))
                        .member(user_id!("@alice:example.org"))
                        .membership(MembershipState::Invite)
                        .display_name("Alice")
                        .avatar_url(mxc_uri!("mxc://example.org/SEsfnsuifSDFSSEF"))
                        .age(1234_i32)
                        .invite_room_state(vec![
                            Raw::from(f.room_name("Example Room").sender(bob)),
                            Raw::from(f.room_join_rules(JoinRule::Invite).sender(bob)),
                        ])
                }),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        assert_eq!(member_count.load(SeqCst), 1);
        assert_eq!(typing_count.load(SeqCst), 1);
        assert_eq!(power_levels_count.load(SeqCst), 1);
        assert_eq!(invited_member_count.load(SeqCst), 1);

        Ok(())
    }
855
    /// Checks that a to-device event from a sync response reaches a handler
    /// for `AnyToDeviceEvent`, and that the optional encryption info context
    /// is `None` for it.
    #[async_test]
    async fn test_add_to_device_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        // Slots the handler stores its arguments into.
        let captured_event: Arc<Mutex<Option<AnyToDeviceEvent>>> = Arc::new(Mutex::new(None));
        let captured_info: Arc<Mutex<Option<EncryptionInfo>>> = Arc::new(Mutex::new(None));

        client.add_event_handler({
            let captured = captured_event.clone();
            let captured_info = captured_info.clone();
            move |ev: AnyToDeviceEvent, encryption_info: Option<EncryptionInfo>| {
                let mut captured_lock = captured.lock();
                *captured_lock = Some(ev);
                let mut captured_info_lock = captured_info.lock();
                *captured_info_lock = encryption_info;
                future::ready(())
            }
        });

        let response = SyncResponseBuilder::default()
            .add_to_device_event(json!({
              "sender": "@alice:example.com",
              "type": "m.custom.to.device.type",
              "content": {
                "a": "test",
              }
            }))
            .build_sync_response();
        client.process_sync(response).await?;

        let captured = captured_event.lock().clone();
        assert_let!(Some(received_event) = captured);
        assert_eq!(received_event.event_type().to_string(), "m.custom.to.device.type");
        // No encryption info is expected for this event.
        let info = captured_info.lock().clone();
        assert!(info.is_none());
        Ok(())
    }
893
    /// Checks that room-specific handlers only run for events of their own
    /// room.
    #[async_test]
    async fn test_add_room_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let room_id_a = room_id!("!foo:example.org");
        let room_id_b = room_id!("!bar:matrix.org");

        let member_count = Arc::new(AtomicU8::new(0));
        let power_levels_count = Arc::new(AtomicU8::new(0));

        // Member event handlers for room A and for room B, sharing a counter.
        client.add_room_event_handler(room_id_a, {
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
                member_count.fetch_add(1, SeqCst);
                future::ready(())
            }
        });
        client.add_room_event_handler(room_id_b, {
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
                member_count.fetch_add(1, SeqCst);
                future::ready(())
            }
        });

        // Power levels handler for room A only.
        client.add_room_event_handler(room_id_a, {
            let power_levels_count = power_levels_count.clone();
            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| {
                power_levels_count.fetch_add(1, SeqCst);
                future::ready(())
            }
        });

        // Room name handler for room B, which gets no room name event below:
        // it must never run.
        client.add_room_event_handler(
            room_id_b,
            // NOTE(review): the explicit `-> ()` presumably exists to help
            // type inference, since the body diverges; confirm before
            // removing the `allow`.
            #[allow(clippy::unused_unit)]
            async move |_ev: OriginalSyncRoomNameEvent| -> () {
                unreachable!("No room event in room B")
            },
        );

        // Room A: member, power levels and room name events.
        // Room B: member and power levels events.
        let f = EventFactory::new().sender(user_id!("@example:localhost"));
        let response = SyncResponseBuilder::default()
            .add_joined_room(
                JoinedRoomBuilder::new(room_id_a)
                    .add_timeline_event(MEMBER_EVENT.clone())
                    .add_state_event(f.default_power_levels())
                    .add_state_event(f.room_name("room name")),
            )
            .add_joined_room(
                JoinedRoomBuilder::new(room_id_b)
                    .add_timeline_event(MEMBER_EVENT.clone())
                    .add_state_event(f.default_power_levels()),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        // One member event per room; power levels counted for room A only.
        assert_eq!(member_count.load(SeqCst), 2);
        assert_eq!(power_levels_count.load(SeqCst), 1);

        Ok(())
    }
961
    /// Compile-time check: a handler may take a tuple of context values as a
    /// single context argument. Nothing is synced, so the handler never runs.
    #[async_test]
    async fn test_add_event_handler_with_tuples() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        client.add_event_handler(
            |_ev: OriginalSyncRoomMemberEvent, (_room, _client): (Room, Client)| future::ready(()),
        );

        Ok(())
    }
974
    /// Checks that removed handlers are not called while the remaining ones
    /// still are.
    #[async_test]
    async fn test_remove_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let member_count = Arc::new(AtomicU8::new(0));

        // First handler that stays registered.
        client.add_event_handler({
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent| async move {
                member_count.fetch_add(1, SeqCst);
            }
        });

        // A global and a room-specific handler, both removed before the sync;
        // they panic if they are ever called.
        let handle_a = client.add_event_handler(
            // NOTE(review): the explicit `-> ()` presumably exists to help
            // type inference, since the body diverges.
            #[allow(clippy::unused_unit)]
            async move |_ev: OriginalSyncRoomMemberEvent| -> () {
                panic!("handler should have been removed");
            },
        );
        let handle_b = client.add_room_event_handler(
            #[allow(unknown_lints, clippy::explicit_auto_deref)]
            *DEFAULT_TEST_ROOM_ID,
            #[allow(clippy::unused_unit)]
            async move |_ev: OriginalSyncRoomMemberEvent| -> () {
                panic!("handler should have been removed");
            },
        );

        // Second handler that stays registered.
        client.add_event_handler({
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent| async move {
                member_count.fetch_add(1, SeqCst);
            }
        });

        let response = SyncResponseBuilder::default()
            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
            .build_sync_response();

        client.remove_event_handler(handle_a);
        client.remove_event_handler(handle_b);

        client.process_sync(response).await?;

        // Only the two remaining handlers ran.
        assert_eq!(member_count.load(SeqCst), 2);

        Ok(())
    }
1027
1028 #[async_test]
1029 async fn test_event_handler_drop_guard() {
1030 let client = no_retry_test_client(None).await;
1031
1032 let handle = client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent| async {});
1033 assert_eq!(client.inner.event_handlers.len(), 1);
1034
1035 {
1036 let _guard = client.event_handler_drop_guard(handle);
1037 assert_eq!(client.inner.event_handlers.len(), 1);
1038 }
1040
1041 assert_eq!(client.inner.event_handlers.len(), 0);
1042 }
1043
    /// Compile-time check: a handler may await a method of the `Client` it
    /// receives as context and return an `anyhow` result. Nothing is synced,
    /// so the handler never runs.
    #[async_test]
    async fn test_use_client_in_handler() {
        let client = no_retry_test_client(None).await;

        client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent, client: Client| async move {
            let _caps = client.get_capabilities().await.map_err(|e| anyhow::anyhow!("{}", e))?;
            anyhow::Ok(())
        });
    }
1059
1060 #[async_test]
1061 async fn test_raw_event_handler() -> crate::Result<()> {
1062 let client = logged_in_client(None).await;
1063 let counter = Arc::new(AtomicU8::new(0));
1064 client.add_event_handler_context(counter.clone());
1065 client.add_event_handler(
1066 |_ev: Raw<OriginalSyncRoomMemberEvent>, counter: Ctx<Arc<AtomicU8>>| async move {
1067 counter.fetch_add(1, SeqCst);
1068 },
1069 );
1070
1071 let response = SyncResponseBuilder::default()
1072 .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1073 .build_sync_response();
1074 client.process_sync(response).await?;
1075
1076 assert_eq!(counter.load(SeqCst), 1);
1077 Ok(())
1078 }
1079
1080 #[async_test]
1081 async fn test_enum_event_handler() -> crate::Result<()> {
1082 let client = logged_in_client(None).await;
1083 let counter = Arc::new(AtomicU8::new(0));
1084 client.add_event_handler_context(counter.clone());
1085 client.add_event_handler(
1086 |_ev: AnySyncStateEvent, counter: Ctx<Arc<AtomicU8>>| async move {
1087 counter.fetch_add(1, SeqCst);
1088 },
1089 );
1090
1091 let response = SyncResponseBuilder::default()
1092 .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1093 .build_sync_response();
1094 client.process_sync(response).await?;
1095
1096 assert_eq!(counter.load(SeqCst), 1);
1097 Ok(())
1098 }
1099
    /// Checks that a global observer yields room name events from any room
    /// and that its subscriber closes when the observable is dropped.
    #[async_test]
    async fn test_observe_events() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let room_id_0 = room_id!("!r0.matrix.org");
        let room_id_1 = room_id!("!r1.matrix.org");

        let observable = client.observe_events::<OriginalSyncRoomNameEvent, Room>();

        let mut subscriber = observable.subscribe();

        // Nothing has been synced yet.
        assert_pending!(subscriber);

        // First sync: a room name event in room 0.
        let f = EventFactory::new().sender(user_id!("@mnt_io:matrix.org"));
        let mut response_builder = SyncResponseBuilder::new();
        let response = response_builder
            .add_joined_room(
                JoinedRoomBuilder::new(room_id_0)
                    .add_state_event(f.room_name("Name 0").event_id(event_id!("$ev0"))),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        let (room_name, room) = assert_ready!(subscriber);

        assert_eq!(room_name.event_id.as_str(), "$ev0");
        assert_eq!(room.room_id(), room_id_0);
        assert_eq!(room.name().unwrap(), "Name 0");

        assert_pending!(subscriber);

        // Second sync: a room name event in room 1.
        let response = response_builder
            .add_joined_room(
                JoinedRoomBuilder::new(room_id_1)
                    .add_state_event(f.room_name("Name 1").event_id(event_id!("$ev1"))),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        let (room_name, room) = assert_ready!(subscriber);

        assert_eq!(room_name.event_id.as_str(), "$ev1");
        assert_eq!(room.room_id(), room_id_1);
        assert_eq!(room.name().unwrap(), "Name 1");

        assert_pending!(subscriber);

        // Dropping the observable closes the subscriber.
        drop(observable);
        assert_closed!(subscriber);

        Ok(())
    }
1152
    /// Checks that a room-specific observer yields the room name events of
    /// its room, with a tuple context, and that its subscriber closes when
    /// the observable is dropped.
    #[async_test]
    async fn test_observe_room_events() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let room_id = room_id!("!r0.matrix.org");

        let observable_for_room =
            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);

        let mut subscriber_for_room = observable_for_room.subscribe();

        // Nothing has been synced yet.
        assert_pending!(subscriber_for_room);

        // First sync: a first room name event.
        let f = EventFactory::new().sender(user_id!("@mnt_io:matrix.org"));
        let mut response_builder = SyncResponseBuilder::new();
        let response = response_builder
            .add_joined_room(
                JoinedRoomBuilder::new(room_id)
                    .add_state_event(f.room_name("Name 0").event_id(event_id!("$ev0"))),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);

        assert_eq!(room_name.event_id.as_str(), "$ev0");
        assert_eq!(room.name().unwrap(), "Name 0");

        assert_pending!(subscriber_for_room);

        // Second sync: a second room name event in the same room.
        let response = response_builder
            .add_joined_room(
                JoinedRoomBuilder::new(room_id)
                    .add_state_event(f.room_name("Name 1").event_id(event_id!("$ev1"))),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);

        assert_eq!(room_name.event_id.as_str(), "$ev1");
        assert_eq!(room.name().unwrap(), "Name 1");

        assert_pending!(subscriber_for_room);

        // Dropping the observable closes the subscriber.
        drop(observable_for_room);
        assert_closed!(subscriber_for_room);

        Ok(())
    }
1203
    /// Checks that when one sync contains several matching events, the
    /// subscriber only yields the last one.
    #[async_test]
    async fn test_observe_several_room_events() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let room_id = room_id!("!r0.matrix.org");

        let observable_for_room =
            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);

        let mut subscriber_for_room = observable_for_room.subscribe();

        // Nothing has been synced yet.
        assert_pending!(subscriber_for_room);

        // A single sync with three room name events.
        let f = EventFactory::new().sender(user_id!("@mnt_io:matrix.org"));
        let mut response_builder = SyncResponseBuilder::new();
        let response = response_builder
            .add_joined_room(
                JoinedRoomBuilder::new(room_id)
                    .add_state_event(f.room_name("Name 0").event_id(event_id!("$ev0")))
                    .add_state_event(f.room_name("Name 1").event_id(event_id!("$ev1")))
                    .add_state_event(f.room_name("Name 2").event_id(event_id!("$ev2"))),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);

        // Only the most recent event is observed...
        assert_eq!(room_name.event_id.as_str(), "$ev2");
        assert_eq!(room.name().unwrap(), "Name 2");

        // ... and nothing else is pending after it.
        assert_pending!(subscriber_for_room);

        drop(observable_for_room);
        assert_closed!(subscriber_for_room);

        Ok(())
    }
1242
    /// Checks that a global observer works for an event type whose type
    /// string ends with a variable part (`m.secret_storage.key.*`).
    #[async_test]
    async fn test_observe_events_with_type_prefix() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let observable = client.observe_events::<SecretStorageKeyEvent, ()>();

        let mut subscriber = observable.subscribe();

        // Nothing has been synced yet.
        assert_pending!(subscriber);

        // Global account data whose type ends with the key id `foobar`.
        let mut response_builder = SyncResponseBuilder::new();
        let response = response_builder
            .add_custom_global_account_data(json!({
                "content": {
                    "algorithm": "m.secret_storage.v1.aes-hmac-sha2",
                    "iv": "gH2iNpiETFhApvW6/FFEJQ",
                    "mac": "9Lw12m5SKDipNghdQXKjgpfdj1/K7HFI2brO+UWAGoM",
                    "passphrase": {
                        "algorithm": "m.pbkdf2",
                        "salt": "IuLnH7S85YtZmkkBJKwNUKxWF42g9O1H",
                        "iterations": 10,
                    },
                },
                "type": "m.secret_storage.key.foobar",
            }))
            .build_sync_response();
        client.process_sync(response).await?;

        let (secret_storage_key, ()) = assert_ready!(subscriber);

        // The variable part of the type ends up in `key_id`.
        assert_eq!(secret_storage_key.content.key_id, "foobar");

        assert_pending!(subscriber);

        // Dropping the observable closes the subscriber.
        drop(observable);
        assert_closed!(subscriber);

        Ok(())
    }
1282
    /// Checks that a room-specific observer works for a room account data
    /// event type whose type string ends with a variable part
    /// (`fake.event.*`).
    #[async_test]
    async fn test_observe_room_events_with_type_prefix() -> crate::Result<()> {
        // Test-only room account data content; the variable part of the
        // event type is stored in `key_id`.
        #[derive(Debug, Clone, EventContent, Serialize)]
        #[ruma_event(type = "fake.event.*", kind = RoomAccountData)]
        struct AccountDataWithPrefixEventContent {
            #[ruma_event(type_fragment)]
            #[serde(skip)]
            key_id: String,
        }

        let room_id = room_id!("!r0.matrix.org");
        let client = logged_in_client(None).await;

        let observable = client.observe_room_events::<AccountDataWithPrefixEvent, Room>(room_id);

        let mut subscriber = observable.subscribe();

        // Nothing has been synced yet.
        assert_pending!(subscriber);

        // Room account data whose type ends with `foobar`.
        let mut response_builder = SyncResponseBuilder::new();
        let response = response_builder
            .add_joined_room(
                JoinedRoomBuilder::new(room_id).add_account_data_bulk([Raw::new(&json!({
                    "content": {},
                    "type": "fake.event.foobar",
                }))
                .unwrap()
                .cast_unchecked()]),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        let (secret_storage_key, _room) = assert_ready!(subscriber);

        // The variable part of the type ends up in `key_id`.
        assert_eq!(secret_storage_key.content.key_id, "foobar");

        assert_pending!(subscriber);

        // Dropping the observable closes the subscriber.
        drop(observable);
        assert_closed!(subscriber);

        Ok(())
    }
1329}