1#[cfg(any(feature = "anyhow", feature = "eyre"))]
35use std::any::TypeId;
36use std::{
37 borrow::Cow,
38 fmt,
39 future::Future,
40 pin::Pin,
41 sync::{
42 Arc, RwLock, Weak,
43 atomic::{AtomicU64, Ordering::SeqCst},
44 },
45 task::{Context, Poll},
46};
47
48#[cfg(target_family = "wasm")]
49use anymap2::any::CloneAny;
50#[cfg(not(target_family = "wasm"))]
51use anymap2::any::CloneAnySendSync;
52use eyeball::{SharedObservable, Subscriber};
53use futures_core::Stream;
54use futures_util::stream::{FuturesUnordered, StreamExt};
55use matrix_sdk_base::{
56 SendOutsideWasm, SyncOutsideWasm,
57 deserialized_responses::{EncryptionInfo, TimelineEvent},
58 sync::State,
59};
60use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
61use pin_project_lite::pin_project;
62use ruma::{OwnedRoomId, events::BooleanType, push::Action, serde::Raw};
63use serde::{Deserialize, de::DeserializeOwned};
64use serde_json::value::RawValue as RawJsonValue;
65use tracing::{debug, error, field::debug, instrument, warn};
66
67use self::maps::EventHandlerMaps;
68use crate::{Client, Room};
69
70mod context;
71mod maps;
72mod static_events;
73
74pub use self::context::{Ctx, EventHandlerContext, RawEvent};
75
76#[cfg(not(target_family = "wasm"))]
77type EventHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
78#[cfg(target_family = "wasm")]
79type EventHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
80
81#[cfg(not(target_family = "wasm"))]
82type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut + Send + Sync;
83#[cfg(target_family = "wasm")]
84type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut;
85
86#[cfg(not(target_family = "wasm"))]
87type AnyMap = anymap2::Map<dyn CloneAnySendSync + Send + Sync>;
88#[cfg(target_family = "wasm")]
89type AnyMap = anymap2::Map<dyn CloneAny>;
90
91#[derive(Default)]
92pub(crate) struct EventHandlerStore {
93 handlers: RwLock<EventHandlerMaps>,
94 context: RwLock<AnyMap>,
95 counter: AtomicU64,
96}
97
98impl EventHandlerStore {
99 pub fn add_handler(&self, handle: EventHandlerHandle, handler_fn: Box<EventHandlerFn>) {
100 self.handlers.write().unwrap().add(handle, handler_fn);
101 }
102
103 pub fn add_context<T>(&self, ctx: T)
104 where
105 T: Clone + Send + Sync + 'static,
106 {
107 self.context.write().unwrap().insert(ctx);
108 }
109
110 pub fn remove(&self, handle: EventHandlerHandle) {
111 self.handlers.write().unwrap().remove(handle);
112 }
113
114 #[cfg(test)]
115 fn len(&self) -> usize {
116 self.handlers.read().unwrap().len()
117 }
118}
119
120#[doc(hidden)]
121#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
122pub enum HandlerKind {
123 GlobalAccountData,
124 RoomAccountData,
125 EphemeralRoomData,
126 Timeline,
127 MessageLike,
128 OriginalMessageLike,
129 RedactedMessageLike,
130 State,
131 OriginalState,
132 RedactedState,
133 StrippedState,
134 ToDevice,
135 Presence,
136}
137
138impl HandlerKind {
139 fn message_like_redacted(redacted: bool) -> Self {
140 if redacted { Self::RedactedMessageLike } else { Self::OriginalMessageLike }
141 }
142
143 fn state_redacted(redacted: bool) -> Self {
144 if redacted { Self::RedactedState } else { Self::OriginalState }
145 }
146}
147
148pub trait SyncEvent {
150 #[doc(hidden)]
151 const KIND: HandlerKind;
152 #[doc(hidden)]
153 const TYPE: Option<&'static str>;
154 #[doc(hidden)]
155 type IsPrefix: BooleanType;
156}
157
158pub(crate) struct EventHandlerWrapper {
159 handler_fn: Box<EventHandlerFn>,
160 pub handler_id: u64,
161}
162
163#[derive(Clone, Debug)]
166pub struct EventHandlerHandle {
167 pub(crate) ev_kind: HandlerKind,
168 pub(crate) ev_type: Option<StaticEventTypePart>,
169 pub(crate) room_id: Option<OwnedRoomId>,
170 pub(crate) handler_id: u64,
171}
172
173#[derive(Clone, Copy, Debug)]
175pub(crate) enum StaticEventTypePart {
176 Full(&'static str),
178 Prefix(&'static str),
180}
181
182pub trait EventHandler<Ev, Ctx>: Clone + SendOutsideWasm + SyncOutsideWasm + 'static {
216 #[doc(hidden)]
218 type Future: EventHandlerFuture;
219
220 #[doc(hidden)]
227 fn handle_event(self, ev: Ev, data: EventHandlerData<'_>) -> Option<Self::Future>;
228}
229
230#[doc(hidden)]
231pub trait EventHandlerFuture:
232 Future<Output = <Self as EventHandlerFuture>::Output> + SendOutsideWasm + 'static
233{
234 type Output: EventHandlerResult;
235}
236
237impl<T> EventHandlerFuture for T
238where
239 T: Future + SendOutsideWasm + 'static,
240 <T as Future>::Output: EventHandlerResult,
241{
242 type Output = <T as Future>::Output;
243}
244
245#[doc(hidden)]
246#[derive(Debug)]
247pub struct EventHandlerData<'a> {
248 client: Client,
249 room: Option<Room>,
250 raw: &'a RawJsonValue,
251 encryption_info: Option<&'a EncryptionInfo>,
252 push_actions: &'a [Action],
253 handle: EventHandlerHandle,
254}
255
256pub trait EventHandlerResult: Sized {
260 #[doc(hidden)]
261 fn print_error(&self, event_type: Option<&str>);
262}
263
264impl EventHandlerResult for () {
265 fn print_error(&self, _event_type: Option<&str>) {}
266}
267
268impl<E: fmt::Debug + fmt::Display + 'static> EventHandlerResult for Result<(), E> {
269 fn print_error(&self, event_type: Option<&str>) {
270 let msg_fragment = match event_type {
271 Some(event_type) => format!(" for `{event_type}`"),
272 None => "".to_owned(),
273 };
274
275 match self {
276 #[cfg(feature = "anyhow")]
277 Err(e) if TypeId::of::<E>() == TypeId::of::<anyhow::Error>() => {
278 error!("Event handler{msg_fragment} failed: {e:?}");
279 }
280 #[cfg(feature = "eyre")]
281 Err(e) if TypeId::of::<E>() == TypeId::of::<eyre::Report>() => {
282 error!("Event handler{msg_fragment} failed: {e:?}");
283 }
284 Err(e) => {
285 error!("Event handler{msg_fragment} failed: {e}");
286 }
287 Ok(_) => {}
288 }
289 }
290}
291
292#[derive(Deserialize)]
293struct UnsignedDetails {
294 redacted_because: Option<serde::de::IgnoredAny>,
295}
296
297impl Client {
299 pub(crate) fn add_event_handler_impl<Ev, Ctx, H>(
300 &self,
301 handler: H,
302 room_id: Option<OwnedRoomId>,
303 ) -> EventHandlerHandle
304 where
305 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
306 H: EventHandler<Ev, Ctx>,
307 {
308 let handler_fn: Box<EventHandlerFn> = Box::new(move |data| {
309 let maybe_fut = serde_json::from_str(data.raw.get())
310 .map(|ev| handler.clone().handle_event(ev, data));
311
312 Box::pin(async move {
313 match maybe_fut {
314 Ok(Some(fut)) => {
315 fut.await.print_error(Ev::TYPE);
316 }
317 Ok(None) => {
318 error!(
319 event_type = Ev::TYPE, event_kind = ?Ev::KIND,
320 "Event handler has an invalid context argument",
321 );
322 }
323 Err(e) => {
324 warn!(
325 event_type = Ev::TYPE, event_kind = ?Ev::KIND,
326 "Failed to deserialize event, skipping event handler.\n
327 Deserialization error: {e}",
328 );
329 }
330 }
331 })
332 });
333
334 let handler_id = self.inner.event_handlers.counter.fetch_add(1, SeqCst);
335 let ev_type = Ev::TYPE.map(|ev_type| {
336 if Ev::IsPrefix::as_bool() {
337 StaticEventTypePart::Prefix(ev_type)
338 } else {
339 StaticEventTypePart::Full(ev_type)
340 }
341 });
342 let handle = EventHandlerHandle { ev_kind: Ev::KIND, ev_type, room_id, handler_id };
343
344 self.inner.event_handlers.add_handler(handle.clone(), handler_fn);
345
346 handle
347 }
348
349 pub(crate) async fn handle_sync_events<T>(
350 &self,
351 kind: HandlerKind,
352 room: Option<&Room>,
353 events: &[Raw<T>],
354 ) -> serde_json::Result<()> {
355 #[derive(Deserialize)]
356 struct ExtractType<'a> {
357 #[serde(borrow, rename = "type")]
358 event_type: Cow<'a, str>,
359 }
360
361 for raw_event in events {
362 let event_type = raw_event.deserialize_as_unchecked::<ExtractType<'_>>()?.event_type;
363 self.call_event_handlers(room, raw_event.json(), kind, &event_type, None, &[]).await;
364 }
365
366 Ok(())
367 }
368
369 pub(crate) async fn handle_sync_to_device_events(
370 &self,
371 events: &[ProcessedToDeviceEvent],
372 ) -> serde_json::Result<()> {
373 #[derive(Deserialize)]
374 struct ExtractType<'a> {
375 #[serde(borrow, rename = "type")]
376 event_type: Cow<'a, str>,
377 }
378
379 for processed_to_device in events {
380 let (raw_event, encryption_info) = match processed_to_device {
381 ProcessedToDeviceEvent::Decrypted { raw, encryption_info } => {
382 (raw, Some(encryption_info))
383 }
384 other => (&other.to_raw(), None),
385 };
386 let event_type = raw_event.deserialize_as_unchecked::<ExtractType<'_>>()?.event_type;
387 self.call_event_handlers(
388 None,
389 raw_event.json(),
390 HandlerKind::ToDevice,
391 &event_type,
392 encryption_info,
393 &[],
394 )
395 .await;
396 }
397
398 Ok(())
399 }
400
401 pub(crate) async fn handle_sync_state_events(
402 &self,
403 room: Option<&Room>,
404 state: &State,
405 ) -> serde_json::Result<()> {
406 #[derive(Deserialize)]
407 struct StateEventDetails<'a> {
408 #[serde(borrow, rename = "type")]
409 event_type: Cow<'a, str>,
410 unsigned: Option<UnsignedDetails>,
411 }
412
413 let state_events = match state {
414 State::Before(events) => events,
415 State::After(events) => events,
416 };
417
418 self.handle_sync_events(HandlerKind::State, room, state_events).await?;
420
421 for raw_event in state_events {
423 let StateEventDetails { event_type, unsigned } =
424 raw_event.deserialize_as_unchecked()?;
425 let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
426 let handler_kind = HandlerKind::state_redacted(redacted);
427
428 self.call_event_handlers(room, raw_event.json(), handler_kind, &event_type, None, &[])
429 .await;
430 }
431
432 Ok(())
433 }
434
435 pub(crate) async fn handle_sync_timeline_events(
436 &self,
437 room: Option<&Room>,
438 timeline_events: &[TimelineEvent],
439 ) -> serde_json::Result<()> {
440 #[derive(Deserialize)]
441 struct TimelineEventDetails<'a> {
442 #[serde(borrow, rename = "type")]
443 event_type: Cow<'a, str>,
444 state_key: Option<serde::de::IgnoredAny>,
445 unsigned: Option<UnsignedDetails>,
446 }
447
448 for item in timeline_events {
449 let TimelineEventDetails { event_type, state_key, unsigned } =
450 item.raw().deserialize_as_unchecked()?;
451
452 let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
453 let (handler_kind_g, handler_kind_r) = match state_key {
454 Some(_) => (HandlerKind::State, HandlerKind::state_redacted(redacted)),
455 None => (HandlerKind::MessageLike, HandlerKind::message_like_redacted(redacted)),
456 };
457
458 let raw_event = item.raw().json();
459 let encryption_info = item.encryption_info().map(|i| &**i);
460 let push_actions = item.push_actions().unwrap_or(&[]);
461
462 self.call_event_handlers(
464 room,
465 raw_event,
466 handler_kind_g,
467 &event_type,
468 encryption_info,
469 push_actions,
470 )
471 .await;
472
473 self.call_event_handlers(
475 room,
476 raw_event,
477 handler_kind_r,
478 &event_type,
479 encryption_info,
480 push_actions,
481 )
482 .await;
483
484 let kind = HandlerKind::Timeline;
486 self.call_event_handlers(
487 room,
488 raw_event,
489 kind,
490 &event_type,
491 encryption_info,
492 push_actions,
493 )
494 .await;
495 }
496
497 Ok(())
498 }
499
500 #[instrument(skip_all, fields(?event_kind, ?event_type, room_id))]
501 async fn call_event_handlers(
502 &self,
503 room: Option<&Room>,
504 raw: &RawJsonValue,
505 event_kind: HandlerKind,
506 event_type: &str,
507 encryption_info: Option<&EncryptionInfo>,
508 push_actions: &[Action],
509 ) {
510 let room_id = room.map(|r| r.room_id());
511 if let Some(room_id) = room_id {
512 tracing::Span::current().record("room_id", debug(room_id));
513 }
514
515 let mut futures: FuturesUnordered<_> = self
517 .inner
518 .event_handlers
519 .handlers
520 .read()
521 .unwrap()
522 .get_handlers(event_kind, event_type, room_id)
523 .map(|(handle, handler_fn)| {
524 let data = EventHandlerData {
525 client: self.clone(),
526 room: room.cloned(),
527 raw,
528 encryption_info,
529 push_actions,
530 handle,
531 };
532
533 (handler_fn)(data)
534 })
535 .collect();
536
537 if !futures.is_empty() {
538 debug!(amount = futures.len(), "Calling event handlers");
539
540 while let Some(()) = futures.next().await {}
543 }
544 }
545}
546
547#[derive(Debug)]
552pub struct EventHandlerDropGuard {
553 handle: EventHandlerHandle,
554 client: Client,
555}
556
557impl EventHandlerDropGuard {
558 pub(crate) fn new(handle: EventHandlerHandle, client: Client) -> Self {
559 Self { handle, client }
560 }
561}
562
563impl Drop for EventHandlerDropGuard {
564 fn drop(&mut self) {
565 self.client.remove_event_handler(self.handle.clone());
566 }
567}
568
569macro_rules! impl_event_handler {
570 ($($ty:ident),* $(,)?) => {
571 impl<Ev, Fun, Fut, $($ty),*> EventHandler<Ev, ($($ty,)*)> for Fun
572 where
573 Ev: SyncEvent,
574 Fun: FnOnce(Ev, $($ty),*) -> Fut + Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
575 Fut: EventHandlerFuture,
576 $($ty: EventHandlerContext),*
577 {
578 type Future = Fut;
579
580 fn handle_event(self, ev: Ev, _d: EventHandlerData<'_>) -> Option<Self::Future> {
581 Some((self)(ev, $($ty::from_data(&_d)?),*))
582 }
583 }
584 };
585}
586
587impl_event_handler!();
588impl_event_handler!(A);
589impl_event_handler!(A, B);
590impl_event_handler!(A, B, C);
591impl_event_handler!(A, B, C, D);
592impl_event_handler!(A, B, C, D, E);
593impl_event_handler!(A, B, C, D, E, F);
594impl_event_handler!(A, B, C, D, E, F, G);
595impl_event_handler!(A, B, C, D, E, F, G, H);
596
597#[derive(Debug)]
605pub struct ObservableEventHandler<T> {
606 shared_observable: SharedObservable<Option<T>>,
611
612 event_handler_guard: Arc<EventHandlerDropGuard>,
619}
620
621impl<T> ObservableEventHandler<T> {
622 pub(crate) fn new(
623 shared_observable: SharedObservable<Option<T>>,
624 event_handler_guard: EventHandlerDropGuard,
625 ) -> Self {
626 Self { shared_observable, event_handler_guard: Arc::new(event_handler_guard) }
627 }
628
629 pub fn subscribe(&self) -> EventHandlerSubscriber<T> {
634 EventHandlerSubscriber::new(
635 self.shared_observable.subscribe(),
636 Arc::downgrade(&self.event_handler_guard),
639 )
640 }
641}
642
643pin_project! {
644 #[derive(Debug)]
653 pub struct EventHandlerSubscriber<T> {
654 #[pin]
660 subscriber: Subscriber<Option<T>>,
661
662 event_handler_guard: Weak<EventHandlerDropGuard>,
668 }
669}
670
671impl<T> EventHandlerSubscriber<T> {
672 fn new(
673 subscriber: Subscriber<Option<T>>,
674 event_handler_handle: Weak<EventHandlerDropGuard>,
675 ) -> Self {
676 Self { subscriber, event_handler_guard: event_handler_handle }
677 }
678}
679
680impl<T> Stream for EventHandlerSubscriber<T>
681where
682 T: Clone,
683{
684 type Item = T;
685
686 fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
687 let mut this = self.project();
688
689 let Some(_) = this.event_handler_guard.upgrade() else {
690 return Poll::Ready(None);
694 };
695
696 loop {
706 match this.subscriber.as_mut().poll_next(context) {
707 Poll::Ready(None) => return Poll::Ready(None),
709
710 Poll::Ready(Some(None)) => {
713 continue;
715 }
716
717 Poll::Ready(Some(Some(value))) => return Poll::Ready(Some(value)),
719
720 Poll::Pending => return Poll::Pending,
722 }
723 }
724 }
725}
726
727#[cfg(test)]
728mod tests {
729 use matrix_sdk_test::{
730 DEFAULT_TEST_ROOM_ID, InvitedRoomBuilder, JoinedRoomBuilder, async_test,
731 event_factory::{EventFactory, PreviousMembership},
732 };
733 use serde::Serialize;
734 use stream_assert::{assert_closed, assert_pending, assert_ready};
735 #[cfg(target_family = "wasm")]
736 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
737 use std::{
738 future,
739 sync::{
740 Arc,
741 atomic::{AtomicU8, Ordering::SeqCst},
742 },
743 };
744
745 use assert_matches2::assert_let;
746 use matrix_sdk_common::{deserialized_responses::EncryptionInfo, locks::Mutex};
747 use matrix_sdk_test::{StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder};
748 use once_cell::sync::Lazy;
749 use ruma::{
750 event_id,
751 events::{
752 AnySyncStateEvent, AnySyncTimelineEvent, AnyToDeviceEvent,
753 macros::EventContent,
754 room::{
755 member::{MembershipState, OriginalSyncRoomMemberEvent, StrippedRoomMemberEvent},
756 name::OriginalSyncRoomNameEvent,
757 power_levels::OriginalSyncRoomPowerLevelsEvent,
758 },
759 secret_storage::key::SecretStorageKeyEvent,
760 typing::SyncTypingEvent,
761 },
762 room_id,
763 serde::Raw,
764 user_id,
765 };
766 use serde_json::json;
767
768 use crate::{
769 Client, Room,
770 event_handler::Ctx,
771 test_utils::{logged_in_client, no_retry_test_client},
772 };
773
774 static MEMBER_EVENT: Lazy<Raw<AnySyncTimelineEvent>> = Lazy::new(|| {
775 EventFactory::new()
776 .member(user_id!("@example:localhost"))
777 .membership(MembershipState::Join)
778 .display_name("example")
779 .event_id(event_id!("$151800140517rfvjc:localhost"))
780 .previous(PreviousMembership::new(MembershipState::Invite).display_name("example"))
781 .into()
782 });
783
784 #[async_test]
785 async fn test_add_event_handler() -> crate::Result<()> {
786 let client = logged_in_client(None).await;
787
788 let member_count = Arc::new(AtomicU8::new(0));
789 let typing_count = Arc::new(AtomicU8::new(0));
790 let power_levels_count = Arc::new(AtomicU8::new(0));
791 let invited_member_count = Arc::new(AtomicU8::new(0));
792
793 client.add_event_handler({
794 let member_count = member_count.clone();
795 move |_ev: OriginalSyncRoomMemberEvent, _room: Room| async move {
796 member_count.fetch_add(1, SeqCst);
797 }
798 });
799 client.add_event_handler({
800 let typing_count = typing_count.clone();
801 move |_ev: SyncTypingEvent| async move {
802 typing_count.fetch_add(1, SeqCst);
803 }
804 });
805 client.add_event_handler({
806 let power_levels_count = power_levels_count.clone();
807 move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| async move {
808 power_levels_count.fetch_add(1, SeqCst);
809 }
810 });
811 client.add_event_handler({
812 let invited_member_count = invited_member_count.clone();
813 move |_ev: StrippedRoomMemberEvent| async move {
814 invited_member_count.fetch_add(1, SeqCst);
815 }
816 });
817
818 let f = EventFactory::new();
819 let response = SyncResponseBuilder::default()
820 .add_joined_room(
821 JoinedRoomBuilder::default()
822 .add_timeline_event(MEMBER_EVENT.clone())
823 .add_typing(
824 f.typing(vec![user_id!("@alice:matrix.org"), user_id!("@bob:example.com")]),
825 )
826 .add_state_event(StateTestEvent::PowerLevels),
827 )
828 .add_invited_room(
829 InvitedRoomBuilder::new(room_id!("!test_invited:example.org")).add_state_event(
830 StrippedStateTestEvent::Custom(json!({
831 "content": {
832 "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
833 "displayname": "Alice",
834 "membership": "invite",
835 },
836 "event_id": "$143273582443PhrSn:example.org",
837 "origin_server_ts": 1432735824653u64,
838 "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
839 "sender": "@example:example.org",
840 "state_key": "@alice:example.org",
841 "type": "m.room.member",
842 "unsigned": {
843 "age": 1234,
844 "invite_room_state": [
845 {
846 "content": {
847 "name": "Example Room"
848 },
849 "sender": "@bob:example.org",
850 "state_key": "",
851 "type": "m.room.name"
852 },
853 {
854 "content": {
855 "join_rule": "invite"
856 },
857 "sender": "@bob:example.org",
858 "state_key": "",
859 "type": "m.room.join_rules"
860 }
861 ]
862 }
863 })),
864 ),
865 )
866 .build_sync_response();
867 client.process_sync(response).await?;
868
869 assert_eq!(member_count.load(SeqCst), 1);
870 assert_eq!(typing_count.load(SeqCst), 1);
871 assert_eq!(power_levels_count.load(SeqCst), 1);
872 assert_eq!(invited_member_count.load(SeqCst), 1);
873
874 Ok(())
875 }
876
877 #[async_test]
878 async fn test_add_to_device_event_handler() -> crate::Result<()> {
879 let client = logged_in_client(None).await;
880
881 let captured_event: Arc<Mutex<Option<AnyToDeviceEvent>>> = Arc::new(Mutex::new(None));
882 let captured_info: Arc<Mutex<Option<EncryptionInfo>>> = Arc::new(Mutex::new(None));
883
884 client.add_event_handler({
885 let captured = captured_event.clone();
886 let captured_info = captured_info.clone();
887 move |ev: AnyToDeviceEvent, encryption_info: Option<EncryptionInfo>| {
888 let mut captured_lock = captured.lock();
889 *captured_lock = Some(ev);
890 let mut captured_info_lock = captured_info.lock();
891 *captured_info_lock = encryption_info;
892 future::ready(())
893 }
894 });
895
896 let response = SyncResponseBuilder::default()
897 .add_to_device_event(json!({
898 "sender": "@alice:example.com",
899 "type": "m.custom.to.device.type",
900 "content": {
901 "a": "test",
902 }
903 }))
904 .build_sync_response();
905 client.process_sync(response).await?;
906
907 let captured = captured_event.lock().clone();
908 assert_let!(Some(received_event) = captured);
909 assert_eq!(received_event.event_type().to_string(), "m.custom.to.device.type");
910 let info = captured_info.lock().clone();
911 assert!(info.is_none());
912 Ok(())
913 }
914
915 #[async_test]
916 async fn test_add_room_event_handler() -> crate::Result<()> {
917 let client = logged_in_client(None).await;
918
919 let room_id_a = room_id!("!foo:example.org");
920 let room_id_b = room_id!("!bar:matrix.org");
921
922 let member_count = Arc::new(AtomicU8::new(0));
923 let power_levels_count = Arc::new(AtomicU8::new(0));
924
925 client.add_room_event_handler(room_id_a, {
927 let member_count = member_count.clone();
928 move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
929 member_count.fetch_add(1, SeqCst);
930 future::ready(())
931 }
932 });
933 client.add_room_event_handler(room_id_b, {
934 let member_count = member_count.clone();
935 move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
936 member_count.fetch_add(1, SeqCst);
937 future::ready(())
938 }
939 });
940
941 client.add_room_event_handler(room_id_a, {
943 let power_levels_count = power_levels_count.clone();
944 move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| {
945 power_levels_count.fetch_add(1, SeqCst);
946 future::ready(())
947 }
948 });
949
950 client.add_room_event_handler(
952 room_id_b,
953 #[allow(clippy::unused_unit)]
956 async move |_ev: OriginalSyncRoomNameEvent| -> () {
957 unreachable!("No room event in room B")
958 },
959 );
960
961 let response = SyncResponseBuilder::default()
962 .add_joined_room(
963 JoinedRoomBuilder::new(room_id_a)
964 .add_timeline_event(MEMBER_EVENT.clone())
965 .add_state_event(StateTestEvent::PowerLevels)
966 .add_state_event(StateTestEvent::RoomName),
967 )
968 .add_joined_room(
969 JoinedRoomBuilder::new(room_id_b)
970 .add_timeline_event(MEMBER_EVENT.clone())
971 .add_state_event(StateTestEvent::PowerLevels),
972 )
973 .build_sync_response();
974 client.process_sync(response).await?;
975
976 assert_eq!(member_count.load(SeqCst), 2);
977 assert_eq!(power_levels_count.load(SeqCst), 1);
978
979 Ok(())
980 }
981
982 #[async_test]
983 async fn test_add_event_handler_with_tuples() -> crate::Result<()> {
984 let client = logged_in_client(None).await;
985
986 client.add_event_handler(
987 |_ev: OriginalSyncRoomMemberEvent, (_room, _client): (Room, Client)| future::ready(()),
988 );
989
990 Ok(())
993 }
994
995 #[async_test]
996 async fn test_remove_event_handler() -> crate::Result<()> {
997 let client = logged_in_client(None).await;
998
999 let member_count = Arc::new(AtomicU8::new(0));
1000
1001 client.add_event_handler({
1002 let member_count = member_count.clone();
1003 move |_ev: OriginalSyncRoomMemberEvent| async move {
1004 member_count.fetch_add(1, SeqCst);
1005 }
1006 });
1007
1008 let handle_a = client.add_event_handler(
1009 #[allow(clippy::unused_unit)]
1012 async move |_ev: OriginalSyncRoomMemberEvent| -> () {
1013 panic!("handler should have been removed");
1014 },
1015 );
1016 let handle_b = client.add_room_event_handler(
1017 #[allow(unknown_lints, clippy::explicit_auto_deref)] *DEFAULT_TEST_ROOM_ID,
1019 #[allow(clippy::unused_unit)]
1022 async move |_ev: OriginalSyncRoomMemberEvent| -> () {
1023 panic!("handler should have been removed");
1024 },
1025 );
1026
1027 client.add_event_handler({
1028 let member_count = member_count.clone();
1029 move |_ev: OriginalSyncRoomMemberEvent| async move {
1030 member_count.fetch_add(1, SeqCst);
1031 }
1032 });
1033
1034 let response = SyncResponseBuilder::default()
1035 .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1036 .build_sync_response();
1037
1038 client.remove_event_handler(handle_a);
1039 client.remove_event_handler(handle_b);
1040
1041 client.process_sync(response).await?;
1042
1043 assert_eq!(member_count.load(SeqCst), 2);
1044
1045 Ok(())
1046 }
1047
1048 #[async_test]
1049 async fn test_event_handler_drop_guard() {
1050 let client = no_retry_test_client(None).await;
1051
1052 let handle = client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent| async {});
1053 assert_eq!(client.inner.event_handlers.len(), 1);
1054
1055 {
1056 let _guard = client.event_handler_drop_guard(handle);
1057 assert_eq!(client.inner.event_handlers.len(), 1);
1058 }
1060
1061 assert_eq!(client.inner.event_handlers.len(), 0);
1062 }
1063
1064 #[async_test]
1065 async fn test_use_client_in_handler() {
1066 let client = no_retry_test_client(None).await;
1070
1071 client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent, client: Client| async move {
1072 let _caps = client.get_capabilities().await.map_err(|e| anyhow::anyhow!("{}", e))?;
1076 anyhow::Ok(())
1077 });
1078 }
1079
1080 #[async_test]
1081 async fn test_raw_event_handler() -> crate::Result<()> {
1082 let client = logged_in_client(None).await;
1083 let counter = Arc::new(AtomicU8::new(0));
1084 client.add_event_handler_context(counter.clone());
1085 client.add_event_handler(
1086 |_ev: Raw<OriginalSyncRoomMemberEvent>, counter: Ctx<Arc<AtomicU8>>| async move {
1087 counter.fetch_add(1, SeqCst);
1088 },
1089 );
1090
1091 let response = SyncResponseBuilder::default()
1092 .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1093 .build_sync_response();
1094 client.process_sync(response).await?;
1095
1096 assert_eq!(counter.load(SeqCst), 1);
1097 Ok(())
1098 }
1099
1100 #[async_test]
1101 async fn test_enum_event_handler() -> crate::Result<()> {
1102 let client = logged_in_client(None).await;
1103 let counter = Arc::new(AtomicU8::new(0));
1104 client.add_event_handler_context(counter.clone());
1105 client.add_event_handler(
1106 |_ev: AnySyncStateEvent, counter: Ctx<Arc<AtomicU8>>| async move {
1107 counter.fetch_add(1, SeqCst);
1108 },
1109 );
1110
1111 let response = SyncResponseBuilder::default()
1112 .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1113 .build_sync_response();
1114 client.process_sync(response).await?;
1115
1116 assert_eq!(counter.load(SeqCst), 1);
1117 Ok(())
1118 }
1119
1120 #[async_test]
1121 async fn test_observe_events() -> crate::Result<()> {
1122 let client = logged_in_client(None).await;
1123
1124 let room_id_0 = room_id!("!r0.matrix.org");
1125 let room_id_1 = room_id!("!r1.matrix.org");
1126
1127 let observable = client.observe_events::<OriginalSyncRoomNameEvent, Room>();
1128
1129 let mut subscriber = observable.subscribe();
1130
1131 assert_pending!(subscriber);
1132
1133 let mut response_builder = SyncResponseBuilder::new();
1134 let response = response_builder
1135 .add_joined_room(JoinedRoomBuilder::new(room_id_0).add_state_event(
1136 StateTestEvent::Custom(json!({
1137 "content": {
1138 "name": "Name 0"
1139 },
1140 "event_id": "$ev0",
1141 "origin_server_ts": 1,
1142 "sender": "@mnt_io:matrix.org",
1143 "state_key": "",
1144 "type": "m.room.name",
1145 "unsigned": {
1146 "age": 1,
1147 }
1148 })),
1149 ))
1150 .build_sync_response();
1151 client.process_sync(response).await?;
1152
1153 let (room_name, room) = assert_ready!(subscriber);
1154
1155 assert_eq!(room_name.event_id.as_str(), "$ev0");
1156 assert_eq!(room.room_id(), room_id_0);
1157 assert_eq!(room.name().unwrap(), "Name 0");
1158
1159 assert_pending!(subscriber);
1160
1161 let response = response_builder
1162 .add_joined_room(JoinedRoomBuilder::new(room_id_1).add_state_event(
1163 StateTestEvent::Custom(json!({
1164 "content": {
1165 "name": "Name 1"
1166 },
1167 "event_id": "$ev1",
1168 "origin_server_ts": 2,
1169 "sender": "@mnt_io:matrix.org",
1170 "state_key": "",
1171 "type": "m.room.name",
1172 "unsigned": {
1173 "age": 2,
1174 }
1175 })),
1176 ))
1177 .build_sync_response();
1178 client.process_sync(response).await?;
1179
1180 let (room_name, room) = assert_ready!(subscriber);
1181
1182 assert_eq!(room_name.event_id.as_str(), "$ev1");
1183 assert_eq!(room.room_id(), room_id_1);
1184 assert_eq!(room.name().unwrap(), "Name 1");
1185
1186 assert_pending!(subscriber);
1187
1188 drop(observable);
1189 assert_closed!(subscriber);
1190
1191 Ok(())
1192 }
1193
1194 #[async_test]
1195 async fn test_observe_room_events() -> crate::Result<()> {
1196 let client = logged_in_client(None).await;
1197
1198 let room_id = room_id!("!r0.matrix.org");
1199
1200 let observable_for_room =
1201 client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1202
1203 let mut subscriber_for_room = observable_for_room.subscribe();
1204
1205 assert_pending!(subscriber_for_room);
1206
1207 let mut response_builder = SyncResponseBuilder::new();
1208 let response = response_builder
1209 .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1210 StateTestEvent::Custom(json!({
1211 "content": {
1212 "name": "Name 0"
1213 },
1214 "event_id": "$ev0",
1215 "origin_server_ts": 1,
1216 "sender": "@mnt_io:matrix.org",
1217 "state_key": "",
1218 "type": "m.room.name",
1219 "unsigned": {
1220 "age": 1,
1221 }
1222 })),
1223 ))
1224 .build_sync_response();
1225 client.process_sync(response).await?;
1226
1227 let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1228
1229 assert_eq!(room_name.event_id.as_str(), "$ev0");
1230 assert_eq!(room.name().unwrap(), "Name 0");
1231
1232 assert_pending!(subscriber_for_room);
1233
1234 let response = response_builder
1235 .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1236 StateTestEvent::Custom(json!({
1237 "content": {
1238 "name": "Name 1"
1239 },
1240 "event_id": "$ev1",
1241 "origin_server_ts": 2,
1242 "sender": "@mnt_io:matrix.org",
1243 "state_key": "",
1244 "type": "m.room.name",
1245 "unsigned": {
1246 "age": 2,
1247 }
1248 })),
1249 ))
1250 .build_sync_response();
1251 client.process_sync(response).await?;
1252
1253 let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1254
1255 assert_eq!(room_name.event_id.as_str(), "$ev1");
1256 assert_eq!(room.name().unwrap(), "Name 1");
1257
1258 assert_pending!(subscriber_for_room);
1259
1260 drop(observable_for_room);
1261 assert_closed!(subscriber_for_room);
1262
1263 Ok(())
1264 }
1265
1266 #[async_test]
1267 async fn test_observe_several_room_events() -> crate::Result<()> {
1268 let client = logged_in_client(None).await;
1269
1270 let room_id = room_id!("!r0.matrix.org");
1271
1272 let observable_for_room =
1273 client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1274
1275 let mut subscriber_for_room = observable_for_room.subscribe();
1276
1277 assert_pending!(subscriber_for_room);
1278
1279 let mut response_builder = SyncResponseBuilder::new();
1280 let response = response_builder
1281 .add_joined_room(
1282 JoinedRoomBuilder::new(room_id)
1283 .add_state_event(StateTestEvent::Custom(json!({
1284 "content": {
1285 "name": "Name 0"
1286 },
1287 "event_id": "$ev0",
1288 "origin_server_ts": 1,
1289 "sender": "@mnt_io:matrix.org",
1290 "state_key": "",
1291 "type": "m.room.name",
1292 "unsigned": {
1293 "age": 1,
1294 }
1295 })))
1296 .add_state_event(StateTestEvent::Custom(json!({
1297 "content": {
1298 "name": "Name 1"
1299 },
1300 "event_id": "$ev1",
1301 "origin_server_ts": 2,
1302 "sender": "@mnt_io:matrix.org",
1303 "state_key": "",
1304 "type": "m.room.name",
1305 "unsigned": {
1306 "age": 1,
1307 }
1308 })))
1309 .add_state_event(StateTestEvent::Custom(json!({
1310 "content": {
1311 "name": "Name 2"
1312 },
1313 "event_id": "$ev2",
1314 "origin_server_ts": 3,
1315 "sender": "@mnt_io:matrix.org",
1316 "state_key": "",
1317 "type": "m.room.name",
1318 "unsigned": {
1319 "age": 1,
1320 }
1321 }))),
1322 )
1323 .build_sync_response();
1324 client.process_sync(response).await?;
1325
1326 let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1327
1328 assert_eq!(room_name.event_id.as_str(), "$ev2");
1330 assert_eq!(room.name().unwrap(), "Name 2");
1331
1332 assert_pending!(subscriber_for_room);
1333
1334 drop(observable_for_room);
1335 assert_closed!(subscriber_for_room);
1336
1337 Ok(())
1338 }
1339
1340 #[async_test]
1341 async fn test_observe_events_with_type_prefix() -> crate::Result<()> {
1342 let client = logged_in_client(None).await;
1343
1344 let observable = client.observe_events::<SecretStorageKeyEvent, ()>();
1345
1346 let mut subscriber = observable.subscribe();
1347
1348 assert_pending!(subscriber);
1349
1350 let mut response_builder = SyncResponseBuilder::new();
1351 let response = response_builder
1352 .add_custom_global_account_data(json!({
1353 "content": {
1354 "algorithm": "m.secret_storage.v1.aes-hmac-sha2",
1355 "iv": "gH2iNpiETFhApvW6/FFEJQ",
1356 "mac": "9Lw12m5SKDipNghdQXKjgpfdj1/K7HFI2brO+UWAGoM",
1357 "passphrase": {
1358 "algorithm": "m.pbkdf2",
1359 "salt": "IuLnH7S85YtZmkkBJKwNUKxWF42g9O1H",
1360 "iterations": 10,
1361 },
1362 },
1363 "type": "m.secret_storage.key.foobar",
1364 }))
1365 .build_sync_response();
1366 client.process_sync(response).await?;
1367
1368 let (secret_storage_key, ()) = assert_ready!(subscriber);
1369
1370 assert_eq!(secret_storage_key.content.key_id, "foobar");
1371
1372 assert_pending!(subscriber);
1373
1374 drop(observable);
1375 assert_closed!(subscriber);
1376
1377 Ok(())
1378 }
1379
1380 #[async_test]
1381 async fn test_observe_room_events_with_type_prefix() -> crate::Result<()> {
1382 #[derive(Debug, Clone, EventContent, Serialize)]
1386 #[ruma_event(type = "fake.event.*", kind = RoomAccountData)]
1387 struct AccountDataWithPrefixEventContent {
1388 #[ruma_event(type_fragment)]
1389 #[serde(skip)]
1390 key_id: String,
1391 }
1392
1393 let room_id = room_id!("!r0.matrix.org");
1394 let client = logged_in_client(None).await;
1395
1396 let observable = client.observe_room_events::<AccountDataWithPrefixEvent, Room>(room_id);
1397
1398 let mut subscriber = observable.subscribe();
1399
1400 assert_pending!(subscriber);
1401
1402 let mut response_builder = SyncResponseBuilder::new();
1403 let response = response_builder
1404 .add_joined_room(
1405 JoinedRoomBuilder::new(room_id).add_account_data_bulk([Raw::new(&json!({
1406 "content": {},
1407 "type": "fake.event.foobar",
1408 }))
1409 .unwrap()
1410 .cast_unchecked()]),
1411 )
1412 .build_sync_response();
1413 client.process_sync(response).await?;
1414
1415 let (secret_storage_key, _room) = assert_ready!(subscriber);
1416
1417 assert_eq!(secret_storage_key.content.key_id, "foobar");
1418
1419 assert_pending!(subscriber);
1420
1421 drop(observable);
1422 assert_closed!(subscriber);
1423
1424 Ok(())
1425 }
1426}