1#[cfg(any(feature = "anyhow", feature = "eyre"))]
35use std::any::TypeId;
36use std::{
37 borrow::Cow,
38 fmt,
39 future::Future,
40 pin::Pin,
41 sync::{
42 atomic::{AtomicU64, Ordering::SeqCst},
43 Arc, RwLock, Weak,
44 },
45 task::{Context, Poll},
46};
47
48#[cfg(target_family = "wasm")]
49use anymap2::any::CloneAny;
50#[cfg(not(target_family = "wasm"))]
51use anymap2::any::CloneAnySendSync;
52use eyeball::{SharedObservable, Subscriber};
53use futures_core::Stream;
54use futures_util::stream::{FuturesUnordered, StreamExt};
55use matrix_sdk_base::{
56 deserialized_responses::{EncryptionInfo, TimelineEvent},
57 sync::State,
58 SendOutsideWasm, SyncOutsideWasm,
59};
60use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
61use pin_project_lite::pin_project;
62use ruma::{events::BooleanType, push::Action, serde::Raw, OwnedRoomId};
63use serde::{de::DeserializeOwned, Deserialize};
64use serde_json::value::RawValue as RawJsonValue;
65use tracing::{debug, error, field::debug, instrument, warn};
66
67use self::maps::EventHandlerMaps;
68use crate::{Client, Room};
69
70mod context;
71mod maps;
72mod static_events;
73
74pub use self::context::{Ctx, EventHandlerContext, RawEvent};
75
/// Boxed, pinned future returned by a type-erased event handler.
///
/// On non-wasm targets the future is required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type EventHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Boxed, pinned future returned by a type-erased event handler.
///
/// wasm variant: no `Send` bound is required.
#[cfg(target_family = "wasm")]
type EventHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
80
/// Type-erased event handler: takes the [`EventHandlerData`] of one event and
/// returns the future to run for it.
///
/// On non-wasm targets the function must be `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut + Send + Sync;
/// Type-erased event handler: takes the [`EventHandlerData`] of one event and
/// returns the future to run for it.
///
/// wasm variant: no `Send`/`Sync` bounds are required.
#[cfg(target_family = "wasm")]
type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut;
85
/// Map used to store the event handler context values.
///
/// On non-wasm targets the stored values must be clonable, `Send` and `Sync`.
#[cfg(not(target_family = "wasm"))]
type AnyMap = anymap2::Map<dyn CloneAnySendSync + Send + Sync>;
/// Map used to store the event handler context values.
///
/// wasm variant: the stored values only need to be clonable.
#[cfg(target_family = "wasm")]
type AnyMap = anymap2::Map<dyn CloneAny>;
90
/// Storage for the registered event handlers and the context values that are
/// made available to them.
#[derive(Default)]
pub(crate) struct EventHandlerStore {
    /// The registered handlers, behind a read/write lock.
    handlers: RwLock<EventHandlerMaps>,
    /// Context values inserted through `add_context`.
    context: RwLock<AnyMap>,
    /// Counter used to hand out handler ids; it is incremented once per
    /// handler registered via `Client::add_event_handler_impl`.
    counter: AtomicU64,
}
97
98impl EventHandlerStore {
99 pub fn add_handler(&self, handle: EventHandlerHandle, handler_fn: Box<EventHandlerFn>) {
100 self.handlers.write().unwrap().add(handle, handler_fn);
101 }
102
103 pub fn add_context<T>(&self, ctx: T)
104 where
105 T: Clone + Send + Sync + 'static,
106 {
107 self.context.write().unwrap().insert(ctx);
108 }
109
110 pub fn remove(&self, handle: EventHandlerHandle) {
111 self.handlers.write().unwrap().remove(handle);
112 }
113
114 #[cfg(test)]
115 fn len(&self) -> usize {
116 self.handlers.read().unwrap().len()
117 }
118}
119
/// The kind of event an event handler is registered for.
///
/// The variant order is significant because the type derives `PartialOrd` and
/// `Ord`.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum HandlerKind {
    GlobalAccountData,
    RoomAccountData,
    EphemeralRoomData,
    /// Dispatched for every timeline event, state or message-like.
    Timeline,
    /// Dispatched for timeline events without a `state_key`, redacted or not.
    MessageLike,
    /// Message-like event that has not been redacted.
    OriginalMessageLike,
    /// Message-like event that has been redacted.
    RedactedMessageLike,
    /// Dispatched for state events, redacted or not.
    State,
    /// State event that has not been redacted.
    OriginalState,
    /// State event that has been redacted.
    RedactedState,
    StrippedState,
    /// Dispatched for to-device events.
    ToDevice,
    Presence,
}

impl HandlerKind {
    /// Returns the message-like kind matching the `redacted` flag.
    fn message_like_redacted(redacted: bool) -> Self {
        match redacted {
            true => Self::RedactedMessageLike,
            false => Self::OriginalMessageLike,
        }
    }

    /// Returns the state kind matching the `redacted` flag.
    fn state_redacted(redacted: bool) -> Self {
        match redacted {
            true => Self::RedactedState,
            false => Self::OriginalState,
        }
    }
}
155
/// Static description of an event type that event handlers can be registered
/// for.
pub trait SyncEvent {
    /// The handler kind under which this event type is dispatched.
    #[doc(hidden)]
    const KIND: HandlerKind;
    /// The static event type string, if any.
    #[doc(hidden)]
    const TYPE: Option<&'static str>;
    /// Type-level boolean telling whether `TYPE` is only a prefix of the
    /// event type rather than the full type.
    #[doc(hidden)]
    type IsPrefix: BooleanType;
}
165
/// A type-erased handler function paired with its handler id.
pub(crate) struct EventHandlerWrapper {
    /// The boxed, type-erased handler function.
    handler_fn: Box<EventHandlerFn>,
    /// Id of the handler.
    pub handler_id: u64,
}
170
/// Handle identifying a registered event handler; it is what gets passed to
/// the store to remove the handler again.
#[derive(Clone, Debug)]
pub struct EventHandlerHandle {
    /// Kind of events the handler was registered for.
    pub(crate) ev_kind: HandlerKind,
    /// Static event type (full or prefix) the handler was registered for, if
    /// the event type has one.
    pub(crate) ev_type: Option<StaticEventTypePart>,
    /// Room the handler was registered for, if any.
    pub(crate) room_id: Option<OwnedRoomId>,
    /// Id taken from the store's counter when the handler was registered.
    pub(crate) handler_id: u64,
}
180
/// The statically known part of an event type.
#[derive(Clone, Copy, Debug)]
pub(crate) enum StaticEventTypePart {
    /// The complete event type is known statically.
    Full(&'static str),
    /// Only a prefix of the event type is known statically.
    Prefix(&'static str),
}
189
/// Interface for event handlers.
///
/// `Ev` is the event type the handler receives and `Ctx` the tuple of context
/// argument types. Implementations for functions and closures with up to
/// eight context arguments are generated by `impl_event_handler!` in this
/// file.
pub trait EventHandler<Ev, Ctx>: Clone + SendOutsideWasm + SyncOutsideWasm + 'static {
    /// The future returned by the handler.
    #[doc(hidden)]
    type Future: EventHandlerFuture;

    /// Creates the future that handles `ev`.
    ///
    /// Returns `None` when one of the context arguments cannot be extracted
    /// from `data`.
    #[doc(hidden)]
    fn handle_event(self, ev: Ev, data: EventHandlerData<'_>) -> Option<Self::Future>;
}
237
/// A future usable as the return value of an event handler: it is `'static`,
/// `Send` outside of wasm, and its output implements [`EventHandlerResult`].
#[doc(hidden)]
pub trait EventHandlerFuture:
    Future<Output = <Self as EventHandlerFuture>::Output> + SendOutsideWasm + 'static
{
    /// The output of the future.
    type Output: EventHandlerResult;
}
244
/// Blanket implementation: every suitable future whose output implements
/// [`EventHandlerResult`] is an [`EventHandlerFuture`].
impl<T> EventHandlerFuture for T
where
    T: Future + SendOutsideWasm + 'static,
    <T as Future>::Output: EventHandlerResult,
{
    type Output = <T as Future>::Output;
}
252
/// Everything a handler invocation is given for one event; context arguments
/// are extracted from it.
#[doc(hidden)]
#[derive(Debug)]
pub struct EventHandlerData<'a> {
    /// Clone of the client dispatching the event.
    client: Client,
    /// The room the event belongs to, if any.
    room: Option<Room>,
    /// The raw JSON of the event.
    raw: &'a RawJsonValue,
    /// Encryption information attached to the event, if any.
    encryption_info: Option<&'a EncryptionInfo>,
    /// Push actions attached to the event (empty when there are none).
    push_actions: &'a [Action],
    /// Handle of the handler being invoked.
    handle: EventHandlerHandle,
}
263
/// Return types supported for event handlers: `()` and `Result<(), E>` where
/// `E` implements `Debug` and `Display`.
pub trait EventHandlerResult: Sized {
    /// Logs the error carried by this result, if any.
    ///
    /// `event_type` is the event type the handler was registered for, when
    /// known; it is included in the log message.
    #[doc(hidden)]
    fn print_error(&self, event_type: Option<&str>);
}
271
/// Handlers returning `()` cannot fail, so there is never anything to log.
impl EventHandlerResult for () {
    fn print_error(&self, _event_type: Option<&str>) {}
}
275
276impl<E: fmt::Debug + fmt::Display + 'static> EventHandlerResult for Result<(), E> {
277 fn print_error(&self, event_type: Option<&str>) {
278 let msg_fragment = match event_type {
279 Some(event_type) => format!(" for `{event_type}`"),
280 None => "".to_owned(),
281 };
282
283 match self {
284 #[cfg(feature = "anyhow")]
285 Err(e) if TypeId::of::<E>() == TypeId::of::<anyhow::Error>() => {
286 error!("Event handler{msg_fragment} failed: {e:?}");
287 }
288 #[cfg(feature = "eyre")]
289 Err(e) if TypeId::of::<E>() == TypeId::of::<eyre::Report>() => {
290 error!("Event handler{msg_fragment} failed: {e:?}");
291 }
292 Err(e) => {
293 error!("Event handler{msg_fragment} failed: {e}");
294 }
295 Ok(_) => {}
296 }
297 }
298}
299
/// Minimal view of an event's `unsigned` object, used only to find out
/// whether the event has been redacted.
#[derive(Deserialize)]
struct UnsignedDetails {
    /// Present when the event has been redacted; its content is ignored.
    redacted_because: Option<serde::de::IgnoredAny>,
}
304
305impl Client {
307 pub(crate) fn add_event_handler_impl<Ev, Ctx, H>(
308 &self,
309 handler: H,
310 room_id: Option<OwnedRoomId>,
311 ) -> EventHandlerHandle
312 where
313 Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
314 H: EventHandler<Ev, Ctx>,
315 {
316 let handler_fn: Box<EventHandlerFn> = Box::new(move |data| {
317 let maybe_fut = serde_json::from_str(data.raw.get())
318 .map(|ev| handler.clone().handle_event(ev, data));
319
320 Box::pin(async move {
321 match maybe_fut {
322 Ok(Some(fut)) => {
323 fut.await.print_error(Ev::TYPE);
324 }
325 Ok(None) => {
326 error!(
327 event_type = Ev::TYPE, event_kind = ?Ev::KIND,
328 "Event handler has an invalid context argument",
329 );
330 }
331 Err(e) => {
332 warn!(
333 event_type = Ev::TYPE, event_kind = ?Ev::KIND,
334 "Failed to deserialize event, skipping event handler.\n
335 Deserialization error: {e}",
336 );
337 }
338 }
339 })
340 });
341
342 let handler_id = self.inner.event_handlers.counter.fetch_add(1, SeqCst);
343 let ev_type = Ev::TYPE.map(|ev_type| {
344 if Ev::IsPrefix::as_bool() {
345 StaticEventTypePart::Prefix(ev_type)
346 } else {
347 StaticEventTypePart::Full(ev_type)
348 }
349 });
350 let handle = EventHandlerHandle { ev_kind: Ev::KIND, ev_type, room_id, handler_id };
351
352 self.inner.event_handlers.add_handler(handle.clone(), handler_fn);
353
354 handle
355 }
356
357 pub(crate) async fn handle_sync_events<T>(
358 &self,
359 kind: HandlerKind,
360 room: Option<&Room>,
361 events: &[Raw<T>],
362 ) -> serde_json::Result<()> {
363 #[derive(Deserialize)]
364 struct ExtractType<'a> {
365 #[serde(borrow, rename = "type")]
366 event_type: Cow<'a, str>,
367 }
368
369 for raw_event in events {
370 let event_type = raw_event.deserialize_as_unchecked::<ExtractType<'_>>()?.event_type;
371 self.call_event_handlers(room, raw_event.json(), kind, &event_type, None, &[]).await;
372 }
373
374 Ok(())
375 }
376
    /// Calls the `ToDevice` handlers on each of the processed to-device
    /// `events`, one event after the other.
    ///
    /// Decrypted events are dispatched together with their encryption info;
    /// every other variant is dispatched via `to_raw()` with no encryption
    /// info.
    ///
    /// # Errors
    ///
    /// Returns the deserialization error as soon as the `type` field of an
    /// event cannot be read; the remaining events are not processed.
    pub(crate) async fn handle_sync_to_device_events(
        &self,
        events: &[ProcessedToDeviceEvent],
    ) -> serde_json::Result<()> {
        /// Only the `type` field of the event is needed for dispatching.
        #[derive(Deserialize)]
        struct ExtractType<'a> {
            #[serde(borrow, rename = "type")]
            event_type: Cow<'a, str>,
        }

        for processed_to_device in events {
            let (raw_event, encryption_info) = match processed_to_device {
                ProcessedToDeviceEvent::Decrypted { raw, encryption_info } => {
                    (raw, Some(encryption_info))
                }
                // The value returned by `to_raw()` is a temporary that is
                // borrowed here and used for the rest of the iteration.
                other => (&other.to_raw(), None),
            };
            let event_type = raw_event.deserialize_as_unchecked::<ExtractType<'_>>()?.event_type;
            self.call_event_handlers(
                None,
                raw_event.json(),
                HandlerKind::ToDevice,
                &event_type,
                encryption_info,
                &[],
            )
            .await;
        }

        Ok(())
    }
408
409 pub(crate) async fn handle_sync_state_events(
410 &self,
411 room: Option<&Room>,
412 state: &State,
413 ) -> serde_json::Result<()> {
414 #[derive(Deserialize)]
415 struct StateEventDetails<'a> {
416 #[serde(borrow, rename = "type")]
417 event_type: Cow<'a, str>,
418 unsigned: Option<UnsignedDetails>,
419 }
420
421 let state_events = match state {
422 State::Before(events) => events,
423 State::After(events) => events,
424 };
425
426 self.handle_sync_events(HandlerKind::State, room, state_events).await?;
428
429 for raw_event in state_events {
431 let StateEventDetails { event_type, unsigned } =
432 raw_event.deserialize_as_unchecked()?;
433 let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
434 let handler_kind = HandlerKind::state_redacted(redacted);
435
436 self.call_event_handlers(room, raw_event.json(), handler_kind, &event_type, None, &[])
437 .await;
438 }
439
440 Ok(())
441 }
442
443 pub(crate) async fn handle_sync_timeline_events(
444 &self,
445 room: Option<&Room>,
446 timeline_events: &[TimelineEvent],
447 ) -> serde_json::Result<()> {
448 #[derive(Deserialize)]
449 struct TimelineEventDetails<'a> {
450 #[serde(borrow, rename = "type")]
451 event_type: Cow<'a, str>,
452 state_key: Option<serde::de::IgnoredAny>,
453 unsigned: Option<UnsignedDetails>,
454 }
455
456 for item in timeline_events {
457 let TimelineEventDetails { event_type, state_key, unsigned } =
458 item.raw().deserialize_as_unchecked()?;
459
460 let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
461 let (handler_kind_g, handler_kind_r) = match state_key {
462 Some(_) => (HandlerKind::State, HandlerKind::state_redacted(redacted)),
463 None => (HandlerKind::MessageLike, HandlerKind::message_like_redacted(redacted)),
464 };
465
466 let raw_event = item.raw().json();
467 let encryption_info = item.encryption_info().map(|i| &**i);
468 let push_actions = item.push_actions().unwrap_or(&[]);
469
470 self.call_event_handlers(
472 room,
473 raw_event,
474 handler_kind_g,
475 &event_type,
476 encryption_info,
477 push_actions,
478 )
479 .await;
480
481 self.call_event_handlers(
483 room,
484 raw_event,
485 handler_kind_r,
486 &event_type,
487 encryption_info,
488 push_actions,
489 )
490 .await;
491
492 let kind = HandlerKind::Timeline;
494 self.call_event_handlers(
495 room,
496 raw_event,
497 kind,
498 &event_type,
499 encryption_info,
500 push_actions,
501 )
502 .await;
503 }
504
505 Ok(())
506 }
507
    /// Runs every handler registered for the given event kind, event type
    /// and room on one raw event, and waits for all of them to finish.
    ///
    /// The handler futures are driven through a `FuturesUnordered`, so no
    /// completion order is guaranteed between handlers.
    ///
    /// # Panics
    ///
    /// Panics if the handlers lock is poisoned.
    #[instrument(skip_all, fields(?event_kind, ?event_type, room_id))]
    async fn call_event_handlers(
        &self,
        room: Option<&Room>,
        raw: &RawJsonValue,
        event_kind: HandlerKind,
        event_type: &str,
        encryption_info: Option<&EncryptionInfo>,
        push_actions: &[Action],
    ) {
        // Record the room id on the tracing span when the event has a room.
        let room_id = room.map(|r| r.room_id());
        if let Some(room_id) = room_id {
            tracing::Span::current().record("room_id", debug(room_id));
        }

        // Construct the handler futures. The read guard is a temporary of
        // this statement, so the lock is released once the futures are
        // collected.
        let mut futures: FuturesUnordered<_> = self
            .inner
            .event_handlers
            .handlers
            .read()
            .unwrap()
            .get_handlers(event_kind, event_type, room_id)
            .map(|(handle, handler_fn)| {
                let data = EventHandlerData {
                    client: self.clone(),
                    room: room.cloned(),
                    raw,
                    encryption_info,
                    push_actions,
                    handle,
                };

                (handler_fn)(data)
            })
            .collect();

        if !futures.is_empty() {
            debug!(amount = futures.len(), "Calling event handlers");

            // Drive the handler futures to completion; the handlers lock is
            // no longer held at this point.
            while let Some(()) = futures.next().await {}
        }
    }
553}
554
/// Guard that removes the event handler identified by its handle from the
/// client when the guard is dropped.
#[derive(Debug)]
pub struct EventHandlerDropGuard {
    /// Handle of the event handler to remove on drop.
    handle: EventHandlerHandle,
    /// Client the event handler is removed from.
    client: Client,
}
564
565impl EventHandlerDropGuard {
566 pub(crate) fn new(handle: EventHandlerHandle, client: Client) -> Self {
567 Self { handle, client }
568 }
569}
570
571impl Drop for EventHandlerDropGuard {
572 fn drop(&mut self) {
573 self.client.remove_event_handler(self.handle.clone());
574 }
575}
576
/// Implements [`EventHandler`] for every cloneable function or closure that
/// takes an event followed by the listed context arguments.
///
/// Each context argument is extracted with `EventHandlerContext::from_data`;
/// if any extraction yields `None`, `handle_event` returns `None` without
/// calling the function.
macro_rules! impl_event_handler {
    ($($ty:ident),* $(,)?) => {
        impl<Ev, Fun, Fut, $($ty),*> EventHandler<Ev, ($($ty,)*)> for Fun
        where
            Ev: SyncEvent,
            Fun: FnOnce(Ev, $($ty),*) -> Fut + Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
            Fut: EventHandlerFuture,
            $($ty: EventHandlerContext),*
        {
            type Future = Fut;

            // `_d` is unused in the zero-argument expansion, hence the
            // leading underscore.
            fn handle_event(self, ev: Ev, _d: EventHandlerData<'_>) -> Option<Self::Future> {
                Some((self)(ev, $($ty::from_data(&_d)?),*))
            }
        }
    };
}

// Handlers with zero to eight context arguments are supported.
impl_event_handler!();
impl_event_handler!(A);
impl_event_handler!(A, B);
impl_event_handler!(A, B, C);
impl_event_handler!(A, B, C, D);
impl_event_handler!(A, B, C, D, E);
impl_event_handler!(A, B, C, D, E, F);
impl_event_handler!(A, B, C, D, E, F, G);
impl_event_handler!(A, B, C, D, E, F, G, H);
604
/// An observable event handler: values of type `T` are published through a
/// shared observable, and subscribers are created with `subscribe`.
///
/// Dropping this value drops the event handler guard it owns, which closes
/// the streams of its subscribers.
#[derive(Debug)]
pub struct ObservableEventHandler<T> {
    /// The observable holding the latest value; `None` until a first value
    /// has been set.
    shared_observable: SharedObservable<Option<T>>,

    /// Guard removing the underlying event handler on drop. Subscribers only
    /// hold a weak reference to it.
    event_handler_guard: Arc<EventHandlerDropGuard>,
}
628
629impl<T> ObservableEventHandler<T> {
630 pub(crate) fn new(
631 shared_observable: SharedObservable<Option<T>>,
632 event_handler_guard: EventHandlerDropGuard,
633 ) -> Self {
634 Self { shared_observable, event_handler_guard: Arc::new(event_handler_guard) }
635 }
636
637 pub fn subscribe(&self) -> EventHandlerSubscriber<T> {
642 EventHandlerSubscriber::new(
643 self.shared_observable.subscribe(),
644 Arc::downgrade(&self.event_handler_guard),
647 )
648 }
649}
650
pin_project! {
    /// Stream of the values published by an [`ObservableEventHandler`].
    ///
    /// The stream ends once the event handler guard held by the
    /// `ObservableEventHandler` has been dropped.
    #[derive(Debug)]
    pub struct EventHandlerSubscriber<T> {
        // Subscriber to the shared observable of the `ObservableEventHandler`.
        // Its items are `Option<T>`; `None` items are skipped when polling.
        #[pin]
        subscriber: Subscriber<Option<T>>,

        // Weak reference to the guard owned by the `ObservableEventHandler`;
        // once it can no longer be upgraded, the stream is closed.
        event_handler_guard: Weak<EventHandlerDropGuard>,
    }
}
678
679impl<T> EventHandlerSubscriber<T> {
680 fn new(
681 subscriber: Subscriber<Option<T>>,
682 event_handler_handle: Weak<EventHandlerDropGuard>,
683 ) -> Self {
684 Self { subscriber, event_handler_guard: event_handler_handle }
685 }
686}
687
688impl<T> Stream for EventHandlerSubscriber<T>
689where
690 T: Clone,
691{
692 type Item = T;
693
694 fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
695 let mut this = self.project();
696
697 let Some(_) = this.event_handler_guard.upgrade() else {
698 return Poll::Ready(None);
702 };
703
704 loop {
714 match this.subscriber.as_mut().poll_next(context) {
715 Poll::Ready(None) => return Poll::Ready(None),
717
718 Poll::Ready(Some(None)) => {
721 continue;
723 }
724
725 Poll::Ready(Some(Some(value))) => return Poll::Ready(Some(value)),
727
728 Poll::Pending => return Poll::Pending,
730 }
731 }
732 }
733}
734
735#[cfg(test)]
736mod tests {
737 use matrix_sdk_test::{
738 async_test,
739 event_factory::{EventFactory, PreviousMembership},
740 InvitedRoomBuilder, JoinedRoomBuilder, DEFAULT_TEST_ROOM_ID,
741 };
742 use serde::Serialize;
743 use stream_assert::{assert_closed, assert_pending, assert_ready};
744 #[cfg(target_family = "wasm")]
745 wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
746 use std::{
747 future,
748 sync::{
749 atomic::{AtomicU8, Ordering::SeqCst},
750 Arc,
751 },
752 };
753
754 use assert_matches2::assert_let;
755 use matrix_sdk_common::{deserialized_responses::EncryptionInfo, locks::Mutex};
756 use matrix_sdk_test::{StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder};
757 use once_cell::sync::Lazy;
758 use ruma::{
759 event_id,
760 events::{
761 macros::EventContent,
762 room::{
763 member::{MembershipState, OriginalSyncRoomMemberEvent, StrippedRoomMemberEvent},
764 name::OriginalSyncRoomNameEvent,
765 power_levels::OriginalSyncRoomPowerLevelsEvent,
766 },
767 secret_storage::key::SecretStorageKeyEvent,
768 typing::SyncTypingEvent,
769 AnySyncStateEvent, AnySyncTimelineEvent, AnyToDeviceEvent,
770 },
771 room_id,
772 serde::Raw,
773 user_id,
774 };
775 use serde_json::json;
776
777 use crate::{
778 event_handler::Ctx,
779 test_utils::{logged_in_client, no_retry_test_client},
780 Client, Room,
781 };
782
    /// A raw `m.room.member` timeline event (invite -> join transition of
    /// `@example:localhost`), shared by several tests.
    static MEMBER_EVENT: Lazy<Raw<AnySyncTimelineEvent>> = Lazy::new(|| {
        EventFactory::new()
            .member(user_id!("@example:localhost"))
            .membership(MembershipState::Join)
            .display_name("example")
            .event_id(event_id!("$151800140517rfvjc:localhost"))
            .previous(PreviousMembership::new(MembershipState::Invite).display_name("example"))
            .into()
    });
792
    /// Registers one handler each for member, typing, power levels and
    /// stripped member events, and checks that each of them is called exactly
    /// once for a sync response containing one event of each kind.
    #[async_test]
    async fn test_add_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        // One counter per handler.
        let member_count = Arc::new(AtomicU8::new(0));
        let typing_count = Arc::new(AtomicU8::new(0));
        let power_levels_count = Arc::new(AtomicU8::new(0));
        let invited_member_count = Arc::new(AtomicU8::new(0));

        client.add_event_handler({
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| async move {
                member_count.fetch_add(1, SeqCst);
            }
        });
        client.add_event_handler({
            let typing_count = typing_count.clone();
            move |_ev: SyncTypingEvent| async move {
                typing_count.fetch_add(1, SeqCst);
            }
        });
        client.add_event_handler({
            let power_levels_count = power_levels_count.clone();
            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| async move {
                power_levels_count.fetch_add(1, SeqCst);
            }
        });
        client.add_event_handler({
            let invited_member_count = invited_member_count.clone();
            move |_ev: StrippedRoomMemberEvent| async move {
                invited_member_count.fetch_add(1, SeqCst);
            }
        });

        // A joined room with a timeline, a typing and a state event, plus an
        // invited room with a stripped member event.
        let f = EventFactory::new();
        let response = SyncResponseBuilder::default()
            .add_joined_room(
                JoinedRoomBuilder::default()
                    .add_timeline_event(MEMBER_EVENT.clone())
                    .add_typing(
                        f.typing(vec![user_id!("@alice:matrix.org"), user_id!("@bob:example.com")]),
                    )
                    .add_state_event(StateTestEvent::PowerLevels),
            )
            .add_invited_room(
                InvitedRoomBuilder::new(room_id!("!test_invited:example.org")).add_state_event(
                    StrippedStateTestEvent::Custom(json!({
                        "content": {
                            "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
                            "displayname": "Alice",
                            "membership": "invite",
                        },
                        "event_id": "$143273582443PhrSn:example.org",
                        "origin_server_ts": 1432735824653u64,
                        "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
                        "sender": "@example:example.org",
                        "state_key": "@alice:example.org",
                        "type": "m.room.member",
                        "unsigned": {
                            "age": 1234,
                            "invite_room_state": [
                                {
                                    "content": {
                                        "name": "Example Room"
                                    },
                                    "sender": "@bob:example.org",
                                    "state_key": "",
                                    "type": "m.room.name"
                                },
                                {
                                    "content": {
                                        "join_rule": "invite"
                                    },
                                    "sender": "@bob:example.org",
                                    "state_key": "",
                                    "type": "m.room.join_rules"
                                }
                            ]
                        }
                    })),
                ),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        // Every handler has been called exactly once.
        assert_eq!(member_count.load(SeqCst), 1);
        assert_eq!(typing_count.load(SeqCst), 1);
        assert_eq!(power_levels_count.load(SeqCst), 1);
        assert_eq!(invited_member_count.load(SeqCst), 1);

        Ok(())
    }
885
    /// Checks that a handler for `AnyToDeviceEvent` receives a custom
    /// to-device event from a sync response, with no encryption info.
    #[async_test]
    #[allow(dependency_on_unit_never_type_fallback)]
    async fn test_add_to_device_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        // Slots the handler stores its arguments into.
        let captured_event: Arc<Mutex<Option<AnyToDeviceEvent>>> = Arc::new(Mutex::new(None));
        let captured_info: Arc<Mutex<Option<EncryptionInfo>>> = Arc::new(Mutex::new(None));

        client.add_event_handler({
            let captured = captured_event.clone();
            let captured_info = captured_info.clone();
            move |ev: AnyToDeviceEvent, encryption_info: Option<EncryptionInfo>| {
                let mut captured_lock = captured.lock();
                *captured_lock = Some(ev);
                let mut captured_info_lock = captured_info.lock();
                *captured_info_lock = encryption_info;
                future::ready(())
            }
        });

        let response = SyncResponseBuilder::default()
            .add_to_device_event(json!({
              "sender": "@alice:example.com",
              "type": "m.custom.to.device.type",
              "content": {
                "a": "test",
              }
            }))
            .build_sync_response();
        client.process_sync(response).await?;

        // The handler has seen the event, without any encryption info.
        let captured = captured_event.lock().clone();
        assert_let!(Some(received_event) = captured);
        assert_eq!(received_event.event_type().to_string(), "m.custom.to.device.type");
        let info = captured_info.lock().clone();
        assert!(info.is_none());
        Ok(())
    }
924
    /// Checks that handlers registered for a specific room are only called
    /// for events of that room.
    #[async_test]
    #[allow(dependency_on_unit_never_type_fallback)]
    async fn test_add_room_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let room_id_a = room_id!("!foo:example.org");
        let room_id_b = room_id!("!bar:matrix.org");

        let member_count = Arc::new(AtomicU8::new(0));
        let power_levels_count = Arc::new(AtomicU8::new(0));

        client.add_room_event_handler(room_id_a, {
            // Room A member event handler.
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
                member_count.fetch_add(1, SeqCst);
                future::ready(())
            }
        });
        client.add_room_event_handler(room_id_b, {
            // Room B member event handler.
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
                member_count.fetch_add(1, SeqCst);
                future::ready(())
            }
        });

        client.add_room_event_handler(room_id_a, {
            // Room A power levels event handler.
            let power_levels_count = power_levels_count.clone();
            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| {
                power_levels_count.fetch_add(1, SeqCst);
                future::ready(())
            }
        });

        // Room B room name event handler: room B gets no room name event
        // below, so it must never be called.
        client.add_room_event_handler(room_id_b, move |_ev: OriginalSyncRoomNameEvent| async {
            unreachable!("No room event in room B")
        });

        let response = SyncResponseBuilder::default()
            .add_joined_room(
                JoinedRoomBuilder::new(room_id_a)
                    .add_timeline_event(MEMBER_EVENT.clone())
                    .add_state_event(StateTestEvent::PowerLevels)
                    .add_state_event(StateTestEvent::RoomName),
            )
            .add_joined_room(
                JoinedRoomBuilder::new(room_id_b)
                    .add_timeline_event(MEMBER_EVENT.clone())
                    .add_state_event(StateTestEvent::PowerLevels),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        // One member event per room; power levels are only observed in
        // room A.
        assert_eq!(member_count.load(SeqCst), 2);
        assert_eq!(power_levels_count.load(SeqCst), 1);

        Ok(())
    }
986
    /// Checks that a handler taking a tuple of context arguments is accepted.
    #[async_test]
    #[allow(dependency_on_unit_never_type_fallback)]
    async fn test_add_event_handler_with_tuples() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        client.add_event_handler(
            |_ev: OriginalSyncRoomMemberEvent, (_room, _client): (Room, Client)| future::ready(()),
        );

        // Nothing to assert: the test passes if the registration compiles.
        Ok(())
    }
1000
    /// Checks that removed handlers are not called, while the handlers
    /// registered before and after them still are.
    #[async_test]
    #[allow(dependency_on_unit_never_type_fallback)]
    async fn test_remove_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let member_count = Arc::new(AtomicU8::new(0));

        client.add_event_handler({
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent| async move {
                member_count.fetch_add(1, SeqCst);
            }
        });

        // These two handlers are removed before the sync response is
        // processed; they panic if they are called anyway.
        let handle_a = client.add_event_handler(move |_ev: OriginalSyncRoomMemberEvent| async {
            panic!("handler should have been removed");
        });
        let handle_b = client.add_room_event_handler(
            // NOTE(review): lint suppressed here, presumably a false
            // positive on the explicit deref; confirm.
            #[allow(unknown_lints, clippy::explicit_auto_deref)]
            *DEFAULT_TEST_ROOM_ID,
            move |_ev: OriginalSyncRoomMemberEvent| async {
                panic!("handler should have been removed");
            },
        );

        client.add_event_handler({
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent| async move {
                member_count.fetch_add(1, SeqCst);
            }
        });

        let response = SyncResponseBuilder::default()
            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
            .build_sync_response();

        client.remove_event_handler(handle_a);
        client.remove_event_handler(handle_b);

        client.process_sync(response).await?;

        // Only the two remaining handlers have been called.
        assert_eq!(member_count.load(SeqCst), 2);

        Ok(())
    }
1046
    /// Checks that dropping an `EventHandlerDropGuard` removes the handler
    /// from the client's store.
    #[async_test]
    async fn test_event_handler_drop_guard() {
        let client = no_retry_test_client(None).await;

        let handle = client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent| async {});
        assert_eq!(client.inner.event_handlers.len(), 1);

        {
            let _guard = client.event_handler_drop_guard(handle);
            assert_eq!(client.inner.event_handlers.len(), 1);
            // `_guard` is dropped at the end of this scope.
        }

        assert_eq!(client.inner.event_handlers.len(), 0);
    }
1062
    /// Checks that a handler awaiting an async `Client` method and returning
    /// an `anyhow` result can be registered.
    #[async_test]
    async fn test_use_client_in_handler() {
        // The handler is never called: this test has no assertions and only
        // checks that the registration compiles.
        let client = no_retry_test_client(None).await;

        client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent, client: Client| async move {
            let _caps = client.get_capabilities().await.map_err(|e| anyhow::anyhow!("{}", e))?;
            anyhow::Ok(())
        });
    }
1078
    /// Checks that a handler taking the event as `Raw<_>` is called, and that
    /// a value added as event handler context is available through `Ctx`.
    #[async_test]
    async fn test_raw_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;
        let counter = Arc::new(AtomicU8::new(0));
        // The counter reaches the handler through the handler context.
        client.add_event_handler_context(counter.clone());
        client.add_event_handler(
            |_ev: Raw<OriginalSyncRoomMemberEvent>, counter: Ctx<Arc<AtomicU8>>| async move {
                counter.fetch_add(1, SeqCst);
            },
        );

        let response = SyncResponseBuilder::default()
            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
            .build_sync_response();
        client.process_sync(response).await?;

        assert_eq!(counter.load(SeqCst), 1);
        Ok(())
    }
1098
    /// Checks that a handler taking the `AnySyncStateEvent` enum is called
    /// for a member event received in the timeline.
    #[async_test]
    async fn test_enum_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;
        let counter = Arc::new(AtomicU8::new(0));
        // The counter reaches the handler through the handler context.
        client.add_event_handler_context(counter.clone());
        client.add_event_handler(
            |_ev: AnySyncStateEvent, counter: Ctx<Arc<AtomicU8>>| async move {
                counter.fetch_add(1, SeqCst);
            },
        );

        let response = SyncResponseBuilder::default()
            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
            .build_sync_response();
        client.process_sync(response).await?;

        assert_eq!(counter.load(SeqCst), 1);
        Ok(())
    }
1118
    /// Checks that `observe_events` yields room name events of any room, one
    /// per sync, and that the subscriber is closed once the observable is
    /// dropped.
    #[async_test]
    #[allow(dependency_on_unit_never_type_fallback)]
    async fn test_observe_events() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let room_id_0 = room_id!("!r0.matrix.org");
        let room_id_1 = room_id!("!r1.matrix.org");

        let observable = client.observe_events::<OriginalSyncRoomNameEvent, Room>();

        let mut subscriber = observable.subscribe();

        // Nothing has been received yet.
        assert_pending!(subscriber);

        // First sync: a room name event in room 0.
        let mut response_builder = SyncResponseBuilder::new();
        let response = response_builder
            .add_joined_room(JoinedRoomBuilder::new(room_id_0).add_state_event(
                StateTestEvent::Custom(json!({
                    "content": {
                        "name": "Name 0"
                    },
                    "event_id": "$ev0",
                    "origin_server_ts": 1,
                    "sender": "@mnt_io:matrix.org",
                    "state_key": "",
                    "type": "m.room.name",
                    "unsigned": {
                        "age": 1,
                    }
                })),
            ))
            .build_sync_response();
        client.process_sync(response).await?;

        let (room_name, room) = assert_ready!(subscriber);

        assert_eq!(room_name.event_id.as_str(), "$ev0");
        assert_eq!(room.room_id(), room_id_0);
        assert_eq!(room.name().unwrap(), "Name 0");

        assert_pending!(subscriber);

        // Second sync: a room name event in room 1.
        let response = response_builder
            .add_joined_room(JoinedRoomBuilder::new(room_id_1).add_state_event(
                StateTestEvent::Custom(json!({
                    "content": {
                        "name": "Name 1"
                    },
                    "event_id": "$ev1",
                    "origin_server_ts": 2,
                    "sender": "@mnt_io:matrix.org",
                    "state_key": "",
                    "type": "m.room.name",
                    "unsigned": {
                        "age": 2,
                    }
                })),
            ))
            .build_sync_response();
        client.process_sync(response).await?;

        let (room_name, room) = assert_ready!(subscriber);

        assert_eq!(room_name.event_id.as_str(), "$ev1");
        assert_eq!(room.room_id(), room_id_1);
        assert_eq!(room.name().unwrap(), "Name 1");

        assert_pending!(subscriber);

        // Dropping the observable closes the subscriber.
        drop(observable);
        assert_closed!(subscriber);

        Ok(())
    }
1193
    /// Checks that `observe_room_events` yields the room name events of the
    /// observed room, one per sync, with a tuple context, and that the
    /// subscriber is closed once the observable is dropped.
    #[async_test]
    #[allow(dependency_on_unit_never_type_fallback)]
    async fn test_observe_room_events() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let room_id = room_id!("!r0.matrix.org");

        let observable_for_room =
            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);

        let mut subscriber_for_room = observable_for_room.subscribe();

        // Nothing has been received yet.
        assert_pending!(subscriber_for_room);

        // First sync: first room name.
        let mut response_builder = SyncResponseBuilder::new();
        let response = response_builder
            .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
                StateTestEvent::Custom(json!({
                    "content": {
                        "name": "Name 0"
                    },
                    "event_id": "$ev0",
                    "origin_server_ts": 1,
                    "sender": "@mnt_io:matrix.org",
                    "state_key": "",
                    "type": "m.room.name",
                    "unsigned": {
                        "age": 1,
                    }
                })),
            ))
            .build_sync_response();
        client.process_sync(response).await?;

        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);

        assert_eq!(room_name.event_id.as_str(), "$ev0");
        assert_eq!(room.name().unwrap(), "Name 0");

        assert_pending!(subscriber_for_room);

        // Second sync: second room name.
        let response = response_builder
            .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
                StateTestEvent::Custom(json!({
                    "content": {
                        "name": "Name 1"
                    },
                    "event_id": "$ev1",
                    "origin_server_ts": 2,
                    "sender": "@mnt_io:matrix.org",
                    "state_key": "",
                    "type": "m.room.name",
                    "unsigned": {
                        "age": 2,
                    }
                })),
            ))
            .build_sync_response();
        client.process_sync(response).await?;

        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);

        assert_eq!(room_name.event_id.as_str(), "$ev1");
        assert_eq!(room.name().unwrap(), "Name 1");

        assert_pending!(subscriber_for_room);

        // Dropping the observable closes the subscriber.
        drop(observable_for_room);
        assert_closed!(subscriber_for_room);

        Ok(())
    }
1266
    /// Checks that when several room name events arrive in a single sync,
    /// the subscriber only yields the last one.
    #[async_test]
    async fn test_observe_several_room_events() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let room_id = room_id!("!r0.matrix.org");

        let observable_for_room =
            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);

        let mut subscriber_for_room = observable_for_room.subscribe();

        // Nothing has been received yet.
        assert_pending!(subscriber_for_room);

        // A single sync carrying three room name events.
        let mut response_builder = SyncResponseBuilder::new();
        let response = response_builder
            .add_joined_room(
                JoinedRoomBuilder::new(room_id)
                    .add_state_event(StateTestEvent::Custom(json!({
                        "content": {
                            "name": "Name 0"
                        },
                        "event_id": "$ev0",
                        "origin_server_ts": 1,
                        "sender": "@mnt_io:matrix.org",
                        "state_key": "",
                        "type": "m.room.name",
                        "unsigned": {
                            "age": 1,
                        }
                    })))
                    .add_state_event(StateTestEvent::Custom(json!({
                        "content": {
                            "name": "Name 1"
                        },
                        "event_id": "$ev1",
                        "origin_server_ts": 2,
                        "sender": "@mnt_io:matrix.org",
                        "state_key": "",
                        "type": "m.room.name",
                        "unsigned": {
                            "age": 1,
                        }
                    })))
                    .add_state_event(StateTestEvent::Custom(json!({
                        "content": {
                            "name": "Name 2"
                        },
                        "event_id": "$ev2",
                        "origin_server_ts": 3,
                        "sender": "@mnt_io:matrix.org",
                        "state_key": "",
                        "type": "m.room.name",
                        "unsigned": {
                            "age": 1,
                        }
                    }))),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);

        // Only the latest received event is notified.
        assert_eq!(room_name.event_id.as_str(), "$ev2");
        assert_eq!(room.name().unwrap(), "Name 2");

        assert_pending!(subscriber_for_room);

        // Dropping the observable closes the subscriber.
        drop(observable_for_room);
        assert_closed!(subscriber_for_room);

        Ok(())
    }
1340
    /// Checks that `observe_events` works for a global account data event
    /// whose type carries a suffix (`m.secret_storage.key.foobar` here), and
    /// that the suffix ends up in the event content's `key_id`.
    #[async_test]
    #[allow(dependency_on_unit_never_type_fallback)]
    async fn test_observe_events_with_type_prefix() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        let observable = client.observe_events::<SecretStorageKeyEvent, ()>();

        let mut subscriber = observable.subscribe();

        // Nothing has been received yet.
        assert_pending!(subscriber);

        let mut response_builder = SyncResponseBuilder::new();
        let response = response_builder
            .add_custom_global_account_data(json!({
                "content": {
                    "algorithm": "m.secret_storage.v1.aes-hmac-sha2",
                    "iv": "gH2iNpiETFhApvW6/FFEJQ",
                    "mac": "9Lw12m5SKDipNghdQXKjgpfdj1/K7HFI2brO+UWAGoM",
                    "passphrase": {
                        "algorithm": "m.pbkdf2",
                        "salt": "IuLnH7S85YtZmkkBJKwNUKxWF42g9O1H",
                        "iterations": 10,
                    },
                },
                "type": "m.secret_storage.key.foobar",
            }))
            .build_sync_response();
        client.process_sync(response).await?;

        let (secret_storage_key, ()) = assert_ready!(subscriber);

        assert_eq!(secret_storage_key.content.key_id, "foobar");

        assert_pending!(subscriber);

        // Dropping the observable closes the subscriber.
        drop(observable);
        assert_closed!(subscriber);

        Ok(())
    }
1381
    /// Checks that `observe_room_events` works for a room account data event
    /// whose type is declared with a `*` suffix, and that the suffix ends up
    /// in the event content's `key_id`.
    #[async_test]
    #[allow(dependency_on_unit_never_type_fallback)]
    async fn test_observe_room_events_with_type_prefix() -> crate::Result<()> {
        // Test-only room account data event type declared with a type
        // prefix; the remainder of the type goes into `key_id`.
        #[derive(Debug, Clone, EventContent, Serialize)]
        #[ruma_event(type = "fake.event.*", kind = RoomAccountData)]
        struct AccountDataWithPrefixEventContent {
            #[ruma_event(type_fragment)]
            #[serde(skip)]
            key_id: String,
        }

        let room_id = room_id!("!r0.matrix.org");
        let client = logged_in_client(None).await;

        let observable = client.observe_room_events::<AccountDataWithPrefixEvent, Room>(room_id);

        let mut subscriber = observable.subscribe();

        // Nothing has been received yet.
        assert_pending!(subscriber);

        let mut response_builder = SyncResponseBuilder::new();
        let response = response_builder
            .add_joined_room(
                JoinedRoomBuilder::new(room_id).add_account_data_bulk([Raw::new(&json!({
                    "content": {},
                    "type": "fake.event.foobar",
                }))
                .unwrap()
                .cast_unchecked()]),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        let (secret_storage_key, _room) = assert_ready!(subscriber);

        assert_eq!(secret_storage_key.content.key_id, "foobar");

        assert_pending!(subscriber);

        // Dropping the observable closes the subscriber.
        drop(observable);
        assert_closed!(subscriber);

        Ok(())
    }
1429}