matrix_sdk/event_handler/
mod.rs

1// Copyright 2021 Jonas Platte
2// Copyright 2022 Famedly GmbH
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Types and traits related for event handlers. For usage, see
17//! [`Client::add_event_handler`].
18//!
19//! ### How it works
20//!
21//! The `add_event_handler` method registers event handlers of different
22//! signatures by actually storing boxed closures that all have the same
23//! signature of `async (EventHandlerData) -> ()` where `EventHandlerData` is a
24//! private type that contains all of the data an event handler *might* need.
25//!
26//! The stored closure takes care of deserializing the event which the
27//! `EventHandlerData` contains as a (borrowed) [`serde_json::value::RawValue`],
28//! extracting the context arguments from other fields of `EventHandlerData` and
29//! calling / `.await`ing the event handler if the previous steps succeeded.
30//! It also logs any errors from the above chain of function calls.
31//!
32//! For more details, see the [`EventHandler`] trait.
33
34#[cfg(any(feature = "anyhow", feature = "eyre"))]
35use std::any::TypeId;
36use std::{
37    borrow::Cow,
38    fmt,
39    future::Future,
40    pin::Pin,
41    sync::{
42        atomic::{AtomicU64, Ordering::SeqCst},
43        Arc, RwLock, Weak,
44    },
45    task::{Context, Poll},
46};
47
48use anymap2::any::CloneAnySendSync;
49use eyeball::{SharedObservable, Subscriber};
50use futures_core::Stream;
51use futures_util::stream::{FuturesUnordered, StreamExt};
52use matrix_sdk_base::{
53    deserialized_responses::{EncryptionInfo, TimelineEvent},
54    SendOutsideWasm, SyncOutsideWasm,
55};
56use pin_project_lite::pin_project;
57use ruma::{events::AnySyncStateEvent, push::Action, serde::Raw, OwnedRoomId};
58use serde::{de::DeserializeOwned, Deserialize};
59use serde_json::value::RawValue as RawJsonValue;
60use tracing::{debug, error, field::debug, instrument, warn};
61
62use self::maps::EventHandlerMaps;
63use crate::{Client, Room};
64
65mod context;
66mod maps;
67mod static_events;
68
69pub use self::context::{Ctx, EventHandlerContext, RawEvent};
70
71#[cfg(not(target_arch = "wasm32"))]
72type EventHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
73#[cfg(target_arch = "wasm32")]
74type EventHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
75
76#[cfg(not(target_arch = "wasm32"))]
77type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut + Send + Sync;
78#[cfg(target_arch = "wasm32")]
79type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut;
80
81type AnyMap = anymap2::Map<dyn CloneAnySendSync + Send + Sync>;
82
83#[derive(Default)]
84pub(crate) struct EventHandlerStore {
85    handlers: RwLock<EventHandlerMaps>,
86    context: RwLock<AnyMap>,
87    counter: AtomicU64,
88}
89
90impl EventHandlerStore {
91    pub fn add_handler(&self, handle: EventHandlerHandle, handler_fn: Box<EventHandlerFn>) {
92        self.handlers.write().unwrap().add(handle, handler_fn);
93    }
94
95    pub fn add_context<T>(&self, ctx: T)
96    where
97        T: Clone + Send + Sync + 'static,
98    {
99        self.context.write().unwrap().insert(ctx);
100    }
101
102    pub fn remove(&self, handle: EventHandlerHandle) {
103        self.handlers.write().unwrap().remove(handle);
104    }
105
106    #[cfg(test)]
107    fn len(&self) -> usize {
108        self.handlers.read().unwrap().len()
109    }
110}
111
112#[doc(hidden)]
113#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
114pub enum HandlerKind {
115    GlobalAccountData,
116    RoomAccountData,
117    EphemeralRoomData,
118    Timeline,
119    MessageLike,
120    OriginalMessageLike,
121    RedactedMessageLike,
122    State,
123    OriginalState,
124    RedactedState,
125    StrippedState,
126    ToDevice,
127    Presence,
128}
129
130impl HandlerKind {
131    fn message_like_redacted(redacted: bool) -> Self {
132        if redacted {
133            Self::RedactedMessageLike
134        } else {
135            Self::OriginalMessageLike
136        }
137    }
138
139    fn state_redacted(redacted: bool) -> Self {
140        if redacted {
141            Self::RedactedState
142        } else {
143            Self::OriginalState
144        }
145    }
146}
147
148/// A statically-known event kind/type that can be retrieved from an event sync.
149pub trait SyncEvent {
150    #[doc(hidden)]
151    const KIND: HandlerKind;
152    #[doc(hidden)]
153    const TYPE: Option<&'static str>;
154}
155
156pub(crate) struct EventHandlerWrapper {
157    handler_fn: Box<EventHandlerFn>,
158    pub handler_id: u64,
159}
160
161/// Handle to remove a registered event handler by passing it to
162/// [`Client::remove_event_handler`].
163#[derive(Clone, Debug)]
164pub struct EventHandlerHandle {
165    pub(crate) ev_kind: HandlerKind,
166    pub(crate) ev_type: Option<&'static str>,
167    pub(crate) room_id: Option<OwnedRoomId>,
168    pub(crate) handler_id: u64,
169}
170
171/// Interface for event handlers.
172///
173/// This trait is an abstraction for a certain kind of functions / closures,
174/// specifically:
175///
176/// * They must have at least one argument, which is the event itself, a type
177///   that implements [`SyncEvent`]. Any additional arguments need to implement
178///   the [`EventHandlerContext`] trait.
179/// * Their return type has to be one of: `()`, `Result<(), impl Display + Debug
180///   + 'static>` (if you are using `anyhow::Result` or `eyre::Result` you can
181///   additionally enable the `anyhow` / `eyre` feature to get the verbose
182///   `Debug` output printed on error)
183///
184/// ### How it works
185///
186/// This trait is basically a very constrained version of `Fn`: It requires at
187/// least one argument, which is represented as its own generic parameter `Ev`
188/// with the remaining parameter types being represented by the second generic
189/// parameter `Ctx`; they have to be stuffed into one generic parameter as a
190/// tuple because Rust doesn't have variadic generics.
191///
192/// `Ev` and `Ctx` are generic parameters rather than associated types because
193/// the argument list is a generic parameter for the `Fn` traits too, so a
194/// single type could implement `Fn` multiple times with different argument
195/// lists¹. Luckily, when calling [`Client::add_event_handler`] with a
196/// closure argument the trait solver takes into account that only a single one
197/// of the implementations applies (even though this could theoretically change
198/// through a dependency upgrade) and uses that rather than raising an ambiguity
199/// error. This is the same trick used by web frameworks like actix-web and
200/// axum.
201///
202/// ¹ the only thing stopping such types from existing in stable Rust is that
203/// all manual implementations of the `Fn` traits require a Nightly feature
204pub trait EventHandler<Ev, Ctx>: Clone + SendOutsideWasm + SyncOutsideWasm + 'static {
205    /// The future returned by `handle_event`.
206    #[doc(hidden)]
207    type Future: EventHandlerFuture;
208
209    /// Create a future for handling the given event.
210    ///
211    /// `data` provides additional data about the event, for example the room it
212    /// appeared in.
213    ///
214    /// Returns `None` if one of the context extractors failed.
215    #[doc(hidden)]
216    fn handle_event(self, ev: Ev, data: EventHandlerData<'_>) -> Option<Self::Future>;
217}
218
219#[doc(hidden)]
220pub trait EventHandlerFuture:
221    Future<Output = <Self as EventHandlerFuture>::Output> + SendOutsideWasm + 'static
222{
223    type Output: EventHandlerResult;
224}
225
226impl<T> EventHandlerFuture for T
227where
228    T: Future + SendOutsideWasm + 'static,
229    <T as Future>::Output: EventHandlerResult,
230{
231    type Output = <T as Future>::Output;
232}
233
234#[doc(hidden)]
235#[derive(Debug)]
236pub struct EventHandlerData<'a> {
237    client: Client,
238    room: Option<Room>,
239    raw: &'a RawJsonValue,
240    encryption_info: Option<&'a EncryptionInfo>,
241    push_actions: &'a [Action],
242    handle: EventHandlerHandle,
243}
244
245/// Return types supported for event handlers implement this trait.
246///
247/// It is not meant to be implemented outside of matrix-sdk.
248pub trait EventHandlerResult: Sized {
249    #[doc(hidden)]
250    fn print_error(&self, event_type: Option<&str>);
251}
252
253impl EventHandlerResult for () {
254    fn print_error(&self, _event_type: Option<&str>) {}
255}
256
257impl<E: fmt::Debug + fmt::Display + 'static> EventHandlerResult for Result<(), E> {
258    fn print_error(&self, event_type: Option<&str>) {
259        let msg_fragment = match event_type {
260            Some(event_type) => format!(" for `{event_type}`"),
261            None => "".to_owned(),
262        };
263
264        match self {
265            #[cfg(feature = "anyhow")]
266            Err(e) if TypeId::of::<E>() == TypeId::of::<anyhow::Error>() => {
267                error!("Event handler{msg_fragment} failed: {e:?}");
268            }
269            #[cfg(feature = "eyre")]
270            Err(e) if TypeId::of::<E>() == TypeId::of::<eyre::Report>() => {
271                error!("Event handler{msg_fragment} failed: {e:?}");
272            }
273            Err(e) => {
274                error!("Event handler{msg_fragment} failed: {e}");
275            }
276            Ok(_) => {}
277        }
278    }
279}
280
281#[derive(Deserialize)]
282struct UnsignedDetails {
283    redacted_because: Option<serde::de::IgnoredAny>,
284}
285
286/// Event handling internals.
287impl Client {
288    pub(crate) fn add_event_handler_impl<Ev, Ctx, H>(
289        &self,
290        handler: H,
291        room_id: Option<OwnedRoomId>,
292    ) -> EventHandlerHandle
293    where
294        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
295        H: EventHandler<Ev, Ctx>,
296    {
297        let handler_fn: Box<EventHandlerFn> = Box::new(move |data| {
298            let maybe_fut = serde_json::from_str(data.raw.get())
299                .map(|ev| handler.clone().handle_event(ev, data));
300
301            Box::pin(async move {
302                match maybe_fut {
303                    Ok(Some(fut)) => {
304                        fut.await.print_error(Ev::TYPE);
305                    }
306                    Ok(None) => {
307                        error!(
308                            event_type = Ev::TYPE, event_kind = ?Ev::KIND,
309                            "Event handler has an invalid context argument",
310                        );
311                    }
312                    Err(e) => {
313                        warn!(
314                            event_type = Ev::TYPE, event_kind = ?Ev::KIND,
315                            "Failed to deserialize event, skipping event handler.\n
316                             Deserialization error: {e}",
317                        );
318                    }
319                }
320            })
321        });
322
323        let handler_id = self.inner.event_handlers.counter.fetch_add(1, SeqCst);
324        let handle =
325            EventHandlerHandle { ev_kind: Ev::KIND, ev_type: Ev::TYPE, room_id, handler_id };
326
327        self.inner.event_handlers.add_handler(handle.clone(), handler_fn);
328
329        handle
330    }
331
332    pub(crate) async fn handle_sync_events<T>(
333        &self,
334        kind: HandlerKind,
335        room: Option<&Room>,
336        events: &[Raw<T>],
337    ) -> serde_json::Result<()> {
338        #[derive(Deserialize)]
339        struct ExtractType<'a> {
340            #[serde(borrow, rename = "type")]
341            event_type: Cow<'a, str>,
342        }
343
344        for raw_event in events {
345            let event_type = raw_event.deserialize_as::<ExtractType<'_>>()?.event_type;
346            self.call_event_handlers(room, raw_event.json(), kind, &event_type, None, &[]).await;
347        }
348
349        Ok(())
350    }
351
352    pub(crate) async fn handle_sync_state_events(
353        &self,
354        room: Option<&Room>,
355        state_events: &[Raw<AnySyncStateEvent>],
356    ) -> serde_json::Result<()> {
357        #[derive(Deserialize)]
358        struct StateEventDetails<'a> {
359            #[serde(borrow, rename = "type")]
360            event_type: Cow<'a, str>,
361            unsigned: Option<UnsignedDetails>,
362        }
363
364        // Event handlers for possibly-redacted state events
365        self.handle_sync_events(HandlerKind::State, room, state_events).await?;
366
367        // Event handlers specifically for redacted OR unredacted state events
368        for raw_event in state_events {
369            let StateEventDetails { event_type, unsigned } = raw_event.deserialize_as()?;
370            let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
371            let handler_kind = HandlerKind::state_redacted(redacted);
372
373            self.call_event_handlers(room, raw_event.json(), handler_kind, &event_type, None, &[])
374                .await;
375        }
376
377        Ok(())
378    }
379
380    pub(crate) async fn handle_sync_timeline_events(
381        &self,
382        room: Option<&Room>,
383        timeline_events: &[TimelineEvent],
384    ) -> serde_json::Result<()> {
385        #[derive(Deserialize)]
386        struct TimelineEventDetails<'a> {
387            #[serde(borrow, rename = "type")]
388            event_type: Cow<'a, str>,
389            state_key: Option<serde::de::IgnoredAny>,
390            unsigned: Option<UnsignedDetails>,
391        }
392
393        for item in timeline_events {
394            let TimelineEventDetails { event_type, state_key, unsigned } =
395                item.raw().deserialize_as()?;
396
397            let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
398            let (handler_kind_g, handler_kind_r) = match state_key {
399                Some(_) => (HandlerKind::State, HandlerKind::state_redacted(redacted)),
400                None => (HandlerKind::MessageLike, HandlerKind::message_like_redacted(redacted)),
401            };
402
403            let raw_event = item.raw().json();
404            let encryption_info = item.encryption_info();
405            let push_actions = item.push_actions.as_deref().unwrap_or(&[]);
406
407            // Event handlers for possibly-redacted timeline events
408            self.call_event_handlers(
409                room,
410                raw_event,
411                handler_kind_g,
412                &event_type,
413                encryption_info,
414                push_actions,
415            )
416            .await;
417
418            // Event handlers specifically for redacted OR unredacted timeline events
419            self.call_event_handlers(
420                room,
421                raw_event,
422                handler_kind_r,
423                &event_type,
424                encryption_info,
425                push_actions,
426            )
427            .await;
428
429            // Event handlers for `AnySyncTimelineEvent`
430            let kind = HandlerKind::Timeline;
431            self.call_event_handlers(
432                room,
433                raw_event,
434                kind,
435                &event_type,
436                encryption_info,
437                push_actions,
438            )
439            .await;
440        }
441
442        Ok(())
443    }
444
445    #[instrument(skip_all, fields(?event_kind, ?event_type, room_id))]
446    async fn call_event_handlers(
447        &self,
448        room: Option<&Room>,
449        raw: &RawJsonValue,
450        event_kind: HandlerKind,
451        event_type: &str,
452        encryption_info: Option<&EncryptionInfo>,
453        push_actions: &[Action],
454    ) {
455        let room_id = room.map(|r| r.room_id());
456        if let Some(room_id) = room_id {
457            tracing::Span::current().record("room_id", debug(room_id));
458        }
459
460        // Construct event handler futures
461        let mut futures: FuturesUnordered<_> = self
462            .inner
463            .event_handlers
464            .handlers
465            .read()
466            .unwrap()
467            .get_handlers(event_kind, event_type, room_id)
468            .map(|(handle, handler_fn)| {
469                let data = EventHandlerData {
470                    client: self.clone(),
471                    room: room.cloned(),
472                    raw,
473                    encryption_info,
474                    push_actions,
475                    handle,
476                };
477
478                (handler_fn)(data)
479            })
480            .collect();
481
482        if !futures.is_empty() {
483            debug!(amount = futures.len(), "Calling event handlers");
484
485            // Run the event handler futures with the `self.event_handlers.handlers`
486            // lock no longer being held.
487            while let Some(()) = futures.next().await {}
488        }
489    }
490}
491
492/// A guard type that removes an event handler when it drops (goes out of
493/// scope).
494///
495/// Created with [`Client::event_handler_drop_guard`].
496#[derive(Debug)]
497pub struct EventHandlerDropGuard {
498    handle: EventHandlerHandle,
499    client: Client,
500}
501
502impl EventHandlerDropGuard {
503    pub(crate) fn new(handle: EventHandlerHandle, client: Client) -> Self {
504        Self { handle, client }
505    }
506}
507
508impl Drop for EventHandlerDropGuard {
509    fn drop(&mut self) {
510        self.client.remove_event_handler(self.handle.clone());
511    }
512}
513
514macro_rules! impl_event_handler {
515    ($($ty:ident),* $(,)?) => {
516        impl<Ev, Fun, Fut, $($ty),*> EventHandler<Ev, ($($ty,)*)> for Fun
517        where
518            Ev: SyncEvent,
519            Fun: FnOnce(Ev, $($ty),*) -> Fut + Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
520            Fut: EventHandlerFuture,
521            $($ty: EventHandlerContext),*
522        {
523            type Future = Fut;
524
525            fn handle_event(self, ev: Ev, _d: EventHandlerData<'_>) -> Option<Self::Future> {
526                Some((self)(ev, $($ty::from_data(&_d)?),*))
527            }
528        }
529    };
530}
531
532impl_event_handler!();
533impl_event_handler!(A);
534impl_event_handler!(A, B);
535impl_event_handler!(A, B, C);
536impl_event_handler!(A, B, C, D);
537impl_event_handler!(A, B, C, D, E);
538impl_event_handler!(A, B, C, D, E, F);
539impl_event_handler!(A, B, C, D, E, F, G);
540impl_event_handler!(A, B, C, D, E, F, G, H);
541
542/// An observer of events (may be tailored to a room).
543///
544/// Only the most recent value can be observed. Subscribers are notified when a
545/// new value is sent, but there is no guarantee that they will see all values.
546///
547/// To create such observer, use [`Client::observe_events`] or
548/// [`Client::observe_room_events`].
549#[derive(Debug)]
550pub struct ObservableEventHandler<T> {
551    /// This type is actually nothing more than a thin glue layer between the
552    /// [`EventHandler`] mechanism and the reactive programming types from
553    /// [`eyeball`]. Here, we use a [`SharedObservable`] that is updated by the
554    /// [`EventHandler`].
555    shared_observable: SharedObservable<Option<T>>,
556
557    /// This type owns the [`EventHandlerDropGuard`]. As soon as this type goes
558    /// out of scope, the event handler is unregistered/removed.
559    ///
560    /// [`EventHandlerSubscriber`] holds a weak, non-owning reference, to this
561    /// guard. It is useful to detect when to close the [`Stream`]: as soon as
562    /// this type goes out of scope, the subscriber will close itself on poll.
563    event_handler_guard: Arc<EventHandlerDropGuard>,
564}
565
566impl<T> ObservableEventHandler<T> {
567    pub(crate) fn new(
568        shared_observable: SharedObservable<Option<T>>,
569        event_handler_guard: EventHandlerDropGuard,
570    ) -> Self {
571        Self { shared_observable, event_handler_guard: Arc::new(event_handler_guard) }
572    }
573
574    /// Subscribe to this observer.
575    ///
576    /// It returns an [`EventHandlerSubscriber`], which implements [`Stream`].
577    /// See its documentation to learn more.
578    pub fn subscribe(&self) -> EventHandlerSubscriber<T> {
579        EventHandlerSubscriber::new(
580            self.shared_observable.subscribe(),
581            // The subscriber holds a weak non-owning reference to the event handler guard, so that
582            // it can detect when this observer is dropped, and can close the subscriber's stream.
583            Arc::downgrade(&self.event_handler_guard),
584        )
585    }
586}
587
588pin_project! {
589    /// The subscriber of an [`ObservableEventHandler`].
590    ///
591    /// To create such subscriber, use [`ObservableEventHandler::subscribe`].
592    ///
593    /// This type implements [`Stream`], which means it is possible to poll the
594    /// next value asynchronously. In other terms, polling this type will return
595    /// the new event as soon as they are synced. See [`Client::observe_events`]
596    /// to learn more.
597    #[derive(Debug)]
598    pub struct EventHandlerSubscriber<T> {
599        // The `Subscriber` associated to the `SharedObservable` inside
600        // `ObservableEventHandle`.
601        //
602        // Keep in mind all this API is just a thin glue layer between
603        // `EventHandle` and `SharedObservable`, that's… maagiic!
604        #[pin]
605        subscriber: Subscriber<Option<T>>,
606
607        // A weak non-owning reference to the event handler guard from
608        // `ObservableEventHandler`. When this type is polled (via its `Stream`
609        // implementation), it is possible to detect whether the observable has
610        // been dropped by upgrading this weak reference, and close the `Stream`
611        // if it needs to.
612        event_handler_guard: Weak<EventHandlerDropGuard>,
613    }
614}
615
616impl<T> EventHandlerSubscriber<T> {
617    fn new(
618        subscriber: Subscriber<Option<T>>,
619        event_handler_handle: Weak<EventHandlerDropGuard>,
620    ) -> Self {
621        Self { subscriber, event_handler_guard: event_handler_handle }
622    }
623}
624
625impl<T> Stream for EventHandlerSubscriber<T>
626where
627    T: Clone,
628{
629    type Item = T;
630
631    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
632        let mut this = self.project();
633
634        let Some(_) = this.event_handler_guard.upgrade() else {
635            // The `EventHandlerHandle` has been dropped via `EventHandlerDropGuard`. It
636            // means the `ObservableEventHandler` has been dropped. It's time to
637            // close this stream.
638            return Poll::Ready(None);
639        };
640
641        // First off, the subscriber is of type `Subscriber<Option<T>>` because the
642        // `SharedObservable` starts with a `None` value to indicate it has no yet
643        // received any update. We want the `Stream` to return `T`, not `Option<T>`. We
644        // then filter out all `None` value.
645        //
646        // Second, when a `None` value is met, we want to poll again (hence the `loop`).
647        // At best, there is a new value to return. At worst, the subscriber will return
648        // `Poll::Pending` and will register the wakers accordingly.
649
650        loop {
651            match this.subscriber.as_mut().poll_next(context) {
652                // Stream has been closed somehow.
653                Poll::Ready(None) => return Poll::Ready(None),
654
655                // The initial value (of the `SharedObservable` behind `self.subscriber`) has been
656                // polled. We want to filter it out.
657                Poll::Ready(Some(None)) => {
658                    // Loop over.
659                    continue;
660                }
661
662                // We have a new value!
663                Poll::Ready(Some(Some(value))) => return Poll::Ready(Some(value)),
664
665                // Classical pending.
666                Poll::Pending => return Poll::Pending,
667            }
668        }
669    }
670}
671
672#[cfg(test)]
673mod tests {
674    use matrix_sdk_test::{
675        async_test,
676        event_factory::{EventFactory, PreviousMembership},
677        InvitedRoomBuilder, JoinedRoomBuilder, DEFAULT_TEST_ROOM_ID,
678    };
679    use stream_assert::{assert_closed, assert_pending, assert_ready};
680    #[cfg(target_arch = "wasm32")]
681    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
682    use std::{
683        future,
684        sync::{
685            atomic::{AtomicU8, Ordering::SeqCst},
686            Arc,
687        },
688    };
689
690    use matrix_sdk_test::{
691        EphemeralTestEvent, StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder,
692    };
693    use once_cell::sync::Lazy;
694    use ruma::{
695        event_id,
696        events::{
697            room::{
698                member::{MembershipState, OriginalSyncRoomMemberEvent, StrippedRoomMemberEvent},
699                name::OriginalSyncRoomNameEvent,
700                power_levels::OriginalSyncRoomPowerLevelsEvent,
701            },
702            typing::SyncTypingEvent,
703            AnySyncStateEvent, AnySyncTimelineEvent,
704        },
705        room_id,
706        serde::Raw,
707        user_id,
708    };
709    use serde_json::json;
710
711    use crate::{
712        event_handler::Ctx,
713        test_utils::{logged_in_client, no_retry_test_client},
714        Client, Room,
715    };
716
717    static MEMBER_EVENT: Lazy<Raw<AnySyncTimelineEvent>> = Lazy::new(|| {
718        EventFactory::new()
719            .member(user_id!("@example:localhost"))
720            .membership(MembershipState::Join)
721            .display_name("example")
722            .event_id(event_id!("$151800140517rfvjc:localhost"))
723            .previous(PreviousMembership::new(MembershipState::Invite).display_name("example"))
724            .into()
725    });
726
727    #[async_test]
728    async fn test_add_event_handler() -> crate::Result<()> {
729        let client = logged_in_client(None).await;
730
731        let member_count = Arc::new(AtomicU8::new(0));
732        let typing_count = Arc::new(AtomicU8::new(0));
733        let power_levels_count = Arc::new(AtomicU8::new(0));
734        let invited_member_count = Arc::new(AtomicU8::new(0));
735
736        client.add_event_handler({
737            let member_count = member_count.clone();
738            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| async move {
739                member_count.fetch_add(1, SeqCst);
740            }
741        });
742        client.add_event_handler({
743            let typing_count = typing_count.clone();
744            move |_ev: SyncTypingEvent| async move {
745                typing_count.fetch_add(1, SeqCst);
746            }
747        });
748        client.add_event_handler({
749            let power_levels_count = power_levels_count.clone();
750            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| async move {
751                power_levels_count.fetch_add(1, SeqCst);
752            }
753        });
754        client.add_event_handler({
755            let invited_member_count = invited_member_count.clone();
756            move |_ev: StrippedRoomMemberEvent| async move {
757                invited_member_count.fetch_add(1, SeqCst);
758            }
759        });
760
761        let response = SyncResponseBuilder::default()
762            .add_joined_room(
763                JoinedRoomBuilder::default()
764                    .add_timeline_event(MEMBER_EVENT.clone())
765                    .add_ephemeral_event(EphemeralTestEvent::Typing)
766                    .add_state_event(StateTestEvent::PowerLevels),
767            )
768            .add_invited_room(
769                InvitedRoomBuilder::new(room_id!("!test_invited:example.org")).add_state_event(
770                    StrippedStateTestEvent::Custom(json!({
771                        "content": {
772                            "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
773                            "displayname": "Alice",
774                            "membership": "invite",
775                        },
776                        "event_id": "$143273582443PhrSn:example.org",
777                        "origin_server_ts": 1432735824653u64,
778                        "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
779                        "sender": "@example:example.org",
780                        "state_key": "@alice:example.org",
781                        "type": "m.room.member",
782                        "unsigned": {
783                            "age": 1234,
784                            "invite_room_state": [
785                                {
786                                    "content": {
787                                        "name": "Example Room"
788                                    },
789                                    "sender": "@bob:example.org",
790                                    "state_key": "",
791                                    "type": "m.room.name"
792                                },
793                                {
794                                    "content": {
795                                        "join_rule": "invite"
796                                    },
797                                    "sender": "@bob:example.org",
798                                    "state_key": "",
799                                    "type": "m.room.join_rules"
800                                }
801                            ]
802                        }
803                    })),
804                ),
805            )
806            .build_sync_response();
807        client.process_sync(response).await?;
808
809        assert_eq!(member_count.load(SeqCst), 1);
810        assert_eq!(typing_count.load(SeqCst), 1);
811        assert_eq!(power_levels_count.load(SeqCst), 1);
812        assert_eq!(invited_member_count.load(SeqCst), 1);
813
814        Ok(())
815    }
816
817    #[async_test]
818    #[allow(dependency_on_unit_never_type_fallback)]
819    async fn test_add_room_event_handler() -> crate::Result<()> {
820        let client = logged_in_client(None).await;
821
822        let room_id_a = room_id!("!foo:example.org");
823        let room_id_b = room_id!("!bar:matrix.org");
824
825        let member_count = Arc::new(AtomicU8::new(0));
826        let power_levels_count = Arc::new(AtomicU8::new(0));
827
828        // Room event handlers for member events in both rooms
829        client.add_room_event_handler(room_id_a, {
830            let member_count = member_count.clone();
831            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
832                member_count.fetch_add(1, SeqCst);
833                future::ready(())
834            }
835        });
836        client.add_room_event_handler(room_id_b, {
837            let member_count = member_count.clone();
838            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
839                member_count.fetch_add(1, SeqCst);
840                future::ready(())
841            }
842        });
843
844        // Power levels event handlers for member events in room A
845        client.add_room_event_handler(room_id_a, {
846            let power_levels_count = power_levels_count.clone();
847            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| {
848                power_levels_count.fetch_add(1, SeqCst);
849                future::ready(())
850            }
851        });
852
853        // Room name event handler for room name events in room B
854        client.add_room_event_handler(room_id_b, move |_ev: OriginalSyncRoomNameEvent| async {
855            unreachable!("No room event in room B")
856        });
857
858        let response = SyncResponseBuilder::default()
859            .add_joined_room(
860                JoinedRoomBuilder::new(room_id_a)
861                    .add_timeline_event(MEMBER_EVENT.clone())
862                    .add_state_event(StateTestEvent::PowerLevels)
863                    .add_state_event(StateTestEvent::RoomName),
864            )
865            .add_joined_room(
866                JoinedRoomBuilder::new(room_id_b)
867                    .add_timeline_event(MEMBER_EVENT.clone())
868                    .add_state_event(StateTestEvent::PowerLevels),
869            )
870            .build_sync_response();
871        client.process_sync(response).await?;
872
873        assert_eq!(member_count.load(SeqCst), 2);
874        assert_eq!(power_levels_count.load(SeqCst), 1);
875
876        Ok(())
877    }
878
879    #[async_test]
880    #[allow(dependency_on_unit_never_type_fallback)]
881    async fn test_add_event_handler_with_tuples() -> crate::Result<()> {
882        let client = logged_in_client(None).await;
883
884        client.add_event_handler(
885            |_ev: OriginalSyncRoomMemberEvent, (_room, _client): (Room, Client)| future::ready(()),
886        );
887
888        // If it compiles, it works. No need to assert anything.
889
890        Ok(())
891    }
892
893    #[async_test]
894    #[allow(dependency_on_unit_never_type_fallback)]
895    async fn test_remove_event_handler() -> crate::Result<()> {
896        let client = logged_in_client(None).await;
897
898        let member_count = Arc::new(AtomicU8::new(0));
899
900        client.add_event_handler({
901            let member_count = member_count.clone();
902            move |_ev: OriginalSyncRoomMemberEvent| async move {
903                member_count.fetch_add(1, SeqCst);
904            }
905        });
906
907        let handle_a = client.add_event_handler(move |_ev: OriginalSyncRoomMemberEvent| async {
908            panic!("handler should have been removed");
909        });
910        let handle_b = client.add_room_event_handler(
911            #[allow(unknown_lints, clippy::explicit_auto_deref)] // lint is buggy
912            *DEFAULT_TEST_ROOM_ID,
913            move |_ev: OriginalSyncRoomMemberEvent| async {
914                panic!("handler should have been removed");
915            },
916        );
917
918        client.add_event_handler({
919            let member_count = member_count.clone();
920            move |_ev: OriginalSyncRoomMemberEvent| async move {
921                member_count.fetch_add(1, SeqCst);
922            }
923        });
924
925        let response = SyncResponseBuilder::default()
926            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
927            .build_sync_response();
928
929        client.remove_event_handler(handle_a);
930        client.remove_event_handler(handle_b);
931
932        client.process_sync(response).await?;
933
934        assert_eq!(member_count.load(SeqCst), 2);
935
936        Ok(())
937    }
938
939    #[async_test]
940    async fn test_event_handler_drop_guard() {
941        let client = no_retry_test_client(None).await;
942
943        let handle = client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent| async {});
944        assert_eq!(client.inner.event_handlers.len(), 1);
945
946        {
947            let _guard = client.event_handler_drop_guard(handle);
948            assert_eq!(client.inner.event_handlers.len(), 1);
949            // guard dropped here
950        }
951
952        assert_eq!(client.inner.event_handlers.len(), 0);
953    }
954
955    #[async_test]
956    async fn test_use_client_in_handler() {
957        // This used to not work because we were requiring `Send` of event
958        // handler futures even on WASM, where practically all futures that do
959        // I/O aren't.
960        let client = no_retry_test_client(None).await;
961
962        client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent, client: Client| async move {
963            // All of Client's async methods that do network requests (and
964            // possibly some that don't) are `!Send` on wasm. We obviously want
965            // to be able to use them in event handlers.
966            let _caps = client.get_capabilities().await?;
967            anyhow::Ok(())
968        });
969    }
970
971    #[async_test]
972    async fn test_raw_event_handler() -> crate::Result<()> {
973        let client = logged_in_client(None).await;
974        let counter = Arc::new(AtomicU8::new(0));
975        client.add_event_handler_context(counter.clone());
976        client.add_event_handler(
977            |_ev: Raw<OriginalSyncRoomMemberEvent>, counter: Ctx<Arc<AtomicU8>>| async move {
978                counter.fetch_add(1, SeqCst);
979            },
980        );
981
982        let response = SyncResponseBuilder::default()
983            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
984            .build_sync_response();
985        client.process_sync(response).await?;
986
987        assert_eq!(counter.load(SeqCst), 1);
988        Ok(())
989    }
990
991    #[async_test]
992    async fn test_enum_event_handler() -> crate::Result<()> {
993        let client = logged_in_client(None).await;
994        let counter = Arc::new(AtomicU8::new(0));
995        client.add_event_handler_context(counter.clone());
996        client.add_event_handler(
997            |_ev: AnySyncStateEvent, counter: Ctx<Arc<AtomicU8>>| async move {
998                counter.fetch_add(1, SeqCst);
999            },
1000        );
1001
1002        let response = SyncResponseBuilder::default()
1003            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1004            .build_sync_response();
1005        client.process_sync(response).await?;
1006
1007        assert_eq!(counter.load(SeqCst), 1);
1008        Ok(())
1009    }
1010
1011    #[async_test]
1012    #[allow(dependency_on_unit_never_type_fallback)]
1013    async fn test_observe_events() -> crate::Result<()> {
1014        let client = logged_in_client(None).await;
1015
1016        let room_id_0 = room_id!("!r0.matrix.org");
1017        let room_id_1 = room_id!("!r1.matrix.org");
1018
1019        let observable = client.observe_events::<OriginalSyncRoomNameEvent, Room>();
1020
1021        let mut subscriber = observable.subscribe();
1022
1023        assert_pending!(subscriber);
1024
1025        let mut response_builder = SyncResponseBuilder::new();
1026        let response = response_builder
1027            .add_joined_room(JoinedRoomBuilder::new(room_id_0).add_state_event(
1028                StateTestEvent::Custom(json!({
1029                    "content": {
1030                        "name": "Name 0"
1031                    },
1032                    "event_id": "$ev0",
1033                    "origin_server_ts": 1,
1034                    "sender": "@mnt_io:matrix.org",
1035                    "state_key": "",
1036                    "type": "m.room.name",
1037                    "unsigned": {
1038                        "age": 1,
1039                    }
1040                })),
1041            ))
1042            .build_sync_response();
1043        client.process_sync(response).await?;
1044
1045        let (room_name, room) = assert_ready!(subscriber);
1046
1047        assert_eq!(room_name.event_id.as_str(), "$ev0");
1048        assert_eq!(room.room_id(), room_id_0);
1049        assert_eq!(room.name().unwrap(), "Name 0");
1050
1051        assert_pending!(subscriber);
1052
1053        let response = response_builder
1054            .add_joined_room(JoinedRoomBuilder::new(room_id_1).add_state_event(
1055                StateTestEvent::Custom(json!({
1056                    "content": {
1057                        "name": "Name 1"
1058                    },
1059                    "event_id": "$ev1",
1060                    "origin_server_ts": 2,
1061                    "sender": "@mnt_io:matrix.org",
1062                    "state_key": "",
1063                    "type": "m.room.name",
1064                    "unsigned": {
1065                        "age": 2,
1066                    }
1067                })),
1068            ))
1069            .build_sync_response();
1070        client.process_sync(response).await?;
1071
1072        let (room_name, room) = assert_ready!(subscriber);
1073
1074        assert_eq!(room_name.event_id.as_str(), "$ev1");
1075        assert_eq!(room.room_id(), room_id_1);
1076        assert_eq!(room.name().unwrap(), "Name 1");
1077
1078        assert_pending!(subscriber);
1079
1080        drop(observable);
1081        assert_closed!(subscriber);
1082
1083        Ok(())
1084    }
1085
1086    #[async_test]
1087    #[allow(dependency_on_unit_never_type_fallback)]
1088    async fn test_observe_room_events() -> crate::Result<()> {
1089        let client = logged_in_client(None).await;
1090
1091        let room_id = room_id!("!r0.matrix.org");
1092
1093        let observable_for_room =
1094            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1095
1096        let mut subscriber_for_room = observable_for_room.subscribe();
1097
1098        assert_pending!(subscriber_for_room);
1099
1100        let mut response_builder = SyncResponseBuilder::new();
1101        let response = response_builder
1102            .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1103                StateTestEvent::Custom(json!({
1104                    "content": {
1105                        "name": "Name 0"
1106                    },
1107                    "event_id": "$ev0",
1108                    "origin_server_ts": 1,
1109                    "sender": "@mnt_io:matrix.org",
1110                    "state_key": "",
1111                    "type": "m.room.name",
1112                    "unsigned": {
1113                        "age": 1,
1114                    }
1115                })),
1116            ))
1117            .build_sync_response();
1118        client.process_sync(response).await?;
1119
1120        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1121
1122        assert_eq!(room_name.event_id.as_str(), "$ev0");
1123        assert_eq!(room.name().unwrap(), "Name 0");
1124
1125        assert_pending!(subscriber_for_room);
1126
1127        let response = response_builder
1128            .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1129                StateTestEvent::Custom(json!({
1130                    "content": {
1131                        "name": "Name 1"
1132                    },
1133                    "event_id": "$ev1",
1134                    "origin_server_ts": 2,
1135                    "sender": "@mnt_io:matrix.org",
1136                    "state_key": "",
1137                    "type": "m.room.name",
1138                    "unsigned": {
1139                        "age": 2,
1140                    }
1141                })),
1142            ))
1143            .build_sync_response();
1144        client.process_sync(response).await?;
1145
1146        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1147
1148        assert_eq!(room_name.event_id.as_str(), "$ev1");
1149        assert_eq!(room.name().unwrap(), "Name 1");
1150
1151        assert_pending!(subscriber_for_room);
1152
1153        drop(observable_for_room);
1154        assert_closed!(subscriber_for_room);
1155
1156        Ok(())
1157    }
1158
1159    #[async_test]
1160    async fn test_observe_several_room_events() -> crate::Result<()> {
1161        let client = logged_in_client(None).await;
1162
1163        let room_id = room_id!("!r0.matrix.org");
1164
1165        let observable_for_room =
1166            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1167
1168        let mut subscriber_for_room = observable_for_room.subscribe();
1169
1170        assert_pending!(subscriber_for_room);
1171
1172        let mut response_builder = SyncResponseBuilder::new();
1173        let response = response_builder
1174            .add_joined_room(
1175                JoinedRoomBuilder::new(room_id)
1176                    .add_state_event(StateTestEvent::Custom(json!({
1177                        "content": {
1178                            "name": "Name 0"
1179                        },
1180                        "event_id": "$ev0",
1181                        "origin_server_ts": 1,
1182                        "sender": "@mnt_io:matrix.org",
1183                        "state_key": "",
1184                        "type": "m.room.name",
1185                        "unsigned": {
1186                            "age": 1,
1187                        }
1188                    })))
1189                    .add_state_event(StateTestEvent::Custom(json!({
1190                        "content": {
1191                            "name": "Name 1"
1192                        },
1193                        "event_id": "$ev1",
1194                        "origin_server_ts": 2,
1195                        "sender": "@mnt_io:matrix.org",
1196                        "state_key": "",
1197                        "type": "m.room.name",
1198                        "unsigned": {
1199                            "age": 1,
1200                        }
1201                    })))
1202                    .add_state_event(StateTestEvent::Custom(json!({
1203                        "content": {
1204                            "name": "Name 2"
1205                        },
1206                        "event_id": "$ev2",
1207                        "origin_server_ts": 3,
1208                        "sender": "@mnt_io:matrix.org",
1209                        "state_key": "",
1210                        "type": "m.room.name",
1211                        "unsigned": {
1212                            "age": 1,
1213                        }
1214                    }))),
1215            )
1216            .build_sync_response();
1217        client.process_sync(response).await?;
1218
1219        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1220
1221        // Check we only get notified about the latest received event
1222        assert_eq!(room_name.event_id.as_str(), "$ev2");
1223        assert_eq!(room.name().unwrap(), "Name 2");
1224
1225        assert_pending!(subscriber_for_room);
1226
1227        drop(observable_for_room);
1228        assert_closed!(subscriber_for_room);
1229
1230        Ok(())
1231    }
1232}