Skip to main content

matrix_sdk/event_handler/
mod.rs

1// Copyright 2021 Jonas Platte
2// Copyright 2022 Famedly GmbH
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
//! Types and traits related to event handlers. For usage, see
//! [`Client::add_event_handler`].
//!
//! ### How it works
//!
//! The `add_event_handler` method registers event handlers of different
//! signatures by actually storing boxed closures that all have the same
//! signature of `async (EventHandlerData) -> ()` where `EventHandlerData` is a
//! private type that contains all of the data an event handler *might* need.
//!
//! The stored closure takes care of deserializing the event which the
//! `EventHandlerData` contains as a (borrowed) [`serde_json::value::RawValue`],
//! extracting the context arguments from other fields of `EventHandlerData` and
//! calling / `.await`ing the event handler if the previous steps succeeded.
//! It also logs any errors from the above chain of function calls.
//!
//! For more details, see the [`EventHandler`] trait.
33
34#[cfg(any(feature = "anyhow", feature = "eyre"))]
35use std::any::TypeId;
36use std::{
37    borrow::Cow,
38    fmt,
39    future::Future,
40    pin::Pin,
41    sync::{
42        Arc, RwLock, Weak,
43        atomic::{AtomicU64, Ordering::SeqCst},
44    },
45    task::{Context, Poll},
46};
47
48#[cfg(target_family = "wasm")]
49use anymap2::any::CloneAny;
50#[cfg(not(target_family = "wasm"))]
51use anymap2::any::CloneAnySendSync;
52use eyeball::{SharedObservable, Subscriber};
53use futures_core::Stream;
54use futures_util::stream::{FuturesUnordered, StreamExt};
55use matrix_sdk_base::{
56    SendOutsideWasm, SyncOutsideWasm,
57    deserialized_responses::{EncryptionInfo, TimelineEvent},
58    sync::State,
59};
60use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
61use pin_project_lite::pin_project;
62use ruma::{OwnedRoomId, events::BooleanType, push::Action, serde::Raw};
63use serde::{Deserialize, de::DeserializeOwned};
64use serde_json::value::RawValue as RawJsonValue;
65use tracing::{debug, error, field::debug, instrument, warn};
66
67use self::maps::EventHandlerMaps;
68use crate::{Client, Room};
69
70mod context;
71mod maps;
72mod static_events;
73
74pub use self::context::{Ctx, EventHandlerContext, RawEvent};
75
// Boxed, type-erased future produced by every stored handler closure. It
// resolves to `()`; the `Send` bound is only required on non-wasm targets.
#[cfg(not(target_family = "wasm"))]
type EventHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
#[cfg(target_family = "wasm")]
type EventHandlerFut = Pin<Box<dyn Future<Output = ()>>>;

// Uniform signature of the stored handler closures: they take the per-event
// `EventHandlerData` and return an `EventHandlerFut`. `Send + Sync` is only
// required on non-wasm targets.
#[cfg(not(target_family = "wasm"))]
type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut + Send + Sync;
#[cfg(target_family = "wasm")]
type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut;

// Type-keyed map used by `EventHandlerStore` to hold the context values
// registered through `EventHandlerStore::add_context`.
#[cfg(not(target_family = "wasm"))]
type AnyMap = anymap2::Map<dyn CloneAnySendSync + Send + Sync>;
#[cfg(target_family = "wasm")]
type AnyMap = anymap2::Map<dyn CloneAny>;
90
91#[derive(Default)]
92pub(crate) struct EventHandlerStore {
93    handlers: RwLock<EventHandlerMaps>,
94    context: RwLock<AnyMap>,
95    counter: AtomicU64,
96}
97
98impl EventHandlerStore {
99    pub fn add_handler(&self, handle: EventHandlerHandle, handler_fn: Box<EventHandlerFn>) {
100        self.handlers.write().unwrap().add(handle, handler_fn);
101    }
102
103    pub fn add_context<T>(&self, ctx: T)
104    where
105        T: Clone + Send + Sync + 'static,
106    {
107        self.context.write().unwrap().insert(ctx);
108    }
109
110    pub fn remove(&self, handle: EventHandlerHandle) {
111        self.handlers.write().unwrap().remove(handle);
112    }
113
114    #[cfg(test)]
115    fn len(&self) -> usize {
116        self.handlers.read().unwrap().len()
117    }
118}
119
/// The category under which an event is dispatched to event handlers.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum HandlerKind {
    GlobalAccountData,
    RoomAccountData,
    EphemeralRoomData,
    Timeline,
    MessageLike,
    OriginalMessageLike,
    RedactedMessageLike,
    State,
    OriginalState,
    RedactedState,
    StrippedState,
    ToDevice,
    Presence,
}

impl HandlerKind {
    /// Picks the message-like kind matching the event's redaction status.
    fn message_like_redacted(redacted: bool) -> Self {
        match redacted {
            true => Self::RedactedMessageLike,
            false => Self::OriginalMessageLike,
        }
    }

    /// Picks the state kind matching the event's redaction status.
    fn state_redacted(redacted: bool) -> Self {
        match redacted {
            true => Self::RedactedState,
            false => Self::OriginalState,
        }
    }
}
147
/// A statically-known event kind/type that can be retrieved from an event sync.
pub trait SyncEvent {
    /// The handler category under which handlers for this event are
    /// registered.
    #[doc(hidden)]
    const KIND: HandlerKind;
    /// The statically-known event type string, or `None` if there is none.
    #[doc(hidden)]
    const TYPE: Option<&'static str>;
    /// Whether [`Self::TYPE`] is only a prefix of the event type rather than
    /// the full type.
    #[doc(hidden)]
    type IsPrefix: BooleanType;
}
157
/// A boxed, type-erased handler closure together with a handler id.
// NOTE(review): not used in this part of the file; presumably consumed by the
// `maps` submodule — confirm.
pub(crate) struct EventHandlerWrapper {
    /// The type-erased handler closure.
    handler_fn: Box<EventHandlerFn>,
    /// Id of the handler (presumably the same value as
    /// `EventHandlerHandle::handler_id` — confirm).
    pub handler_id: u64,
}
162
/// Handle to remove a registered event handler by passing it to
/// [`Client::remove_event_handler`].
#[derive(Clone, Debug)]
pub struct EventHandlerHandle {
    /// The handler category, taken from `SyncEvent::KIND` at registration.
    pub(crate) ev_kind: HandlerKind,
    /// The static (full or prefix) event type, if the event has one.
    pub(crate) ev_type: Option<StaticEventTypePart>,
    /// The room id given at registration, if any.
    pub(crate) room_id: Option<OwnedRoomId>,
    /// Id assigned from the store's counter when the handler was registered.
    pub(crate) handler_id: u64,
}
172
/// The static part of an event type.
///
/// Which variant is used for a handler is decided by `SyncEvent::IsPrefix`
/// when the handler is registered.
#[derive(Clone, Copy, Debug)]
pub(crate) enum StaticEventTypePart {
    /// The full event type is static.
    Full(&'static str),
    /// Only the prefix of the event type is static.
    Prefix(&'static str),
}
181
182/// Interface for event handlers.
183///
184/// This trait is an abstraction for a certain kind of functions / closures,
185/// specifically:
186///
187/// * They must have at least one argument, which is the event itself, a type
188///   that implements [`SyncEvent`]. Any additional arguments need to implement
189///   the [`EventHandlerContext`] trait.
190/// * Their return type has to be one of: `()`, `Result<(), impl Display + Debug
191///   + 'static>` (if you are using `anyhow::Result` or `eyre::Result` you can
192///   additionally enable the `anyhow` / `eyre` feature to get the verbose
193///   `Debug` output printed on error)
194///
195/// ### How it works
196///
197/// This trait is basically a very constrained version of `Fn`: It requires at
198/// least one argument, which is represented as its own generic parameter `Ev`
199/// with the remaining parameter types being represented by the second generic
200/// parameter `Ctx`; they have to be stuffed into one generic parameter as a
201/// tuple because Rust doesn't have variadic generics.
202///
203/// `Ev` and `Ctx` are generic parameters rather than associated types because
204/// the argument list is a generic parameter for the `Fn` traits too, so a
205/// single type could implement `Fn` multiple times with different argument
206/// lists¹. Luckily, when calling [`Client::add_event_handler`] with a
207/// closure argument the trait solver takes into account that only a single one
208/// of the implementations applies (even though this could theoretically change
209/// through a dependency upgrade) and uses that rather than raising an ambiguity
210/// error. This is the same trick used by web frameworks like actix-web and
211/// axum.
212///
213/// ¹ the only thing stopping such types from existing in stable Rust is that
214/// all manual implementations of the `Fn` traits require a Nightly feature
pub trait EventHandler<Ev, Ctx>: Clone + SendOutsideWasm + SyncOutsideWasm + 'static {
    /// The future returned by `handle_event`.
    #[doc(hidden)]
    type Future: EventHandlerFuture;

    /// Create a future for handling the given event.
    ///
    /// `ev` is the already-deserialized event. `data` provides additional
    /// data about the event, for example the room it appeared in; the
    /// context arguments of the handler are extracted from it.
    ///
    /// Takes `self` by value, so callers clone the handler for each event.
    ///
    /// Returns `None` if one of the context extractors failed.
    #[doc(hidden)]
    fn handle_event(self, ev: Ev, data: EventHandlerData<'_>) -> Option<Self::Future>;
}
229
/// Helper trait for the futures an event handler may return: `'static`
/// futures bounded by `SendOutsideWasm` whose output implements
/// [`EventHandlerResult`].
#[doc(hidden)]
pub trait EventHandlerFuture:
    Future<Output = <Self as EventHandlerFuture>::Output> + SendOutsideWasm + 'static
{
    /// The output of the future; always the same type as `Future::Output`.
    type Output: EventHandlerResult;
}

// Blanket implementation: every future satisfying the bounds above is an
// `EventHandlerFuture`, so the trait never needs to be implemented by hand.
impl<T> EventHandlerFuture for T
where
    T: Future + SendOutsideWasm + 'static,
    <T as Future>::Output: EventHandlerResult,
{
    type Output = <T as Future>::Output;
}
244
/// Everything a stored handler closure receives for a single event: the raw
/// event plus the values context arguments can be extracted from.
#[doc(hidden)]
#[derive(Debug)]
pub struct EventHandlerData<'a> {
    /// A clone of the client that is dispatching the event.
    client: Client,
    /// The room the event belongs to, if it was dispatched with one.
    room: Option<Room>,
    /// The borrowed raw JSON of the event.
    raw: &'a RawJsonValue,
    /// Encryption information for the event, when available.
    encryption_info: Option<&'a EncryptionInfo>,
    /// Push actions associated with the event (may be empty).
    push_actions: &'a [Action],
    /// Handle of the handler that is being invoked.
    handle: EventHandlerHandle,
}
255
256/// Return types supported for event handlers implement this trait.
257///
258/// It is not meant to be implemented outside of matrix-sdk.
259pub trait EventHandlerResult: Sized {
260    #[doc(hidden)]
261    fn print_error(&self, event_type: Option<&str>);
262}
263
264impl EventHandlerResult for () {
265    fn print_error(&self, _event_type: Option<&str>) {}
266}
267
268impl<E: fmt::Debug + fmt::Display + 'static> EventHandlerResult for Result<(), E> {
269    fn print_error(&self, event_type: Option<&str>) {
270        let msg_fragment = match event_type {
271            Some(event_type) => format!(" for `{event_type}`"),
272            None => "".to_owned(),
273        };
274
275        match self {
276            #[cfg(feature = "anyhow")]
277            Err(e) if TypeId::of::<E>() == TypeId::of::<anyhow::Error>() => {
278                error!("Event handler{msg_fragment} failed: {e:?}");
279            }
280            #[cfg(feature = "eyre")]
281            Err(e) if TypeId::of::<E>() == TypeId::of::<eyre::Report>() => {
282                error!("Event handler{msg_fragment} failed: {e:?}");
283            }
284            Err(e) => {
285                error!("Event handler{msg_fragment} failed: {e}");
286            }
287            Ok(_) => {}
288        }
289    }
290}
291
/// Minimal view of an event's `unsigned` object, used only to detect whether
/// the event has been redacted.
#[derive(Deserialize)]
struct UnsignedDetails {
    /// Present when the event was redacted; its content is ignored.
    redacted_because: Option<serde::de::IgnoredAny>,
}
296
297/// Event handling internals.
298impl Client {
299    pub(crate) fn add_event_handler_impl<Ev, Ctx, H>(
300        &self,
301        handler: H,
302        room_id: Option<OwnedRoomId>,
303    ) -> EventHandlerHandle
304    where
305        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
306        H: EventHandler<Ev, Ctx>,
307    {
308        let handler_fn: Box<EventHandlerFn> = Box::new(move |data| {
309            let maybe_fut = serde_json::from_str(data.raw.get())
310                .map(|ev| handler.clone().handle_event(ev, data));
311
312            Box::pin(async move {
313                match maybe_fut {
314                    Ok(Some(fut)) => {
315                        fut.await.print_error(Ev::TYPE);
316                    }
317                    Ok(None) => {
318                        error!(
319                            event_type = Ev::TYPE, event_kind = ?Ev::KIND,
320                            "Event handler has an invalid context argument",
321                        );
322                    }
323                    Err(e) => {
324                        warn!(
325                            event_type = Ev::TYPE, event_kind = ?Ev::KIND,
326                            "Failed to deserialize event, skipping event handler.\n
327                             Deserialization error: {e}",
328                        );
329                    }
330                }
331            })
332        });
333
334        let handler_id = self.inner.event_handlers.counter.fetch_add(1, SeqCst);
335        let ev_type = Ev::TYPE.map(|ev_type| {
336            if Ev::IsPrefix::as_bool() {
337                StaticEventTypePart::Prefix(ev_type)
338            } else {
339                StaticEventTypePart::Full(ev_type)
340            }
341        });
342        let handle = EventHandlerHandle { ev_kind: Ev::KIND, ev_type, room_id, handler_id };
343
344        self.inner.event_handlers.add_handler(handle.clone(), handler_fn);
345
346        handle
347    }
348
349    pub(crate) async fn handle_sync_events<T>(
350        &self,
351        kind: HandlerKind,
352        room: Option<&Room>,
353        events: &[Raw<T>],
354    ) -> serde_json::Result<()> {
355        #[derive(Deserialize)]
356        struct ExtractType<'a> {
357            #[serde(borrow, rename = "type")]
358            event_type: Cow<'a, str>,
359        }
360
361        for raw_event in events {
362            let event_type = raw_event.deserialize_as_unchecked::<ExtractType<'_>>()?.event_type;
363            self.call_event_handlers(room, raw_event.json(), kind, &event_type, None, &[]).await;
364        }
365
366        Ok(())
367    }
368
369    pub(crate) async fn handle_sync_to_device_events(
370        &self,
371        events: &[ProcessedToDeviceEvent],
372    ) -> serde_json::Result<()> {
373        #[derive(Deserialize)]
374        struct ExtractType<'a> {
375            #[serde(borrow, rename = "type")]
376            event_type: Cow<'a, str>,
377        }
378
379        for processed_to_device in events {
380            let (raw_event, encryption_info) = match processed_to_device {
381                ProcessedToDeviceEvent::Decrypted { raw, encryption_info } => {
382                    (raw, Some(encryption_info))
383                }
384                other => (&other.to_raw(), None),
385            };
386            let event_type = raw_event.deserialize_as_unchecked::<ExtractType<'_>>()?.event_type;
387            self.call_event_handlers(
388                None,
389                raw_event.json(),
390                HandlerKind::ToDevice,
391                &event_type,
392                encryption_info,
393                &[],
394            )
395            .await;
396        }
397
398        Ok(())
399    }
400
401    pub(crate) async fn handle_sync_state_events(
402        &self,
403        room: Option<&Room>,
404        state: &State,
405    ) -> serde_json::Result<()> {
406        #[derive(Deserialize)]
407        struct StateEventDetails<'a> {
408            #[serde(borrow, rename = "type")]
409            event_type: Cow<'a, str>,
410            unsigned: Option<UnsignedDetails>,
411        }
412
413        let state_events = match state {
414            State::Before(events) => events,
415            State::After(events) => events,
416        };
417
418        // Event handlers for possibly-redacted state events
419        self.handle_sync_events(HandlerKind::State, room, state_events).await?;
420
421        // Event handlers specifically for redacted OR unredacted state events
422        for raw_event in state_events {
423            let StateEventDetails { event_type, unsigned } =
424                raw_event.deserialize_as_unchecked()?;
425            let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
426            let handler_kind = HandlerKind::state_redacted(redacted);
427
428            self.call_event_handlers(room, raw_event.json(), handler_kind, &event_type, None, &[])
429                .await;
430        }
431
432        Ok(())
433    }
434
435    pub(crate) async fn handle_sync_timeline_events(
436        &self,
437        room: Option<&Room>,
438        timeline_events: &[TimelineEvent],
439    ) -> serde_json::Result<()> {
440        #[derive(Deserialize)]
441        struct TimelineEventDetails<'a> {
442            #[serde(borrow, rename = "type")]
443            event_type: Cow<'a, str>,
444            state_key: Option<serde::de::IgnoredAny>,
445            unsigned: Option<UnsignedDetails>,
446        }
447
448        for item in timeline_events {
449            let TimelineEventDetails { event_type, state_key, unsigned } =
450                item.raw().deserialize_as_unchecked()?;
451
452            let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
453            let (handler_kind_g, handler_kind_r) = match state_key {
454                Some(_) => (HandlerKind::State, HandlerKind::state_redacted(redacted)),
455                None => (HandlerKind::MessageLike, HandlerKind::message_like_redacted(redacted)),
456            };
457
458            let raw_event = item.raw().json();
459            let encryption_info = item.encryption_info().map(|i| &**i);
460            let push_actions = item.push_actions().unwrap_or(&[]);
461
462            // Event handlers for possibly-redacted timeline events
463            self.call_event_handlers(
464                room,
465                raw_event,
466                handler_kind_g,
467                &event_type,
468                encryption_info,
469                push_actions,
470            )
471            .await;
472
473            // Event handlers specifically for redacted OR unredacted timeline events
474            self.call_event_handlers(
475                room,
476                raw_event,
477                handler_kind_r,
478                &event_type,
479                encryption_info,
480                push_actions,
481            )
482            .await;
483
484            // Event handlers for `AnySyncTimelineEvent`
485            let kind = HandlerKind::Timeline;
486            self.call_event_handlers(
487                room,
488                raw_event,
489                kind,
490                &event_type,
491                encryption_info,
492                push_actions,
493            )
494            .await;
495        }
496
497        Ok(())
498    }
499
500    #[instrument(skip_all, fields(?event_kind, ?event_type, room_id))]
501    async fn call_event_handlers(
502        &self,
503        room: Option<&Room>,
504        raw: &RawJsonValue,
505        event_kind: HandlerKind,
506        event_type: &str,
507        encryption_info: Option<&EncryptionInfo>,
508        push_actions: &[Action],
509    ) {
510        let room_id = room.map(|r| r.room_id());
511        if let Some(room_id) = room_id {
512            tracing::Span::current().record("room_id", debug(room_id));
513        }
514
515        // Construct event handler futures
516        let mut futures: FuturesUnordered<_> = self
517            .inner
518            .event_handlers
519            .handlers
520            .read()
521            .unwrap()
522            .get_handlers(event_kind, event_type, room_id)
523            .map(|(handle, handler_fn)| {
524                let data = EventHandlerData {
525                    client: self.clone(),
526                    room: room.cloned(),
527                    raw,
528                    encryption_info,
529                    push_actions,
530                    handle,
531                };
532
533                (handler_fn)(data)
534            })
535            .collect();
536
537        if !futures.is_empty() {
538            debug!(amount = futures.len(), "Calling event handlers");
539
540            // Run the event handler futures with the `self.event_handlers.handlers`
541            // lock no longer being held.
542            while let Some(()) = futures.next().await {}
543        }
544    }
545}
546
547/// A guard type that removes an event handler when it drops (goes out of
548/// scope).
549///
550/// Created with [`Client::event_handler_drop_guard`].
551#[derive(Debug)]
552pub struct EventHandlerDropGuard {
553    handle: EventHandlerHandle,
554    client: Client,
555}
556
557impl EventHandlerDropGuard {
558    pub(crate) fn new(handle: EventHandlerHandle, client: Client) -> Self {
559        Self { handle, client }
560    }
561}
562
563impl Drop for EventHandlerDropGuard {
564    fn drop(&mut self) {
565        self.client.remove_event_handler(self.handle.clone());
566    }
567}
568
// Implements `EventHandler<Ev, (A, B, ...)>` for every clonable function or
// closure that takes the event followed by the listed context arguments and
// returns an `EventHandlerFuture`.
macro_rules! impl_event_handler {
    ($($ty:ident),* $(,)?) => {
        impl<Ev, Fun, Fut, $($ty),*> EventHandler<Ev, ($($ty,)*)> for Fun
        where
            Ev: SyncEvent,
            Fun: FnOnce(Ev, $($ty),*) -> Fut + Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
            Fut: EventHandlerFuture,
            $($ty: EventHandlerContext),*
        {
            type Future = Fut;

            // `_d` has a leading underscore because it is unused in the
            // expansion without context arguments.
            fn handle_event(self, ev: Ev, _d: EventHandlerData<'_>) -> Option<Self::Future> {
                // Extract each context argument from the handler data; a
                // failed extraction short-circuits to `None` through `?`.
                Some((self)(ev, $($ty::from_data(&_d)?),*))
            }
        }
    };
}

// One implementation per arity: from no context argument up to eight.
impl_event_handler!();
impl_event_handler!(A);
impl_event_handler!(A, B);
impl_event_handler!(A, B, C);
impl_event_handler!(A, B, C, D);
impl_event_handler!(A, B, C, D, E);
impl_event_handler!(A, B, C, D, E, F);
impl_event_handler!(A, B, C, D, E, F, G);
impl_event_handler!(A, B, C, D, E, F, G, H);
596
597/// An observer of events (may be tailored to a room).
598///
599/// Only the most recent value can be observed. Subscribers are notified when a
600/// new value is sent, but there is no guarantee that they will see all values.
601///
602/// To create such observer, use [`Client::observe_events`] or
603/// [`Client::observe_room_events`].
604#[derive(Debug)]
605pub struct ObservableEventHandler<T> {
606    /// This type is actually nothing more than a thin glue layer between the
607    /// [`EventHandler`] mechanism and the reactive programming types from
608    /// [`eyeball`]. Here, we use a [`SharedObservable`] that is updated by the
609    /// [`EventHandler`].
610    shared_observable: SharedObservable<Option<T>>,
611
612    /// This type owns the [`EventHandlerDropGuard`]. As soon as this type goes
613    /// out of scope, the event handler is unregistered/removed.
614    ///
615    /// [`EventHandlerSubscriber`] holds a weak, non-owning reference, to this
616    /// guard. It is useful to detect when to close the [`Stream`]: as soon as
617    /// this type goes out of scope, the subscriber will close itself on poll.
618    event_handler_guard: Arc<EventHandlerDropGuard>,
619}
620
621impl<T> ObservableEventHandler<T> {
622    pub(crate) fn new(
623        shared_observable: SharedObservable<Option<T>>,
624        event_handler_guard: EventHandlerDropGuard,
625    ) -> Self {
626        Self { shared_observable, event_handler_guard: Arc::new(event_handler_guard) }
627    }
628
629    /// Subscribe to this observer.
630    ///
631    /// It returns an [`EventHandlerSubscriber`], which implements [`Stream`].
632    /// See its documentation to learn more.
633    pub fn subscribe(&self) -> EventHandlerSubscriber<T> {
634        EventHandlerSubscriber::new(
635            self.shared_observable.subscribe(),
636            // The subscriber holds a weak non-owning reference to the event handler guard, so that
637            // it can detect when this observer is dropped, and can close the subscriber's stream.
638            Arc::downgrade(&self.event_handler_guard),
639        )
640    }
641}
642
pin_project! {
    /// The subscriber of an [`ObservableEventHandler`].
    ///
    /// To create such subscriber, use [`ObservableEventHandler::subscribe`].
    ///
    /// This type implements [`Stream`], which means it is possible to poll the
    /// next value asynchronously. In other terms, polling this type will return
    /// new events as soon as they are synced. See [`Client::observe_events`]
    /// to learn more.
    #[derive(Debug)]
    pub struct EventHandlerSubscriber<T> {
        // The `Subscriber` associated to the `SharedObservable` inside
        // `ObservableEventHandler`.
        //
        // Keep in mind all this API is just a thin glue layer between
        // `EventHandler` and `SharedObservable`.
        #[pin]
        subscriber: Subscriber<Option<T>>,

        // A weak non-owning reference to the event handler guard from
        // `ObservableEventHandler`. When this type is polled (via its `Stream`
        // implementation), it is possible to detect whether the observable has
        // been dropped by upgrading this weak reference, and close the `Stream`
        // if it needs to.
        event_handler_guard: Weak<EventHandlerDropGuard>,
    }
}
670
671impl<T> EventHandlerSubscriber<T> {
672    fn new(
673        subscriber: Subscriber<Option<T>>,
674        event_handler_handle: Weak<EventHandlerDropGuard>,
675    ) -> Self {
676        Self { subscriber, event_handler_guard: event_handler_handle }
677    }
678}
679
680impl<T> Stream for EventHandlerSubscriber<T>
681where
682    T: Clone,
683{
684    type Item = T;
685
686    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
687        let mut this = self.project();
688
689        let Some(_) = this.event_handler_guard.upgrade() else {
690            // The `EventHandlerHandle` has been dropped via `EventHandlerDropGuard`. It
691            // means the `ObservableEventHandler` has been dropped. It's time to
692            // close this stream.
693            return Poll::Ready(None);
694        };
695
696        // First off, the subscriber is of type `Subscriber<Option<T>>` because the
697        // `SharedObservable` starts with a `None` value to indicate it has no yet
698        // received any update. We want the `Stream` to return `T`, not `Option<T>`. We
699        // then filter out all `None` value.
700        //
701        // Second, when a `None` value is met, we want to poll again (hence the `loop`).
702        // At best, there is a new value to return. At worst, the subscriber will return
703        // `Poll::Pending` and will register the wakers accordingly.
704
705        loop {
706            match this.subscriber.as_mut().poll_next(context) {
707                // Stream has been closed somehow.
708                Poll::Ready(None) => return Poll::Ready(None),
709
710                // The initial value (of the `SharedObservable` behind `self.subscriber`) has been
711                // polled. We want to filter it out.
712                Poll::Ready(Some(None)) => {
713                    // Loop over.
714                    continue;
715                }
716
717                // We have a new value!
718                Poll::Ready(Some(Some(value))) => return Poll::Ready(Some(value)),
719
720                // Classical pending.
721                Poll::Pending => return Poll::Pending,
722            }
723        }
724    }
725}
726
727#[cfg(test)]
728mod tests {
729    use matrix_sdk_test::{
730        DEFAULT_TEST_ROOM_ID, InvitedRoomBuilder, JoinedRoomBuilder, async_test,
731        event_factory::{EventFactory, PreviousMembership},
732    };
733    use serde::Serialize;
734    use stream_assert::{assert_closed, assert_pending, assert_ready};
735    #[cfg(target_family = "wasm")]
736    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
737    use std::{
738        future,
739        sync::{
740            Arc, LazyLock,
741            atomic::{AtomicU8, Ordering::SeqCst},
742        },
743    };
744
745    use assert_matches2::assert_let;
746    use matrix_sdk_common::{deserialized_responses::EncryptionInfo, locks::Mutex};
747    use matrix_sdk_test::SyncResponseBuilder;
748    use ruma::{
749        event_id,
750        events::{
751            AnySyncStateEvent, AnySyncTimelineEvent, AnyToDeviceEvent,
752            macros::EventContent,
753            room::{
754                member::{MembershipState, OriginalSyncRoomMemberEvent, StrippedRoomMemberEvent},
755                name::OriginalSyncRoomNameEvent,
756                power_levels::OriginalSyncRoomPowerLevelsEvent,
757            },
758            secret_storage::key::SecretStorageKeyEvent,
759            typing::SyncTypingEvent,
760        },
761        mxc_uri,
762        room::JoinRule,
763        room_id,
764        serde::Raw,
765        user_id,
766    };
767    use serde_json::json;
768
769    use crate::{
770        Client, Room,
771        event_handler::Ctx,
772        test_utils::{logged_in_client, no_retry_test_client},
773    };
774
    // Shared fixture, built on first use: a membership event for
    // `@example:localhost` with membership `join` whose previous membership
    // was `invite`.
    static MEMBER_EVENT: LazyLock<Raw<AnySyncTimelineEvent>> = LazyLock::new(|| {
        EventFactory::new()
            .member(user_id!("@example:localhost"))
            .membership(MembershipState::Join)
            .display_name("example")
            .event_id(event_id!("$151800140517rfvjc:localhost"))
            .previous(PreviousMembership::new(MembershipState::Invite).display_name("example"))
            .into()
    });
784
785    #[async_test]
786    async fn test_add_event_handler() -> crate::Result<()> {
787        let client = logged_in_client(None).await;
788
789        let member_count = Arc::new(AtomicU8::new(0));
790        let typing_count = Arc::new(AtomicU8::new(0));
791        let power_levels_count = Arc::new(AtomicU8::new(0));
792        let invited_member_count = Arc::new(AtomicU8::new(0));
793
794        client.add_event_handler({
795            let member_count = member_count.clone();
796            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| async move {
797                member_count.fetch_add(1, SeqCst);
798            }
799        });
800        client.add_event_handler({
801            let typing_count = typing_count.clone();
802            move |_ev: SyncTypingEvent| async move {
803                typing_count.fetch_add(1, SeqCst);
804            }
805        });
806        client.add_event_handler({
807            let power_levels_count = power_levels_count.clone();
808            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| async move {
809                power_levels_count.fetch_add(1, SeqCst);
810            }
811        });
812        client.add_event_handler({
813            let invited_member_count = invited_member_count.clone();
814            move |_ev: StrippedRoomMemberEvent| async move {
815                invited_member_count.fetch_add(1, SeqCst);
816            }
817        });
818
819        let f = EventFactory::new().sender(user_id!("@example:localhost"));
820        let response = SyncResponseBuilder::default()
821            .add_joined_room(
822                JoinedRoomBuilder::default()
823                    .add_timeline_event(MEMBER_EVENT.clone())
824                    .add_typing(
825                        f.typing(vec![user_id!("@alice:matrix.org"), user_id!("@bob:example.com")]),
826                    )
827                    .add_state_event(f.default_power_levels()),
828            )
829            .add_invited_room(
830                InvitedRoomBuilder::new(room_id!("!test_invited:example.org")).add_state_event({
831                    let bob = user_id!("@bob:example.org");
832                    EventFactory::new()
833                        .sender(user_id!("@example:example.org"))
834                        .member(user_id!("@alice:example.org"))
835                        .membership(MembershipState::Invite)
836                        .display_name("Alice")
837                        .avatar_url(mxc_uri!("mxc://example.org/SEsfnsuifSDFSSEF"))
838                        .age(1234_i32)
839                        .invite_room_state(vec![
840                            Raw::from(f.room_name("Example Room").sender(bob)),
841                            Raw::from(f.room_join_rules(JoinRule::Invite).sender(bob)),
842                        ])
843                }),
844            )
845            .build_sync_response();
846        client.process_sync(response).await?;
847
848        assert_eq!(member_count.load(SeqCst), 1);
849        assert_eq!(typing_count.load(SeqCst), 1);
850        assert_eq!(power_levels_count.load(SeqCst), 1);
851        assert_eq!(invited_member_count.load(SeqCst), 1);
852
853        Ok(())
854    }
855
856    #[async_test]
857    async fn test_add_to_device_event_handler() -> crate::Result<()> {
858        let client = logged_in_client(None).await;
859
860        let captured_event: Arc<Mutex<Option<AnyToDeviceEvent>>> = Arc::new(Mutex::new(None));
861        let captured_info: Arc<Mutex<Option<EncryptionInfo>>> = Arc::new(Mutex::new(None));
862
863        client.add_event_handler({
864            let captured = captured_event.clone();
865            let captured_info = captured_info.clone();
866            move |ev: AnyToDeviceEvent, encryption_info: Option<EncryptionInfo>| {
867                let mut captured_lock = captured.lock();
868                *captured_lock = Some(ev);
869                let mut captured_info_lock = captured_info.lock();
870                *captured_info_lock = encryption_info;
871                future::ready(())
872            }
873        });
874
875        let response = SyncResponseBuilder::default()
876            .add_to_device_event(json!({
877              "sender": "@alice:example.com",
878              "type": "m.custom.to.device.type",
879              "content": {
880                "a": "test",
881              }
882            }))
883            .build_sync_response();
884        client.process_sync(response).await?;
885
886        let captured = captured_event.lock().clone();
887        assert_let!(Some(received_event) = captured);
888        assert_eq!(received_event.event_type().to_string(), "m.custom.to.device.type");
889        let info = captured_info.lock().clone();
890        assert!(info.is_none());
891        Ok(())
892    }
893
894    #[async_test]
895    async fn test_add_room_event_handler() -> crate::Result<()> {
896        let client = logged_in_client(None).await;
897
898        let room_id_a = room_id!("!foo:example.org");
899        let room_id_b = room_id!("!bar:matrix.org");
900
901        let member_count = Arc::new(AtomicU8::new(0));
902        let power_levels_count = Arc::new(AtomicU8::new(0));
903
904        // Room event handlers for member events in both rooms
905        client.add_room_event_handler(room_id_a, {
906            let member_count = member_count.clone();
907            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
908                member_count.fetch_add(1, SeqCst);
909                future::ready(())
910            }
911        });
912        client.add_room_event_handler(room_id_b, {
913            let member_count = member_count.clone();
914            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
915                member_count.fetch_add(1, SeqCst);
916                future::ready(())
917            }
918        });
919
920        // Power levels event handlers for member events in room A
921        client.add_room_event_handler(room_id_a, {
922            let power_levels_count = power_levels_count.clone();
923            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| {
924                power_levels_count.fetch_add(1, SeqCst);
925                future::ready(())
926            }
927        });
928
929        // Room name event handler for room name events in room B
930        client.add_room_event_handler(
931            room_id_b,
932            // lint is buggy: rustc wants the explicit conversion from ! to () here, but clippy
933            // thinks it's useless.
934            #[allow(clippy::unused_unit)]
935            async move |_ev: OriginalSyncRoomNameEvent| -> () {
936                unreachable!("No room event in room B")
937            },
938        );
939
940        let f = EventFactory::new().sender(user_id!("@example:localhost"));
941        let response = SyncResponseBuilder::default()
942            .add_joined_room(
943                JoinedRoomBuilder::new(room_id_a)
944                    .add_timeline_event(MEMBER_EVENT.clone())
945                    .add_state_event(f.default_power_levels())
946                    .add_state_event(f.room_name("room name")),
947            )
948            .add_joined_room(
949                JoinedRoomBuilder::new(room_id_b)
950                    .add_timeline_event(MEMBER_EVENT.clone())
951                    .add_state_event(f.default_power_levels()),
952            )
953            .build_sync_response();
954        client.process_sync(response).await?;
955
956        assert_eq!(member_count.load(SeqCst), 2);
957        assert_eq!(power_levels_count.load(SeqCst), 1);
958
959        Ok(())
960    }
961
962    #[async_test]
963    async fn test_add_event_handler_with_tuples() -> crate::Result<()> {
964        let client = logged_in_client(None).await;
965
966        client.add_event_handler(
967            |_ev: OriginalSyncRoomMemberEvent, (_room, _client): (Room, Client)| future::ready(()),
968        );
969
970        // If it compiles, it works. No need to assert anything.
971
972        Ok(())
973    }
974
975    #[async_test]
976    async fn test_remove_event_handler() -> crate::Result<()> {
977        let client = logged_in_client(None).await;
978
979        let member_count = Arc::new(AtomicU8::new(0));
980
981        client.add_event_handler({
982            let member_count = member_count.clone();
983            move |_ev: OriginalSyncRoomMemberEvent| async move {
984                member_count.fetch_add(1, SeqCst);
985            }
986        });
987
988        let handle_a = client.add_event_handler(
989            // lint is buggy: rustc wants the explicit conversion from ! to () here, but clippy
990            // thinks it's useless.
991            #[allow(clippy::unused_unit)]
992            async move |_ev: OriginalSyncRoomMemberEvent| -> () {
993                panic!("handler should have been removed");
994            },
995        );
996        let handle_b = client.add_room_event_handler(
997            #[allow(unknown_lints, clippy::explicit_auto_deref)] // lint is buggy
998            *DEFAULT_TEST_ROOM_ID,
999            // lint is buggy: rustc wants the explicit conversion from ! to () here, but clippy
1000            // thinks it's useless.
1001            #[allow(clippy::unused_unit)]
1002            async move |_ev: OriginalSyncRoomMemberEvent| -> () {
1003                panic!("handler should have been removed");
1004            },
1005        );
1006
1007        client.add_event_handler({
1008            let member_count = member_count.clone();
1009            move |_ev: OriginalSyncRoomMemberEvent| async move {
1010                member_count.fetch_add(1, SeqCst);
1011            }
1012        });
1013
1014        let response = SyncResponseBuilder::default()
1015            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1016            .build_sync_response();
1017
1018        client.remove_event_handler(handle_a);
1019        client.remove_event_handler(handle_b);
1020
1021        client.process_sync(response).await?;
1022
1023        assert_eq!(member_count.load(SeqCst), 2);
1024
1025        Ok(())
1026    }
1027
1028    #[async_test]
1029    async fn test_event_handler_drop_guard() {
1030        let client = no_retry_test_client(None).await;
1031
1032        let handle = client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent| async {});
1033        assert_eq!(client.inner.event_handlers.len(), 1);
1034
1035        {
1036            let _guard = client.event_handler_drop_guard(handle);
1037            assert_eq!(client.inner.event_handlers.len(), 1);
1038            // guard dropped here
1039        }
1040
1041        assert_eq!(client.inner.event_handlers.len(), 0);
1042    }
1043
1044    #[async_test]
1045    async fn test_use_client_in_handler() {
1046        // This used to not work because we were requiring `Send` of event
1047        // handler futures even on WASM, where practically all futures that do
1048        // I/O aren't.
1049        let client = no_retry_test_client(None).await;
1050
1051        client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent, client: Client| async move {
1052            // All of Client's async methods that do network requests (and
1053            // possibly some that don't) are `!Send` on wasm. We obviously want
1054            // to be able to use them in event handlers.
1055            let _caps = client.get_capabilities().await.map_err(|e| anyhow::anyhow!("{}", e))?;
1056            anyhow::Ok(())
1057        });
1058    }
1059
1060    #[async_test]
1061    async fn test_raw_event_handler() -> crate::Result<()> {
1062        let client = logged_in_client(None).await;
1063        let counter = Arc::new(AtomicU8::new(0));
1064        client.add_event_handler_context(counter.clone());
1065        client.add_event_handler(
1066            |_ev: Raw<OriginalSyncRoomMemberEvent>, counter: Ctx<Arc<AtomicU8>>| async move {
1067                counter.fetch_add(1, SeqCst);
1068            },
1069        );
1070
1071        let response = SyncResponseBuilder::default()
1072            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1073            .build_sync_response();
1074        client.process_sync(response).await?;
1075
1076        assert_eq!(counter.load(SeqCst), 1);
1077        Ok(())
1078    }
1079
1080    #[async_test]
1081    async fn test_enum_event_handler() -> crate::Result<()> {
1082        let client = logged_in_client(None).await;
1083        let counter = Arc::new(AtomicU8::new(0));
1084        client.add_event_handler_context(counter.clone());
1085        client.add_event_handler(
1086            |_ev: AnySyncStateEvent, counter: Ctx<Arc<AtomicU8>>| async move {
1087                counter.fetch_add(1, SeqCst);
1088            },
1089        );
1090
1091        let response = SyncResponseBuilder::default()
1092            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1093            .build_sync_response();
1094        client.process_sync(response).await?;
1095
1096        assert_eq!(counter.load(SeqCst), 1);
1097        Ok(())
1098    }
1099
1100    #[async_test]
1101    async fn test_observe_events() -> crate::Result<()> {
1102        let client = logged_in_client(None).await;
1103
1104        let room_id_0 = room_id!("!r0.matrix.org");
1105        let room_id_1 = room_id!("!r1.matrix.org");
1106
1107        let observable = client.observe_events::<OriginalSyncRoomNameEvent, Room>();
1108
1109        let mut subscriber = observable.subscribe();
1110
1111        assert_pending!(subscriber);
1112
1113        let f = EventFactory::new().sender(user_id!("@mnt_io:matrix.org"));
1114        let mut response_builder = SyncResponseBuilder::new();
1115        let response = response_builder
1116            .add_joined_room(
1117                JoinedRoomBuilder::new(room_id_0)
1118                    .add_state_event(f.room_name("Name 0").event_id(event_id!("$ev0"))),
1119            )
1120            .build_sync_response();
1121        client.process_sync(response).await?;
1122
1123        let (room_name, room) = assert_ready!(subscriber);
1124
1125        assert_eq!(room_name.event_id.as_str(), "$ev0");
1126        assert_eq!(room.room_id(), room_id_0);
1127        assert_eq!(room.name().unwrap(), "Name 0");
1128
1129        assert_pending!(subscriber);
1130
1131        let response = response_builder
1132            .add_joined_room(
1133                JoinedRoomBuilder::new(room_id_1)
1134                    .add_state_event(f.room_name("Name 1").event_id(event_id!("$ev1"))),
1135            )
1136            .build_sync_response();
1137        client.process_sync(response).await?;
1138
1139        let (room_name, room) = assert_ready!(subscriber);
1140
1141        assert_eq!(room_name.event_id.as_str(), "$ev1");
1142        assert_eq!(room.room_id(), room_id_1);
1143        assert_eq!(room.name().unwrap(), "Name 1");
1144
1145        assert_pending!(subscriber);
1146
1147        drop(observable);
1148        assert_closed!(subscriber);
1149
1150        Ok(())
1151    }
1152
1153    #[async_test]
1154    async fn test_observe_room_events() -> crate::Result<()> {
1155        let client = logged_in_client(None).await;
1156
1157        let room_id = room_id!("!r0.matrix.org");
1158
1159        let observable_for_room =
1160            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1161
1162        let mut subscriber_for_room = observable_for_room.subscribe();
1163
1164        assert_pending!(subscriber_for_room);
1165
1166        let f = EventFactory::new().sender(user_id!("@mnt_io:matrix.org"));
1167        let mut response_builder = SyncResponseBuilder::new();
1168        let response = response_builder
1169            .add_joined_room(
1170                JoinedRoomBuilder::new(room_id)
1171                    .add_state_event(f.room_name("Name 0").event_id(event_id!("$ev0"))),
1172            )
1173            .build_sync_response();
1174        client.process_sync(response).await?;
1175
1176        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1177
1178        assert_eq!(room_name.event_id.as_str(), "$ev0");
1179        assert_eq!(room.name().unwrap(), "Name 0");
1180
1181        assert_pending!(subscriber_for_room);
1182
1183        let response = response_builder
1184            .add_joined_room(
1185                JoinedRoomBuilder::new(room_id)
1186                    .add_state_event(f.room_name("Name 1").event_id(event_id!("$ev1"))),
1187            )
1188            .build_sync_response();
1189        client.process_sync(response).await?;
1190
1191        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1192
1193        assert_eq!(room_name.event_id.as_str(), "$ev1");
1194        assert_eq!(room.name().unwrap(), "Name 1");
1195
1196        assert_pending!(subscriber_for_room);
1197
1198        drop(observable_for_room);
1199        assert_closed!(subscriber_for_room);
1200
1201        Ok(())
1202    }
1203
1204    #[async_test]
1205    async fn test_observe_several_room_events() -> crate::Result<()> {
1206        let client = logged_in_client(None).await;
1207
1208        let room_id = room_id!("!r0.matrix.org");
1209
1210        let observable_for_room =
1211            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1212
1213        let mut subscriber_for_room = observable_for_room.subscribe();
1214
1215        assert_pending!(subscriber_for_room);
1216
1217        let f = EventFactory::new().sender(user_id!("@mnt_io:matrix.org"));
1218        let mut response_builder = SyncResponseBuilder::new();
1219        let response = response_builder
1220            .add_joined_room(
1221                JoinedRoomBuilder::new(room_id)
1222                    .add_state_event(f.room_name("Name 0").event_id(event_id!("$ev0")))
1223                    .add_state_event(f.room_name("Name 1").event_id(event_id!("$ev1")))
1224                    .add_state_event(f.room_name("Name 2").event_id(event_id!("$ev2"))),
1225            )
1226            .build_sync_response();
1227        client.process_sync(response).await?;
1228
1229        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1230
1231        // Check we only get notified about the latest received event
1232        assert_eq!(room_name.event_id.as_str(), "$ev2");
1233        assert_eq!(room.name().unwrap(), "Name 2");
1234
1235        assert_pending!(subscriber_for_room);
1236
1237        drop(observable_for_room);
1238        assert_closed!(subscriber_for_room);
1239
1240        Ok(())
1241    }
1242
1243    #[async_test]
1244    async fn test_observe_events_with_type_prefix() -> crate::Result<()> {
1245        let client = logged_in_client(None).await;
1246
1247        let observable = client.observe_events::<SecretStorageKeyEvent, ()>();
1248
1249        let mut subscriber = observable.subscribe();
1250
1251        assert_pending!(subscriber);
1252
1253        let mut response_builder = SyncResponseBuilder::new();
1254        let response = response_builder
1255            .add_custom_global_account_data(json!({
1256                "content": {
1257                    "algorithm": "m.secret_storage.v1.aes-hmac-sha2",
1258                    "iv": "gH2iNpiETFhApvW6/FFEJQ",
1259                    "mac": "9Lw12m5SKDipNghdQXKjgpfdj1/K7HFI2brO+UWAGoM",
1260                    "passphrase": {
1261                        "algorithm": "m.pbkdf2",
1262                        "salt": "IuLnH7S85YtZmkkBJKwNUKxWF42g9O1H",
1263                        "iterations": 10,
1264                    },
1265                },
1266                "type": "m.secret_storage.key.foobar",
1267            }))
1268            .build_sync_response();
1269        client.process_sync(response).await?;
1270
1271        let (secret_storage_key, ()) = assert_ready!(subscriber);
1272
1273        assert_eq!(secret_storage_key.content.key_id, "foobar");
1274
1275        assert_pending!(subscriber);
1276
1277        drop(observable);
1278        assert_closed!(subscriber);
1279
1280        Ok(())
1281    }
1282
1283    #[async_test]
1284    async fn test_observe_room_events_with_type_prefix() -> crate::Result<()> {
1285        // To create an event handler for a room account data event type with prefix, we
1286        // need to create a custom event type, none exist in the Matrix specification
1287        // yet.
1288        #[derive(Debug, Clone, EventContent, Serialize)]
1289        #[ruma_event(type = "fake.event.*", kind = RoomAccountData)]
1290        struct AccountDataWithPrefixEventContent {
1291            #[ruma_event(type_fragment)]
1292            #[serde(skip)]
1293            key_id: String,
1294        }
1295
1296        let room_id = room_id!("!r0.matrix.org");
1297        let client = logged_in_client(None).await;
1298
1299        let observable = client.observe_room_events::<AccountDataWithPrefixEvent, Room>(room_id);
1300
1301        let mut subscriber = observable.subscribe();
1302
1303        assert_pending!(subscriber);
1304
1305        let mut response_builder = SyncResponseBuilder::new();
1306        let response = response_builder
1307            .add_joined_room(
1308                JoinedRoomBuilder::new(room_id).add_account_data_bulk([Raw::new(&json!({
1309                    "content": {},
1310                    "type": "fake.event.foobar",
1311                }))
1312                .unwrap()
1313                .cast_unchecked()]),
1314            )
1315            .build_sync_response();
1316        client.process_sync(response).await?;
1317
1318        let (secret_storage_key, _room) = assert_ready!(subscriber);
1319
1320        assert_eq!(secret_storage_key.content.key_id, "foobar");
1321
1322        assert_pending!(subscriber);
1323
1324        drop(observable);
1325        assert_closed!(subscriber);
1326
1327        Ok(())
1328    }
1329}