matrix_sdk/event_handler/
mod.rs

1// Copyright 2021 Jonas Platte
2// Copyright 2022 Famedly GmbH
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Types and traits related for event handlers. For usage, see
17//! [`Client::add_event_handler`].
18//!
19//! ### How it works
20//!
21//! The `add_event_handler` method registers event handlers of different
22//! signatures by actually storing boxed closures that all have the same
23//! signature of `async (EventHandlerData) -> ()` where `EventHandlerData` is a
24//! private type that contains all of the data an event handler *might* need.
25//!
26//! The stored closure takes care of deserializing the event which the
27//! `EventHandlerData` contains as a (borrowed) [`serde_json::value::RawValue`],
28//! extracting the context arguments from other fields of `EventHandlerData` and
29//! calling / `.await`ing the event handler if the previous steps succeeded.
30//! It also logs any errors from the above chain of function calls.
31//!
32//! For more details, see the [`EventHandler`] trait.
33
34#[cfg(any(feature = "anyhow", feature = "eyre"))]
35use std::any::TypeId;
36use std::{
37    borrow::Cow,
38    fmt,
39    future::Future,
40    pin::Pin,
41    sync::{
42        atomic::{AtomicU64, Ordering::SeqCst},
43        Arc, RwLock, Weak,
44    },
45    task::{Context, Poll},
46};
47
48use anymap2::any::CloneAnySendSync;
49use eyeball::{SharedObservable, Subscriber};
50use futures_core::Stream;
51use futures_util::stream::{FuturesUnordered, StreamExt};
52use matrix_sdk_base::{
53    deserialized_responses::{EncryptionInfo, TimelineEvent},
54    SendOutsideWasm, SyncOutsideWasm,
55};
56use pin_project_lite::pin_project;
57use ruma::{events::AnySyncStateEvent, push::Action, serde::Raw, OwnedRoomId};
58use serde::{de::DeserializeOwned, Deserialize};
59use serde_json::value::RawValue as RawJsonValue;
60use tracing::{debug, error, field::debug, instrument, warn};
61
62use self::maps::EventHandlerMaps;
63use crate::{Client, Room};
64
65mod context;
66mod maps;
67mod static_events;
68
69pub use self::context::{Ctx, EventHandlerContext, RawEvent};
70
71#[cfg(not(target_arch = "wasm32"))]
72type EventHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
73#[cfg(target_arch = "wasm32")]
74type EventHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
75
76#[cfg(not(target_arch = "wasm32"))]
77type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut + Send + Sync;
78#[cfg(target_arch = "wasm32")]
79type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut;
80
81type AnyMap = anymap2::Map<dyn CloneAnySendSync + Send + Sync>;
82
83#[derive(Default)]
84pub(crate) struct EventHandlerStore {
85    handlers: RwLock<EventHandlerMaps>,
86    context: RwLock<AnyMap>,
87    counter: AtomicU64,
88}
89
90impl EventHandlerStore {
91    pub fn add_handler(&self, handle: EventHandlerHandle, handler_fn: Box<EventHandlerFn>) {
92        self.handlers.write().unwrap().add(handle, handler_fn);
93    }
94
95    pub fn add_context<T>(&self, ctx: T)
96    where
97        T: Clone + Send + Sync + 'static,
98    {
99        self.context.write().unwrap().insert(ctx);
100    }
101
102    pub fn remove(&self, handle: EventHandlerHandle) {
103        self.handlers.write().unwrap().remove(handle);
104    }
105
106    #[cfg(test)]
107    fn len(&self) -> usize {
108        self.handlers.read().unwrap().len()
109    }
110}
111
112#[doc(hidden)]
113#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
114pub enum HandlerKind {
115    GlobalAccountData,
116    RoomAccountData,
117    EphemeralRoomData,
118    Timeline,
119    MessageLike,
120    OriginalMessageLike,
121    RedactedMessageLike,
122    State,
123    OriginalState,
124    RedactedState,
125    StrippedState,
126    ToDevice,
127    Presence,
128}
129
130impl HandlerKind {
131    fn message_like_redacted(redacted: bool) -> Self {
132        if redacted {
133            Self::RedactedMessageLike
134        } else {
135            Self::OriginalMessageLike
136        }
137    }
138
139    fn state_redacted(redacted: bool) -> Self {
140        if redacted {
141            Self::RedactedState
142        } else {
143            Self::OriginalState
144        }
145    }
146}
147
148/// A statically-known event kind/type that can be retrieved from an event sync.
149pub trait SyncEvent {
150    #[doc(hidden)]
151    const KIND: HandlerKind;
152    #[doc(hidden)]
153    const TYPE: Option<&'static str>;
154}
155
156pub(crate) struct EventHandlerWrapper {
157    handler_fn: Box<EventHandlerFn>,
158    pub handler_id: u64,
159}
160
161/// Handle to remove a registered event handler by passing it to
162/// [`Client::remove_event_handler`].
163#[derive(Clone, Debug)]
164pub struct EventHandlerHandle {
165    pub(crate) ev_kind: HandlerKind,
166    pub(crate) ev_type: Option<&'static str>,
167    pub(crate) room_id: Option<OwnedRoomId>,
168    pub(crate) handler_id: u64,
169}
170
171/// Interface for event handlers.
172///
173/// This trait is an abstraction for a certain kind of functions / closures,
174/// specifically:
175///
176/// * They must have at least one argument, which is the event itself, a type
177///   that implements [`SyncEvent`]. Any additional arguments need to implement
178///   the [`EventHandlerContext`] trait.
179/// * Their return type has to be one of: `()`, `Result<(), impl Display + Debug
180///   + 'static>` (if you are using `anyhow::Result` or `eyre::Result` you can
181///   additionally enable the `anyhow` / `eyre` feature to get the verbose
182///   `Debug` output printed on error)
183///
184/// ### How it works
185///
186/// This trait is basically a very constrained version of `Fn`: It requires at
187/// least one argument, which is represented as its own generic parameter `Ev`
188/// with the remaining parameter types being represented by the second generic
189/// parameter `Ctx`; they have to be stuffed into one generic parameter as a
190/// tuple because Rust doesn't have variadic generics.
191///
192/// `Ev` and `Ctx` are generic parameters rather than associated types because
193/// the argument list is a generic parameter for the `Fn` traits too, so a
194/// single type could implement `Fn` multiple times with different argument
195/// lists¹. Luckily, when calling [`Client::add_event_handler`] with a
196/// closure argument the trait solver takes into account that only a single one
197/// of the implementations applies (even though this could theoretically change
198/// through a dependency upgrade) and uses that rather than raising an ambiguity
199/// error. This is the same trick used by web frameworks like actix-web and
200/// axum.
201///
202/// ¹ the only thing stopping such types from existing in stable Rust is that
203/// all manual implementations of the `Fn` traits require a Nightly feature
204pub trait EventHandler<Ev, Ctx>: Clone + SendOutsideWasm + SyncOutsideWasm + 'static {
205    /// The future returned by `handle_event`.
206    #[doc(hidden)]
207    type Future: EventHandlerFuture;
208
209    /// Create a future for handling the given event.
210    ///
211    /// `data` provides additional data about the event, for example the room it
212    /// appeared in.
213    ///
214    /// Returns `None` if one of the context extractors failed.
215    #[doc(hidden)]
216    fn handle_event(self, ev: Ev, data: EventHandlerData<'_>) -> Option<Self::Future>;
217}
218
219#[doc(hidden)]
220pub trait EventHandlerFuture:
221    Future<Output = <Self as EventHandlerFuture>::Output> + SendOutsideWasm + 'static
222{
223    type Output: EventHandlerResult;
224}
225
226impl<T> EventHandlerFuture for T
227where
228    T: Future + SendOutsideWasm + 'static,
229    <T as Future>::Output: EventHandlerResult,
230{
231    type Output = <T as Future>::Output;
232}
233
234#[doc(hidden)]
235#[derive(Debug)]
236pub struct EventHandlerData<'a> {
237    client: Client,
238    room: Option<Room>,
239    raw: &'a RawJsonValue,
240    encryption_info: Option<&'a EncryptionInfo>,
241    push_actions: &'a [Action],
242    handle: EventHandlerHandle,
243}
244
245/// Return types supported for event handlers implement this trait.
246///
247/// It is not meant to be implemented outside of matrix-sdk.
248pub trait EventHandlerResult: Sized {
249    #[doc(hidden)]
250    fn print_error(&self, event_type: Option<&str>);
251}
252
253impl EventHandlerResult for () {
254    fn print_error(&self, _event_type: Option<&str>) {}
255}
256
257impl<E: fmt::Debug + fmt::Display + 'static> EventHandlerResult for Result<(), E> {
258    fn print_error(&self, event_type: Option<&str>) {
259        let msg_fragment = match event_type {
260            Some(event_type) => format!(" for `{event_type}`"),
261            None => "".to_owned(),
262        };
263
264        match self {
265            #[cfg(feature = "anyhow")]
266            Err(e) if TypeId::of::<E>() == TypeId::of::<anyhow::Error>() => {
267                error!("Event handler{msg_fragment} failed: {e:?}");
268            }
269            #[cfg(feature = "eyre")]
270            Err(e) if TypeId::of::<E>() == TypeId::of::<eyre::Report>() => {
271                error!("Event handler{msg_fragment} failed: {e:?}");
272            }
273            Err(e) => {
274                error!("Event handler{msg_fragment} failed: {e}");
275            }
276            Ok(_) => {}
277        }
278    }
279}
280
281#[derive(Deserialize)]
282struct UnsignedDetails {
283    redacted_because: Option<serde::de::IgnoredAny>,
284}
285
286/// Event handling internals.
287impl Client {
288    pub(crate) fn add_event_handler_impl<Ev, Ctx, H>(
289        &self,
290        handler: H,
291        room_id: Option<OwnedRoomId>,
292    ) -> EventHandlerHandle
293    where
294        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
295        H: EventHandler<Ev, Ctx>,
296    {
297        let handler_fn: Box<EventHandlerFn> = Box::new(move |data| {
298            let maybe_fut = serde_json::from_str(data.raw.get())
299                .map(|ev| handler.clone().handle_event(ev, data));
300
301            Box::pin(async move {
302                match maybe_fut {
303                    Ok(Some(fut)) => {
304                        fut.await.print_error(Ev::TYPE);
305                    }
306                    Ok(None) => {
307                        error!(
308                            event_type = Ev::TYPE, event_kind = ?Ev::KIND,
309                            "Event handler has an invalid context argument",
310                        );
311                    }
312                    Err(e) => {
313                        warn!(
314                            event_type = Ev::TYPE, event_kind = ?Ev::KIND,
315                            "Failed to deserialize event, skipping event handler.\n
316                             Deserialization error: {e}",
317                        );
318                    }
319                }
320            })
321        });
322
323        let handler_id = self.inner.event_handlers.counter.fetch_add(1, SeqCst);
324        let handle =
325            EventHandlerHandle { ev_kind: Ev::KIND, ev_type: Ev::TYPE, room_id, handler_id };
326
327        self.inner.event_handlers.add_handler(handle.clone(), handler_fn);
328
329        handle
330    }
331
332    pub(crate) async fn handle_sync_events<T>(
333        &self,
334        kind: HandlerKind,
335        room: Option<&Room>,
336        events: &[Raw<T>],
337    ) -> serde_json::Result<()> {
338        #[derive(Deserialize)]
339        struct ExtractType<'a> {
340            #[serde(borrow, rename = "type")]
341            event_type: Cow<'a, str>,
342        }
343
344        for raw_event in events {
345            let event_type = raw_event.deserialize_as::<ExtractType<'_>>()?.event_type;
346            self.call_event_handlers(room, raw_event.json(), kind, &event_type, None, &[]).await;
347        }
348
349        Ok(())
350    }
351
352    pub(crate) async fn handle_sync_state_events(
353        &self,
354        room: Option<&Room>,
355        state_events: &[Raw<AnySyncStateEvent>],
356    ) -> serde_json::Result<()> {
357        #[derive(Deserialize)]
358        struct StateEventDetails<'a> {
359            #[serde(borrow, rename = "type")]
360            event_type: Cow<'a, str>,
361            unsigned: Option<UnsignedDetails>,
362        }
363
364        // Event handlers for possibly-redacted state events
365        self.handle_sync_events(HandlerKind::State, room, state_events).await?;
366
367        // Event handlers specifically for redacted OR unredacted state events
368        for raw_event in state_events {
369            let StateEventDetails { event_type, unsigned } = raw_event.deserialize_as()?;
370            let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
371            let handler_kind = HandlerKind::state_redacted(redacted);
372
373            self.call_event_handlers(room, raw_event.json(), handler_kind, &event_type, None, &[])
374                .await;
375        }
376
377        Ok(())
378    }
379
380    pub(crate) async fn handle_sync_timeline_events(
381        &self,
382        room: Option<&Room>,
383        timeline_events: &[TimelineEvent],
384    ) -> serde_json::Result<()> {
385        #[derive(Deserialize)]
386        struct TimelineEventDetails<'a> {
387            #[serde(borrow, rename = "type")]
388            event_type: Cow<'a, str>,
389            state_key: Option<serde::de::IgnoredAny>,
390            unsigned: Option<UnsignedDetails>,
391        }
392
393        for item in timeline_events {
394            let TimelineEventDetails { event_type, state_key, unsigned } =
395                item.raw().deserialize_as()?;
396
397            let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
398            let (handler_kind_g, handler_kind_r) = match state_key {
399                Some(_) => (HandlerKind::State, HandlerKind::state_redacted(redacted)),
400                None => (HandlerKind::MessageLike, HandlerKind::message_like_redacted(redacted)),
401            };
402
403            let raw_event = item.raw().json();
404            let encryption_info = item.encryption_info();
405            let push_actions = item.push_actions.as_deref().unwrap_or(&[]);
406
407            // Event handlers for possibly-redacted timeline events
408            self.call_event_handlers(
409                room,
410                raw_event,
411                handler_kind_g,
412                &event_type,
413                encryption_info,
414                push_actions,
415            )
416            .await;
417
418            // Event handlers specifically for redacted OR unredacted timeline events
419            self.call_event_handlers(
420                room,
421                raw_event,
422                handler_kind_r,
423                &event_type,
424                encryption_info,
425                push_actions,
426            )
427            .await;
428
429            // Event handlers for `AnySyncTimelineEvent`
430            let kind = HandlerKind::Timeline;
431            self.call_event_handlers(
432                room,
433                raw_event,
434                kind,
435                &event_type,
436                encryption_info,
437                push_actions,
438            )
439            .await;
440        }
441
442        Ok(())
443    }
444
445    #[instrument(skip_all, fields(?event_kind, ?event_type, room_id))]
446    async fn call_event_handlers(
447        &self,
448        room: Option<&Room>,
449        raw: &RawJsonValue,
450        event_kind: HandlerKind,
451        event_type: &str,
452        encryption_info: Option<&EncryptionInfo>,
453        push_actions: &[Action],
454    ) {
455        let room_id = room.map(|r| r.room_id());
456        if let Some(room_id) = room_id {
457            tracing::Span::current().record("room_id", debug(room_id));
458        }
459
460        // Construct event handler futures
461        let mut futures: FuturesUnordered<_> = self
462            .inner
463            .event_handlers
464            .handlers
465            .read()
466            .unwrap()
467            .get_handlers(event_kind, event_type, room_id)
468            .map(|(handle, handler_fn)| {
469                let data = EventHandlerData {
470                    client: self.clone(),
471                    room: room.cloned(),
472                    raw,
473                    encryption_info,
474                    push_actions,
475                    handle,
476                };
477
478                (handler_fn)(data)
479            })
480            .collect();
481
482        if !futures.is_empty() {
483            debug!(amount = futures.len(), "Calling event handlers");
484
485            // Run the event handler futures with the `self.event_handlers.handlers`
486            // lock no longer being held.
487            while let Some(()) = futures.next().await {}
488        }
489    }
490}
491
492/// A guard type that removes an event handler when it drops (goes out of
493/// scope).
494///
495/// Created with [`Client::event_handler_drop_guard`].
496#[derive(Debug)]
497pub struct EventHandlerDropGuard {
498    handle: EventHandlerHandle,
499    client: Client,
500}
501
502impl EventHandlerDropGuard {
503    pub(crate) fn new(handle: EventHandlerHandle, client: Client) -> Self {
504        Self { handle, client }
505    }
506}
507
508impl Drop for EventHandlerDropGuard {
509    fn drop(&mut self) {
510        self.client.remove_event_handler(self.handle.clone());
511    }
512}
513
514macro_rules! impl_event_handler {
515    ($($ty:ident),* $(,)?) => {
516        impl<Ev, Fun, Fut, $($ty),*> EventHandler<Ev, ($($ty,)*)> for Fun
517        where
518            Ev: SyncEvent,
519            Fun: FnOnce(Ev, $($ty),*) -> Fut + Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
520            Fut: EventHandlerFuture,
521            $($ty: EventHandlerContext),*
522        {
523            type Future = Fut;
524
525            fn handle_event(self, ev: Ev, _d: EventHandlerData<'_>) -> Option<Self::Future> {
526                Some((self)(ev, $($ty::from_data(&_d)?),*))
527            }
528        }
529    };
530}
531
532impl_event_handler!();
533impl_event_handler!(A);
534impl_event_handler!(A, B);
535impl_event_handler!(A, B, C);
536impl_event_handler!(A, B, C, D);
537impl_event_handler!(A, B, C, D, E);
538impl_event_handler!(A, B, C, D, E, F);
539impl_event_handler!(A, B, C, D, E, F, G);
540impl_event_handler!(A, B, C, D, E, F, G, H);
541
542/// An observer of events (may be tailored to a room).
543///
544/// Only the most recent value can be observed. Subscribers are notified when a
545/// new value is sent, but there is no guarantee that they will see all values.
546///
547/// To create such observer, use [`Client::observe_events`] or
548/// [`Client::observe_room_events`].
549#[derive(Debug)]
550pub struct ObservableEventHandler<T> {
551    /// This type is actually nothing more than a thin glue layer between the
552    /// [`EventHandler`] mechanism and the reactive programming types from
553    /// [`eyeball`]. Here, we use a [`SharedObservable`] that is updated by the
554    /// [`EventHandler`].
555    shared_observable: SharedObservable<Option<T>>,
556
557    /// This type owns the [`EventHandlerDropGuard`]. As soon as this type goes
558    /// out of scope, the event handler is unregistered/removed.
559    ///
560    /// [`EventHandlerSubscriber`] holds a weak, non-owning reference, to this
561    /// guard. It is useful to detect when to close the [`Stream`]: as soon as
562    /// this type goes out of scope, the subscriber will close itself on poll.
563    event_handler_guard: Arc<EventHandlerDropGuard>,
564}
565
566impl<T> ObservableEventHandler<T> {
567    pub(crate) fn new(
568        shared_observable: SharedObservable<Option<T>>,
569        event_handler_guard: EventHandlerDropGuard,
570    ) -> Self {
571        Self { shared_observable, event_handler_guard: Arc::new(event_handler_guard) }
572    }
573
574    /// Subscribe to this observer.
575    ///
576    /// It returns an [`EventHandlerSubscriber`], which implements [`Stream`].
577    /// See its documentation to learn more.
578    pub fn subscribe(&self) -> EventHandlerSubscriber<T> {
579        EventHandlerSubscriber::new(
580            self.shared_observable.subscribe(),
581            // The subscriber holds a weak non-owning reference to the event handler guard, so that
582            // it can detect when this observer is dropped, and can close the subscriber's stream.
583            Arc::downgrade(&self.event_handler_guard),
584        )
585    }
586}
587
588pin_project! {
589    /// The subscriber of an [`ObservableEventHandler`].
590    ///
591    /// To create such subscriber, use [`ObservableEventHandler::subscribe`].
592    ///
593    /// This type implements [`Stream`], which means it is possible to poll the
594    /// next value asynchronously. In other terms, polling this type will return
595    /// the new event as soon as they are synced. See [`Client::observe_events`]
596    /// to learn more.
597    #[derive(Debug)]
598    pub struct EventHandlerSubscriber<T> {
599        // The `Subscriber` associated to the `SharedObservable` inside
600        // `ObservableEventHandle`.
601        //
602        // Keep in mind all this API is just a thin glue layer between
603        // `EventHandle` and `SharedObservable`, that's… maagiic!
604        #[pin]
605        subscriber: Subscriber<Option<T>>,
606
607        // A weak non-owning reference to the event handler guard from
608        // `ObservableEventHandler`. When this type is polled (via its `Stream`
609        // implementation), it is possible to detect whether the observable has
610        // been dropped by upgrading this weak reference, and close the `Stream`
611        // if it needs to.
612        event_handler_guard: Weak<EventHandlerDropGuard>,
613    }
614}
615
616impl<T> EventHandlerSubscriber<T> {
617    fn new(
618        subscriber: Subscriber<Option<T>>,
619        event_handler_handle: Weak<EventHandlerDropGuard>,
620    ) -> Self {
621        Self { subscriber, event_handler_guard: event_handler_handle }
622    }
623}
624
625impl<T> Stream for EventHandlerSubscriber<T>
626where
627    T: Clone,
628{
629    type Item = T;
630
631    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
632        let mut this = self.project();
633
634        let Some(_) = this.event_handler_guard.upgrade() else {
635            // The `EventHandlerHandle` has been dropped via `EventHandlerDropGuard`. It
636            // means the `ObservableEventHandler` has been dropped. It's time to
637            // close this stream.
638            return Poll::Ready(None);
639        };
640
641        // First off, the subscriber is of type `Subscriber<Option<T>>` because the
642        // `SharedObservable` starts with a `None` value to indicate it has no yet
643        // received any update. We want the `Stream` to return `T`, not `Option<T>`. We
644        // then filter out all `None` value.
645        //
646        // Second, when a `None` value is met, we want to poll again (hence the `loop`).
647        // At best, there is a new value to return. At worst, the subscriber will return
648        // `Poll::Pending` and will register the wakers accordingly.
649
650        loop {
651            match this.subscriber.as_mut().poll_next(context) {
652                // Stream has been closed somehow.
653                Poll::Ready(None) => return Poll::Ready(None),
654
655                // The initial value (of the `SharedObservable` behind `self.subscriber`) has been
656                // polled. We want to filter it out.
657                Poll::Ready(Some(None)) => {
658                    // Loop over.
659                    continue;
660                }
661
662                // We have a new value!
663                Poll::Ready(Some(Some(value))) => return Poll::Ready(Some(value)),
664
665                // Classical pending.
666                Poll::Pending => return Poll::Pending,
667            }
668        }
669    }
670}
671
672#[cfg(test)]
673mod tests {
674    use matrix_sdk_test::{
675        async_test,
676        event_factory::{EventFactory, PreviousMembership},
677        InvitedRoomBuilder, JoinedRoomBuilder, DEFAULT_TEST_ROOM_ID,
678    };
679    use stream_assert::{assert_closed, assert_pending, assert_ready};
680    #[cfg(target_arch = "wasm32")]
681    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
682    use std::{
683        future,
684        sync::{
685            atomic::{AtomicU8, Ordering::SeqCst},
686            Arc,
687        },
688    };
689
690    use matrix_sdk_test::{StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder};
691    use once_cell::sync::Lazy;
692    use ruma::{
693        event_id,
694        events::{
695            room::{
696                member::{MembershipState, OriginalSyncRoomMemberEvent, StrippedRoomMemberEvent},
697                name::OriginalSyncRoomNameEvent,
698                power_levels::OriginalSyncRoomPowerLevelsEvent,
699            },
700            typing::SyncTypingEvent,
701            AnySyncStateEvent, AnySyncTimelineEvent,
702        },
703        room_id,
704        serde::Raw,
705        user_id,
706    };
707    use serde_json::json;
708
709    use crate::{
710        event_handler::Ctx,
711        test_utils::{logged_in_client, no_retry_test_client},
712        Client, Room,
713    };
714
715    static MEMBER_EVENT: Lazy<Raw<AnySyncTimelineEvent>> = Lazy::new(|| {
716        EventFactory::new()
717            .member(user_id!("@example:localhost"))
718            .membership(MembershipState::Join)
719            .display_name("example")
720            .event_id(event_id!("$151800140517rfvjc:localhost"))
721            .previous(PreviousMembership::new(MembershipState::Invite).display_name("example"))
722            .into()
723    });
724
725    #[async_test]
726    async fn test_add_event_handler() -> crate::Result<()> {
727        let client = logged_in_client(None).await;
728
729        let member_count = Arc::new(AtomicU8::new(0));
730        let typing_count = Arc::new(AtomicU8::new(0));
731        let power_levels_count = Arc::new(AtomicU8::new(0));
732        let invited_member_count = Arc::new(AtomicU8::new(0));
733
734        client.add_event_handler({
735            let member_count = member_count.clone();
736            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| async move {
737                member_count.fetch_add(1, SeqCst);
738            }
739        });
740        client.add_event_handler({
741            let typing_count = typing_count.clone();
742            move |_ev: SyncTypingEvent| async move {
743                typing_count.fetch_add(1, SeqCst);
744            }
745        });
746        client.add_event_handler({
747            let power_levels_count = power_levels_count.clone();
748            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| async move {
749                power_levels_count.fetch_add(1, SeqCst);
750            }
751        });
752        client.add_event_handler({
753            let invited_member_count = invited_member_count.clone();
754            move |_ev: StrippedRoomMemberEvent| async move {
755                invited_member_count.fetch_add(1, SeqCst);
756            }
757        });
758
759        let f = EventFactory::new();
760        let response = SyncResponseBuilder::default()
761            .add_joined_room(
762                JoinedRoomBuilder::default()
763                    .add_timeline_event(MEMBER_EVENT.clone())
764                    .add_typing(
765                        f.typing(vec![user_id!("@alice:matrix.org"), user_id!("@bob:example.com")]),
766                    )
767                    .add_state_event(StateTestEvent::PowerLevels),
768            )
769            .add_invited_room(
770                InvitedRoomBuilder::new(room_id!("!test_invited:example.org")).add_state_event(
771                    StrippedStateTestEvent::Custom(json!({
772                        "content": {
773                            "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
774                            "displayname": "Alice",
775                            "membership": "invite",
776                        },
777                        "event_id": "$143273582443PhrSn:example.org",
778                        "origin_server_ts": 1432735824653u64,
779                        "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
780                        "sender": "@example:example.org",
781                        "state_key": "@alice:example.org",
782                        "type": "m.room.member",
783                        "unsigned": {
784                            "age": 1234,
785                            "invite_room_state": [
786                                {
787                                    "content": {
788                                        "name": "Example Room"
789                                    },
790                                    "sender": "@bob:example.org",
791                                    "state_key": "",
792                                    "type": "m.room.name"
793                                },
794                                {
795                                    "content": {
796                                        "join_rule": "invite"
797                                    },
798                                    "sender": "@bob:example.org",
799                                    "state_key": "",
800                                    "type": "m.room.join_rules"
801                                }
802                            ]
803                        }
804                    })),
805                ),
806            )
807            .build_sync_response();
808        client.process_sync(response).await?;
809
810        assert_eq!(member_count.load(SeqCst), 1);
811        assert_eq!(typing_count.load(SeqCst), 1);
812        assert_eq!(power_levels_count.load(SeqCst), 1);
813        assert_eq!(invited_member_count.load(SeqCst), 1);
814
815        Ok(())
816    }
817
818    #[async_test]
819    #[allow(dependency_on_unit_never_type_fallback)]
820    async fn test_add_room_event_handler() -> crate::Result<()> {
821        let client = logged_in_client(None).await;
822
823        let room_id_a = room_id!("!foo:example.org");
824        let room_id_b = room_id!("!bar:matrix.org");
825
826        let member_count = Arc::new(AtomicU8::new(0));
827        let power_levels_count = Arc::new(AtomicU8::new(0));
828
829        // Room event handlers for member events in both rooms
830        client.add_room_event_handler(room_id_a, {
831            let member_count = member_count.clone();
832            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
833                member_count.fetch_add(1, SeqCst);
834                future::ready(())
835            }
836        });
837        client.add_room_event_handler(room_id_b, {
838            let member_count = member_count.clone();
839            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
840                member_count.fetch_add(1, SeqCst);
841                future::ready(())
842            }
843        });
844
845        // Power levels event handlers for member events in room A
846        client.add_room_event_handler(room_id_a, {
847            let power_levels_count = power_levels_count.clone();
848            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| {
849                power_levels_count.fetch_add(1, SeqCst);
850                future::ready(())
851            }
852        });
853
854        // Room name event handler for room name events in room B
855        client.add_room_event_handler(room_id_b, move |_ev: OriginalSyncRoomNameEvent| async {
856            unreachable!("No room event in room B")
857        });
858
859        let response = SyncResponseBuilder::default()
860            .add_joined_room(
861                JoinedRoomBuilder::new(room_id_a)
862                    .add_timeline_event(MEMBER_EVENT.clone())
863                    .add_state_event(StateTestEvent::PowerLevels)
864                    .add_state_event(StateTestEvent::RoomName),
865            )
866            .add_joined_room(
867                JoinedRoomBuilder::new(room_id_b)
868                    .add_timeline_event(MEMBER_EVENT.clone())
869                    .add_state_event(StateTestEvent::PowerLevels),
870            )
871            .build_sync_response();
872        client.process_sync(response).await?;
873
874        assert_eq!(member_count.load(SeqCst), 2);
875        assert_eq!(power_levels_count.load(SeqCst), 1);
876
877        Ok(())
878    }
879
880    #[async_test]
881    #[allow(dependency_on_unit_never_type_fallback)]
882    async fn test_add_event_handler_with_tuples() -> crate::Result<()> {
883        let client = logged_in_client(None).await;
884
885        client.add_event_handler(
886            |_ev: OriginalSyncRoomMemberEvent, (_room, _client): (Room, Client)| future::ready(()),
887        );
888
889        // If it compiles, it works. No need to assert anything.
890
891        Ok(())
892    }
893
894    #[async_test]
895    #[allow(dependency_on_unit_never_type_fallback)]
896    async fn test_remove_event_handler() -> crate::Result<()> {
897        let client = logged_in_client(None).await;
898
899        let member_count = Arc::new(AtomicU8::new(0));
900
901        client.add_event_handler({
902            let member_count = member_count.clone();
903            move |_ev: OriginalSyncRoomMemberEvent| async move {
904                member_count.fetch_add(1, SeqCst);
905            }
906        });
907
908        let handle_a = client.add_event_handler(move |_ev: OriginalSyncRoomMemberEvent| async {
909            panic!("handler should have been removed");
910        });
911        let handle_b = client.add_room_event_handler(
912            #[allow(unknown_lints, clippy::explicit_auto_deref)] // lint is buggy
913            *DEFAULT_TEST_ROOM_ID,
914            move |_ev: OriginalSyncRoomMemberEvent| async {
915                panic!("handler should have been removed");
916            },
917        );
918
919        client.add_event_handler({
920            let member_count = member_count.clone();
921            move |_ev: OriginalSyncRoomMemberEvent| async move {
922                member_count.fetch_add(1, SeqCst);
923            }
924        });
925
926        let response = SyncResponseBuilder::default()
927            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
928            .build_sync_response();
929
930        client.remove_event_handler(handle_a);
931        client.remove_event_handler(handle_b);
932
933        client.process_sync(response).await?;
934
935        assert_eq!(member_count.load(SeqCst), 2);
936
937        Ok(())
938    }
939
940    #[async_test]
941    async fn test_event_handler_drop_guard() {
942        let client = no_retry_test_client(None).await;
943
944        let handle = client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent| async {});
945        assert_eq!(client.inner.event_handlers.len(), 1);
946
947        {
948            let _guard = client.event_handler_drop_guard(handle);
949            assert_eq!(client.inner.event_handlers.len(), 1);
950            // guard dropped here
951        }
952
953        assert_eq!(client.inner.event_handlers.len(), 0);
954    }
955
956    #[async_test]
957    async fn test_use_client_in_handler() {
958        // This used to not work because we were requiring `Send` of event
959        // handler futures even on WASM, where practically all futures that do
960        // I/O aren't.
961        let client = no_retry_test_client(None).await;
962
963        client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent, client: Client| async move {
964            // All of Client's async methods that do network requests (and
965            // possibly some that don't) are `!Send` on wasm. We obviously want
966            // to be able to use them in event handlers.
967            let _caps = client.get_capabilities().await?;
968            anyhow::Ok(())
969        });
970    }
971
972    #[async_test]
973    async fn test_raw_event_handler() -> crate::Result<()> {
974        let client = logged_in_client(None).await;
975        let counter = Arc::new(AtomicU8::new(0));
976        client.add_event_handler_context(counter.clone());
977        client.add_event_handler(
978            |_ev: Raw<OriginalSyncRoomMemberEvent>, counter: Ctx<Arc<AtomicU8>>| async move {
979                counter.fetch_add(1, SeqCst);
980            },
981        );
982
983        let response = SyncResponseBuilder::default()
984            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
985            .build_sync_response();
986        client.process_sync(response).await?;
987
988        assert_eq!(counter.load(SeqCst), 1);
989        Ok(())
990    }
991
992    #[async_test]
993    async fn test_enum_event_handler() -> crate::Result<()> {
994        let client = logged_in_client(None).await;
995        let counter = Arc::new(AtomicU8::new(0));
996        client.add_event_handler_context(counter.clone());
997        client.add_event_handler(
998            |_ev: AnySyncStateEvent, counter: Ctx<Arc<AtomicU8>>| async move {
999                counter.fetch_add(1, SeqCst);
1000            },
1001        );
1002
1003        let response = SyncResponseBuilder::default()
1004            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1005            .build_sync_response();
1006        client.process_sync(response).await?;
1007
1008        assert_eq!(counter.load(SeqCst), 1);
1009        Ok(())
1010    }
1011
1012    #[async_test]
1013    #[allow(dependency_on_unit_never_type_fallback)]
1014    async fn test_observe_events() -> crate::Result<()> {
1015        let client = logged_in_client(None).await;
1016
1017        let room_id_0 = room_id!("!r0.matrix.org");
1018        let room_id_1 = room_id!("!r1.matrix.org");
1019
1020        let observable = client.observe_events::<OriginalSyncRoomNameEvent, Room>();
1021
1022        let mut subscriber = observable.subscribe();
1023
1024        assert_pending!(subscriber);
1025
1026        let mut response_builder = SyncResponseBuilder::new();
1027        let response = response_builder
1028            .add_joined_room(JoinedRoomBuilder::new(room_id_0).add_state_event(
1029                StateTestEvent::Custom(json!({
1030                    "content": {
1031                        "name": "Name 0"
1032                    },
1033                    "event_id": "$ev0",
1034                    "origin_server_ts": 1,
1035                    "sender": "@mnt_io:matrix.org",
1036                    "state_key": "",
1037                    "type": "m.room.name",
1038                    "unsigned": {
1039                        "age": 1,
1040                    }
1041                })),
1042            ))
1043            .build_sync_response();
1044        client.process_sync(response).await?;
1045
1046        let (room_name, room) = assert_ready!(subscriber);
1047
1048        assert_eq!(room_name.event_id.as_str(), "$ev0");
1049        assert_eq!(room.room_id(), room_id_0);
1050        assert_eq!(room.name().unwrap(), "Name 0");
1051
1052        assert_pending!(subscriber);
1053
1054        let response = response_builder
1055            .add_joined_room(JoinedRoomBuilder::new(room_id_1).add_state_event(
1056                StateTestEvent::Custom(json!({
1057                    "content": {
1058                        "name": "Name 1"
1059                    },
1060                    "event_id": "$ev1",
1061                    "origin_server_ts": 2,
1062                    "sender": "@mnt_io:matrix.org",
1063                    "state_key": "",
1064                    "type": "m.room.name",
1065                    "unsigned": {
1066                        "age": 2,
1067                    }
1068                })),
1069            ))
1070            .build_sync_response();
1071        client.process_sync(response).await?;
1072
1073        let (room_name, room) = assert_ready!(subscriber);
1074
1075        assert_eq!(room_name.event_id.as_str(), "$ev1");
1076        assert_eq!(room.room_id(), room_id_1);
1077        assert_eq!(room.name().unwrap(), "Name 1");
1078
1079        assert_pending!(subscriber);
1080
1081        drop(observable);
1082        assert_closed!(subscriber);
1083
1084        Ok(())
1085    }
1086
1087    #[async_test]
1088    #[allow(dependency_on_unit_never_type_fallback)]
1089    async fn test_observe_room_events() -> crate::Result<()> {
1090        let client = logged_in_client(None).await;
1091
1092        let room_id = room_id!("!r0.matrix.org");
1093
1094        let observable_for_room =
1095            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1096
1097        let mut subscriber_for_room = observable_for_room.subscribe();
1098
1099        assert_pending!(subscriber_for_room);
1100
1101        let mut response_builder = SyncResponseBuilder::new();
1102        let response = response_builder
1103            .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1104                StateTestEvent::Custom(json!({
1105                    "content": {
1106                        "name": "Name 0"
1107                    },
1108                    "event_id": "$ev0",
1109                    "origin_server_ts": 1,
1110                    "sender": "@mnt_io:matrix.org",
1111                    "state_key": "",
1112                    "type": "m.room.name",
1113                    "unsigned": {
1114                        "age": 1,
1115                    }
1116                })),
1117            ))
1118            .build_sync_response();
1119        client.process_sync(response).await?;
1120
1121        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1122
1123        assert_eq!(room_name.event_id.as_str(), "$ev0");
1124        assert_eq!(room.name().unwrap(), "Name 0");
1125
1126        assert_pending!(subscriber_for_room);
1127
1128        let response = response_builder
1129            .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1130                StateTestEvent::Custom(json!({
1131                    "content": {
1132                        "name": "Name 1"
1133                    },
1134                    "event_id": "$ev1",
1135                    "origin_server_ts": 2,
1136                    "sender": "@mnt_io:matrix.org",
1137                    "state_key": "",
1138                    "type": "m.room.name",
1139                    "unsigned": {
1140                        "age": 2,
1141                    }
1142                })),
1143            ))
1144            .build_sync_response();
1145        client.process_sync(response).await?;
1146
1147        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1148
1149        assert_eq!(room_name.event_id.as_str(), "$ev1");
1150        assert_eq!(room.name().unwrap(), "Name 1");
1151
1152        assert_pending!(subscriber_for_room);
1153
1154        drop(observable_for_room);
1155        assert_closed!(subscriber_for_room);
1156
1157        Ok(())
1158    }
1159
1160    #[async_test]
1161    async fn test_observe_several_room_events() -> crate::Result<()> {
1162        let client = logged_in_client(None).await;
1163
1164        let room_id = room_id!("!r0.matrix.org");
1165
1166        let observable_for_room =
1167            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1168
1169        let mut subscriber_for_room = observable_for_room.subscribe();
1170
1171        assert_pending!(subscriber_for_room);
1172
1173        let mut response_builder = SyncResponseBuilder::new();
1174        let response = response_builder
1175            .add_joined_room(
1176                JoinedRoomBuilder::new(room_id)
1177                    .add_state_event(StateTestEvent::Custom(json!({
1178                        "content": {
1179                            "name": "Name 0"
1180                        },
1181                        "event_id": "$ev0",
1182                        "origin_server_ts": 1,
1183                        "sender": "@mnt_io:matrix.org",
1184                        "state_key": "",
1185                        "type": "m.room.name",
1186                        "unsigned": {
1187                            "age": 1,
1188                        }
1189                    })))
1190                    .add_state_event(StateTestEvent::Custom(json!({
1191                        "content": {
1192                            "name": "Name 1"
1193                        },
1194                        "event_id": "$ev1",
1195                        "origin_server_ts": 2,
1196                        "sender": "@mnt_io:matrix.org",
1197                        "state_key": "",
1198                        "type": "m.room.name",
1199                        "unsigned": {
1200                            "age": 1,
1201                        }
1202                    })))
1203                    .add_state_event(StateTestEvent::Custom(json!({
1204                        "content": {
1205                            "name": "Name 2"
1206                        },
1207                        "event_id": "$ev2",
1208                        "origin_server_ts": 3,
1209                        "sender": "@mnt_io:matrix.org",
1210                        "state_key": "",
1211                        "type": "m.room.name",
1212                        "unsigned": {
1213                            "age": 1,
1214                        }
1215                    }))),
1216            )
1217            .build_sync_response();
1218        client.process_sync(response).await?;
1219
1220        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1221
1222        // Check we only get notified about the latest received event
1223        assert_eq!(room_name.event_id.as_str(), "$ev2");
1224        assert_eq!(room.name().unwrap(), "Name 2");
1225
1226        assert_pending!(subscriber_for_room);
1227
1228        drop(observable_for_room);
1229        assert_closed!(subscriber_for_room);
1230
1231        Ok(())
1232    }
1233}