matrix_sdk/event_handler/
mod.rs

1// Copyright 2021 Jonas Platte
2// Copyright 2022 Famedly GmbH
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
//! Types and traits related to event handlers. For usage, see
//! [`Client::add_event_handler`].
18//!
19//! ### How it works
20//!
21//! The `add_event_handler` method registers event handlers of different
22//! signatures by actually storing boxed closures that all have the same
23//! signature of `async (EventHandlerData) -> ()` where `EventHandlerData` is a
24//! private type that contains all of the data an event handler *might* need.
25//!
26//! The stored closure takes care of deserializing the event which the
27//! `EventHandlerData` contains as a (borrowed) [`serde_json::value::RawValue`],
28//! extracting the context arguments from other fields of `EventHandlerData` and
29//! calling / `.await`ing the event handler if the previous steps succeeded.
30//! It also logs any errors from the above chain of function calls.
31//!
32//! For more details, see the [`EventHandler`] trait.
33
34#[cfg(any(feature = "anyhow", feature = "eyre"))]
35use std::any::TypeId;
36use std::{
37    borrow::Cow,
38    fmt,
39    future::Future,
40    pin::Pin,
41    sync::{
42        Arc, RwLock, Weak,
43        atomic::{AtomicU64, Ordering::SeqCst},
44    },
45    task::{Context, Poll},
46};
47
48#[cfg(target_family = "wasm")]
49use anymap2::any::CloneAny;
50#[cfg(not(target_family = "wasm"))]
51use anymap2::any::CloneAnySendSync;
52use eyeball::{SharedObservable, Subscriber};
53use futures_core::Stream;
54use futures_util::stream::{FuturesUnordered, StreamExt};
55use matrix_sdk_base::{
56    SendOutsideWasm, SyncOutsideWasm,
57    deserialized_responses::{EncryptionInfo, TimelineEvent},
58    sync::State,
59};
60use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
61use pin_project_lite::pin_project;
62use ruma::{OwnedRoomId, events::BooleanType, push::Action, serde::Raw};
63use serde::{Deserialize, de::DeserializeOwned};
64use serde_json::value::RawValue as RawJsonValue;
65use tracing::{debug, error, field::debug, instrument, warn};
66
67use self::maps::EventHandlerMaps;
68use crate::{Client, Room};
69
70mod context;
71mod maps;
72mod static_events;
73
74pub use self::context::{Ctx, EventHandlerContext, RawEvent};
75
76#[cfg(not(target_family = "wasm"))]
77type EventHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
78#[cfg(target_family = "wasm")]
79type EventHandlerFut = Pin<Box<dyn Future<Output = ()>>>;
80
81#[cfg(not(target_family = "wasm"))]
82type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut + Send + Sync;
83#[cfg(target_family = "wasm")]
84type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut;
85
86#[cfg(not(target_family = "wasm"))]
87type AnyMap = anymap2::Map<dyn CloneAnySendSync + Send + Sync>;
88#[cfg(target_family = "wasm")]
89type AnyMap = anymap2::Map<dyn CloneAny>;
90
/// Shared state backing the event handler machinery: the registered
/// handlers, the context values and the counter used to number handlers.
#[derive(Default)]
pub(crate) struct EventHandlerStore {
    /// The registered event handlers, behind a read/write lock.
    handlers: RwLock<EventHandlerMaps>,
    /// Map of context values added through `add_context`.
    context: RwLock<AnyMap>,
    /// Counter from which the `handler_id` of new handlers is drawn.
    counter: AtomicU64,
}
97
98impl EventHandlerStore {
99    pub fn add_handler(&self, handle: EventHandlerHandle, handler_fn: Box<EventHandlerFn>) {
100        self.handlers.write().unwrap().add(handle, handler_fn);
101    }
102
103    pub fn add_context<T>(&self, ctx: T)
104    where
105        T: Clone + Send + Sync + 'static,
106    {
107        self.context.write().unwrap().insert(ctx);
108    }
109
110    pub fn remove(&self, handle: EventHandlerHandle) {
111        self.handlers.write().unwrap().remove(handle);
112    }
113
114    #[cfg(test)]
115    fn len(&self) -> usize {
116        self.handlers.read().unwrap().len()
117    }
118}
119
/// The category of events an event handler is registered for.
///
/// Together with the event type and the room, this selects which handlers
/// get called for an incoming event.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum HandlerKind {
    GlobalAccountData,
    RoomAccountData,
    EphemeralRoomData,
    /// Any timeline event, whether message-like or state.
    Timeline,
    /// Message-like timeline events, redacted or not.
    MessageLike,
    /// Message-like timeline events that are not redacted.
    OriginalMessageLike,
    /// Message-like timeline events that are redacted.
    RedactedMessageLike,
    /// State events, redacted or not.
    State,
    /// State events that are not redacted.
    OriginalState,
    /// State events that are redacted.
    RedactedState,
    StrippedState,
    ToDevice,
    Presence,
}

impl HandlerKind {
    /// Selects the message-like kind matching the given redaction status.
    fn message_like_redacted(redacted: bool) -> Self {
        match redacted {
            true => Self::RedactedMessageLike,
            false => Self::OriginalMessageLike,
        }
    }

    /// Selects the state kind matching the given redaction status.
    fn state_redacted(redacted: bool) -> Self {
        match redacted {
            true => Self::RedactedState,
            false => Self::OriginalState,
        }
    }
}
147
/// A statically-known event kind/type that can be retrieved from an event sync.
pub trait SyncEvent {
    /// The handler kind that events of this type are dispatched under.
    #[doc(hidden)]
    const KIND: HandlerKind;
    /// The statically-known (part of the) event type, if there is one.
    #[doc(hidden)]
    const TYPE: Option<&'static str>;
    /// Whether [`Self::TYPE`] is only a prefix of the event type rather than
    /// the full event type.
    #[doc(hidden)]
    type IsPrefix: BooleanType;
}
157
/// A boxed, type-erased event handler closure together with its id.
pub(crate) struct EventHandlerWrapper {
    /// The type-erased handler closure.
    handler_fn: Box<EventHandlerFn>,
    /// Id of the handler; presumably the same value as
    /// `EventHandlerHandle::handler_id` — confirm against the `maps` module.
    pub handler_id: u64,
}
162
/// Handle to remove a registered event handler by passing it to
/// [`Client::remove_event_handler`].
#[derive(Clone, Debug)]
pub struct EventHandlerHandle {
    /// The kind of events the handler was registered for.
    pub(crate) ev_kind: HandlerKind,
    /// The static (part of the) event type the handler was registered for,
    /// if the event type has one.
    pub(crate) ev_type: Option<StaticEventTypePart>,
    /// The room id given at registration time, if any.
    pub(crate) room_id: Option<OwnedRoomId>,
    /// Id assigned to the handler when it was registered.
    pub(crate) handler_id: u64,
}
172
/// The static part of an event type.
///
/// Which variant is used for a handler is decided by `SyncEvent::IsPrefix`
/// when the handler is registered.
#[derive(Clone, Copy, Debug)]
pub(crate) enum StaticEventTypePart {
    /// The full event type is static.
    Full(&'static str),
    /// Only the prefix of the event type is static.
    Prefix(&'static str),
}
181
/// Interface for event handlers.
///
/// This trait is an abstraction for a certain kind of functions / closures,
/// specifically:
///
/// * They must have at least one argument, which is the event itself, a type
///   that implements [`SyncEvent`]. Any additional arguments need to implement
///   the [`EventHandlerContext`] trait.
/// * Their return type has to be one of: `()`, `Result<(), impl Display + Debug
///   + 'static>` (if you are using `anyhow::Result` or `eyre::Result` you can
///   additionally enable the `anyhow` / `eyre` feature to get the verbose
///   `Debug` output printed on error).
///
/// ### How it works
///
/// This trait is basically a very constrained version of `Fn`: It requires at
/// least one argument, which is represented as its own generic parameter `Ev`
/// with the remaining parameter types being represented by the second generic
/// parameter `Ctx`; they have to be stuffed into one generic parameter as a
/// tuple because Rust doesn't have variadic generics.
///
/// `Ev` and `Ctx` are generic parameters rather than associated types because
/// the argument list is a generic parameter for the `Fn` traits too, so a
/// single type could implement `Fn` multiple times with different argument
/// lists¹. Luckily, when calling [`Client::add_event_handler`] with a
/// closure argument the trait solver takes into account that only a single one
/// of the implementations applies (even though this could theoretically change
/// through a dependency upgrade) and uses that rather than raising an ambiguity
/// error. This is the same trick used by web frameworks like actix-web and
/// axum.
///
/// ¹ The only thing stopping such types from existing in stable Rust is that
/// all manual implementations of the `Fn` traits require a Nightly feature.
pub trait EventHandler<Ev, Ctx>: Clone + SendOutsideWasm + SyncOutsideWasm + 'static {
    /// The future returned by `handle_event`.
    #[doc(hidden)]
    type Future: EventHandlerFuture;

    /// Create a future for handling the given event.
    ///
    /// `ev` is the already deserialized event. `data` provides additional
    /// data about the event, for example the room it appeared in; the
    /// context arguments of the handler are extracted from it.
    ///
    /// Returns `None` if one of the context extractors failed.
    #[doc(hidden)]
    fn handle_event(self, ev: Ev, data: EventHandlerData<'_>) -> Option<Self::Future>;
}
229
/// A future that can be returned by an event handler.
///
/// It is a regular [`Future`] that is `'static` (and `Send` outside of wasm)
/// and whose output implements [`EventHandlerResult`].
#[doc(hidden)]
pub trait EventHandlerFuture:
    Future<Output = <Self as EventHandlerFuture>::Output> + SendOutsideWasm + 'static
{
    /// The output of the future; mirrors `Future::Output` with the extra
    /// [`EventHandlerResult`] bound.
    type Output: EventHandlerResult;
}
236
// Blanket implementation: every suitable future whose output is an
// `EventHandlerResult` is an `EventHandlerFuture`.
impl<T> EventHandlerFuture for T
where
    T: Future + SendOutsideWasm + 'static,
    <T as Future>::Output: EventHandlerResult,
{
    // Simply forward the output type of the underlying future.
    type Output = <T as Future>::Output;
}
244
/// All the data an event handler *might* need, handed to every stored
/// handler closure when an event is dispatched.
#[doc(hidden)]
#[derive(Debug)]
pub struct EventHandlerData<'a> {
    /// A clone of the client that dispatches the event.
    client: Client,
    /// The room passed along with the event, if any.
    room: Option<Room>,
    /// The raw JSON of the event, which the handler closure deserializes.
    raw: &'a RawJsonValue,
    /// Encryption info passed along with the event, if any.
    encryption_info: Option<&'a EncryptionInfo>,
    /// Push actions passed along with the event (may be empty).
    push_actions: &'a [Action],
    /// The handle of the event handler that is being called.
    handle: EventHandlerHandle,
}
255
/// Return types supported for event handlers implement this trait.
///
/// It is not meant to be implemented outside of matrix-sdk.
pub trait EventHandlerResult: Sized {
    /// Log the error carried by this result, if there is one.
    ///
    /// `event_type` is the static event type of the handler, if known; it is
    /// only used to make the log message more precise.
    #[doc(hidden)]
    fn print_error(&self, event_type: Option<&str>);
}
263
// Handlers returning `()` cannot fail, so there is never anything to log.
impl EventHandlerResult for () {
    fn print_error(&self, _event_type: Option<&str>) {}
}
267
268impl<E: fmt::Debug + fmt::Display + 'static> EventHandlerResult for Result<(), E> {
269    fn print_error(&self, event_type: Option<&str>) {
270        let msg_fragment = match event_type {
271            Some(event_type) => format!(" for `{event_type}`"),
272            None => "".to_owned(),
273        };
274
275        match self {
276            #[cfg(feature = "anyhow")]
277            Err(e) if TypeId::of::<E>() == TypeId::of::<anyhow::Error>() => {
278                error!("Event handler{msg_fragment} failed: {e:?}");
279            }
280            #[cfg(feature = "eyre")]
281            Err(e) if TypeId::of::<E>() == TypeId::of::<eyre::Report>() => {
282                error!("Event handler{msg_fragment} failed: {e:?}");
283            }
284            Err(e) => {
285                error!("Event handler{msg_fragment} failed: {e}");
286            }
287            Ok(_) => {}
288        }
289    }
290}
291
/// The subset of an event's `unsigned` object needed to find out whether the
/// event has been redacted.
#[derive(Deserialize)]
struct UnsignedDetails {
    /// Present when the event is redacted. Only its presence matters, so the
    /// value itself is ignored during deserialization.
    redacted_because: Option<serde::de::IgnoredAny>,
}
296
297/// Event handling internals.
298impl Client {
299    pub(crate) fn add_event_handler_impl<Ev, Ctx, H>(
300        &self,
301        handler: H,
302        room_id: Option<OwnedRoomId>,
303    ) -> EventHandlerHandle
304    where
305        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
306        H: EventHandler<Ev, Ctx>,
307    {
308        let handler_fn: Box<EventHandlerFn> = Box::new(move |data| {
309            let maybe_fut = serde_json::from_str(data.raw.get())
310                .map(|ev| handler.clone().handle_event(ev, data));
311
312            Box::pin(async move {
313                match maybe_fut {
314                    Ok(Some(fut)) => {
315                        fut.await.print_error(Ev::TYPE);
316                    }
317                    Ok(None) => {
318                        error!(
319                            event_type = Ev::TYPE, event_kind = ?Ev::KIND,
320                            "Event handler has an invalid context argument",
321                        );
322                    }
323                    Err(e) => {
324                        warn!(
325                            event_type = Ev::TYPE, event_kind = ?Ev::KIND,
326                            "Failed to deserialize event, skipping event handler.\n
327                             Deserialization error: {e}",
328                        );
329                    }
330                }
331            })
332        });
333
334        let handler_id = self.inner.event_handlers.counter.fetch_add(1, SeqCst);
335        let ev_type = Ev::TYPE.map(|ev_type| {
336            if Ev::IsPrefix::as_bool() {
337                StaticEventTypePart::Prefix(ev_type)
338            } else {
339                StaticEventTypePart::Full(ev_type)
340            }
341        });
342        let handle = EventHandlerHandle { ev_kind: Ev::KIND, ev_type, room_id, handler_id };
343
344        self.inner.event_handlers.add_handler(handle.clone(), handler_fn);
345
346        handle
347    }
348
349    pub(crate) async fn handle_sync_events<T>(
350        &self,
351        kind: HandlerKind,
352        room: Option<&Room>,
353        events: &[Raw<T>],
354    ) -> serde_json::Result<()> {
355        #[derive(Deserialize)]
356        struct ExtractType<'a> {
357            #[serde(borrow, rename = "type")]
358            event_type: Cow<'a, str>,
359        }
360
361        for raw_event in events {
362            let event_type = raw_event.deserialize_as_unchecked::<ExtractType<'_>>()?.event_type;
363            self.call_event_handlers(room, raw_event.json(), kind, &event_type, None, &[]).await;
364        }
365
366        Ok(())
367    }
368
369    pub(crate) async fn handle_sync_to_device_events(
370        &self,
371        events: &[ProcessedToDeviceEvent],
372    ) -> serde_json::Result<()> {
373        #[derive(Deserialize)]
374        struct ExtractType<'a> {
375            #[serde(borrow, rename = "type")]
376            event_type: Cow<'a, str>,
377        }
378
379        for processed_to_device in events {
380            let (raw_event, encryption_info) = match processed_to_device {
381                ProcessedToDeviceEvent::Decrypted { raw, encryption_info } => {
382                    (raw, Some(encryption_info))
383                }
384                other => (&other.to_raw(), None),
385            };
386            let event_type = raw_event.deserialize_as_unchecked::<ExtractType<'_>>()?.event_type;
387            self.call_event_handlers(
388                None,
389                raw_event.json(),
390                HandlerKind::ToDevice,
391                &event_type,
392                encryption_info,
393                &[],
394            )
395            .await;
396        }
397
398        Ok(())
399    }
400
401    pub(crate) async fn handle_sync_state_events(
402        &self,
403        room: Option<&Room>,
404        state: &State,
405    ) -> serde_json::Result<()> {
406        #[derive(Deserialize)]
407        struct StateEventDetails<'a> {
408            #[serde(borrow, rename = "type")]
409            event_type: Cow<'a, str>,
410            unsigned: Option<UnsignedDetails>,
411        }
412
413        let state_events = match state {
414            State::Before(events) => events,
415            State::After(events) => events,
416        };
417
418        // Event handlers for possibly-redacted state events
419        self.handle_sync_events(HandlerKind::State, room, state_events).await?;
420
421        // Event handlers specifically for redacted OR unredacted state events
422        for raw_event in state_events {
423            let StateEventDetails { event_type, unsigned } =
424                raw_event.deserialize_as_unchecked()?;
425            let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
426            let handler_kind = HandlerKind::state_redacted(redacted);
427
428            self.call_event_handlers(room, raw_event.json(), handler_kind, &event_type, None, &[])
429                .await;
430        }
431
432        Ok(())
433    }
434
435    pub(crate) async fn handle_sync_timeline_events(
436        &self,
437        room: Option<&Room>,
438        timeline_events: &[TimelineEvent],
439    ) -> serde_json::Result<()> {
440        #[derive(Deserialize)]
441        struct TimelineEventDetails<'a> {
442            #[serde(borrow, rename = "type")]
443            event_type: Cow<'a, str>,
444            state_key: Option<serde::de::IgnoredAny>,
445            unsigned: Option<UnsignedDetails>,
446        }
447
448        for item in timeline_events {
449            let TimelineEventDetails { event_type, state_key, unsigned } =
450                item.raw().deserialize_as_unchecked()?;
451
452            let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
453            let (handler_kind_g, handler_kind_r) = match state_key {
454                Some(_) => (HandlerKind::State, HandlerKind::state_redacted(redacted)),
455                None => (HandlerKind::MessageLike, HandlerKind::message_like_redacted(redacted)),
456            };
457
458            let raw_event = item.raw().json();
459            let encryption_info = item.encryption_info().map(|i| &**i);
460            let push_actions = item.push_actions().unwrap_or(&[]);
461
462            // Event handlers for possibly-redacted timeline events
463            self.call_event_handlers(
464                room,
465                raw_event,
466                handler_kind_g,
467                &event_type,
468                encryption_info,
469                push_actions,
470            )
471            .await;
472
473            // Event handlers specifically for redacted OR unredacted timeline events
474            self.call_event_handlers(
475                room,
476                raw_event,
477                handler_kind_r,
478                &event_type,
479                encryption_info,
480                push_actions,
481            )
482            .await;
483
484            // Event handlers for `AnySyncTimelineEvent`
485            let kind = HandlerKind::Timeline;
486            self.call_event_handlers(
487                room,
488                raw_event,
489                kind,
490                &event_type,
491                encryption_info,
492                push_actions,
493            )
494            .await;
495        }
496
497        Ok(())
498    }
499
500    #[instrument(skip_all, fields(?event_kind, ?event_type, room_id))]
501    async fn call_event_handlers(
502        &self,
503        room: Option<&Room>,
504        raw: &RawJsonValue,
505        event_kind: HandlerKind,
506        event_type: &str,
507        encryption_info: Option<&EncryptionInfo>,
508        push_actions: &[Action],
509    ) {
510        let room_id = room.map(|r| r.room_id());
511        if let Some(room_id) = room_id {
512            tracing::Span::current().record("room_id", debug(room_id));
513        }
514
515        // Construct event handler futures
516        let mut futures: FuturesUnordered<_> = self
517            .inner
518            .event_handlers
519            .handlers
520            .read()
521            .unwrap()
522            .get_handlers(event_kind, event_type, room_id)
523            .map(|(handle, handler_fn)| {
524                let data = EventHandlerData {
525                    client: self.clone(),
526                    room: room.cloned(),
527                    raw,
528                    encryption_info,
529                    push_actions,
530                    handle,
531                };
532
533                (handler_fn)(data)
534            })
535            .collect();
536
537        if !futures.is_empty() {
538            debug!(amount = futures.len(), "Calling event handlers");
539
540            // Run the event handler futures with the `self.event_handlers.handlers`
541            // lock no longer being held.
542            while let Some(()) = futures.next().await {}
543        }
544    }
545}
546
/// A guard type that removes an event handler when it drops (goes out of
/// scope).
///
/// Created with [`Client::event_handler_drop_guard`].
#[derive(Debug)]
pub struct EventHandlerDropGuard {
    /// Handle of the event handler to remove on drop.
    handle: EventHandlerHandle,
    /// The client the event handler is removed from on drop.
    client: Client,
}
556
impl EventHandlerDropGuard {
    /// Creates a guard that removes the event handler identified by `handle`
    /// from `client` once it is dropped.
    pub(crate) fn new(handle: EventHandlerHandle, client: Client) -> Self {
        Self { handle, client }
    }
}
562
impl Drop for EventHandlerDropGuard {
    /// Removes the guarded event handler from the client.
    fn drop(&mut self) {
        // `remove_event_handler` takes the handle by value, hence the clone.
        self.client.remove_event_handler(self.handle.clone());
    }
}
568
// Implements `EventHandler<Ev, (A, B, ...)>` for every function / closure
// taking the event followed by the given context argument types.
//
// The macro receives the list of generic parameter names to use for the
// context arguments, e.g. `impl_event_handler!(A, B)` covers handlers of the
// shape `FnOnce(Ev, A, B) -> Fut`.
macro_rules! impl_event_handler {
    ($($ty:ident),* $(,)?) => {
        impl<Ev, Fun, Fut, $($ty),*> EventHandler<Ev, ($($ty,)*)> for Fun
        where
            Ev: SyncEvent,
            Fun: FnOnce(Ev, $($ty),*) -> Fut + Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
            Fut: EventHandlerFuture,
            $($ty: EventHandlerContext),*
        {
            type Future = Fut;

            // `_d` carries a leading underscore because it is unused in the
            // expansion without any context argument.
            fn handle_event(self, ev: Ev, _d: EventHandlerData<'_>) -> Option<Self::Future> {
                // Extract every context argument from the handler data; the
                // `?` makes this return `None` as soon as one extraction fails.
                Some((self)(ev, $($ty::from_data(&_d)?),*))
            }
        }
    };
}

// Event handlers with zero up to eight context arguments are supported.
impl_event_handler!();
impl_event_handler!(A);
impl_event_handler!(A, B);
impl_event_handler!(A, B, C);
impl_event_handler!(A, B, C, D);
impl_event_handler!(A, B, C, D, E);
impl_event_handler!(A, B, C, D, E, F);
impl_event_handler!(A, B, C, D, E, F, G);
impl_event_handler!(A, B, C, D, E, F, G, H);
596
/// An observer of events (may be tailored to a room).
///
/// Only the most recent value can be observed. Subscribers are notified when a
/// new value is sent, but there is no guarantee that they will see all values.
///
/// To create such an observer, use [`Client::observe_events`] or
/// [`Client::observe_room_events`].
#[derive(Debug)]
pub struct ObservableEventHandler<T> {
    /// This type is actually nothing more than a thin glue layer between the
    /// [`EventHandler`] mechanism and the reactive programming types from
    /// [`eyeball`]. Here, we use a [`SharedObservable`] that is updated by the
    /// [`EventHandler`].
    shared_observable: SharedObservable<Option<T>>,

    /// This type owns the [`EventHandlerDropGuard`]. As soon as this type goes
    /// out of scope, the event handler is unregistered/removed.
    ///
    /// [`EventHandlerSubscriber`] holds a weak, non-owning reference to this
    /// guard. It is useful to detect when to close the [`Stream`]: as soon as
    /// this type goes out of scope, the subscriber will close itself on poll.
    event_handler_guard: Arc<EventHandlerDropGuard>,
}
620
621impl<T> ObservableEventHandler<T> {
622    pub(crate) fn new(
623        shared_observable: SharedObservable<Option<T>>,
624        event_handler_guard: EventHandlerDropGuard,
625    ) -> Self {
626        Self { shared_observable, event_handler_guard: Arc::new(event_handler_guard) }
627    }
628
629    /// Subscribe to this observer.
630    ///
631    /// It returns an [`EventHandlerSubscriber`], which implements [`Stream`].
632    /// See its documentation to learn more.
633    pub fn subscribe(&self) -> EventHandlerSubscriber<T> {
634        EventHandlerSubscriber::new(
635            self.shared_observable.subscribe(),
636            // The subscriber holds a weak non-owning reference to the event handler guard, so that
637            // it can detect when this observer is dropped, and can close the subscriber's stream.
638            Arc::downgrade(&self.event_handler_guard),
639        )
640    }
641}
642
pin_project! {
    /// The subscriber of an [`ObservableEventHandler`].
    ///
    /// To create such a subscriber, use [`ObservableEventHandler::subscribe`].
    ///
    /// This type implements [`Stream`], which means it is possible to poll the
    /// next value asynchronously. In other terms, polling this type will return
    /// the new events as soon as they are synced. See [`Client::observe_events`]
    /// to learn more.
    #[derive(Debug)]
    pub struct EventHandlerSubscriber<T> {
        // The `Subscriber` associated to the `SharedObservable` inside
        // `ObservableEventHandler`.
        //
        // Keep in mind all this API is just a thin glue layer between
        // `EventHandler` and `SharedObservable`, that's… maagiic!
        #[pin]
        subscriber: Subscriber<Option<T>>,

        // A weak non-owning reference to the event handler guard from
        // `ObservableEventHandler`. When this type is polled (via its `Stream`
        // implementation), it is possible to detect whether the observable has
        // been dropped by upgrading this weak reference, and close the `Stream`
        // if it needs to.
        event_handler_guard: Weak<EventHandlerDropGuard>,
    }
}
670
671impl<T> EventHandlerSubscriber<T> {
672    fn new(
673        subscriber: Subscriber<Option<T>>,
674        event_handler_handle: Weak<EventHandlerDropGuard>,
675    ) -> Self {
676        Self { subscriber, event_handler_guard: event_handler_handle }
677    }
678}
679
680impl<T> Stream for EventHandlerSubscriber<T>
681where
682    T: Clone,
683{
684    type Item = T;
685
686    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
687        let mut this = self.project();
688
689        let Some(_) = this.event_handler_guard.upgrade() else {
690            // The `EventHandlerHandle` has been dropped via `EventHandlerDropGuard`. It
691            // means the `ObservableEventHandler` has been dropped. It's time to
692            // close this stream.
693            return Poll::Ready(None);
694        };
695
696        // First off, the subscriber is of type `Subscriber<Option<T>>` because the
697        // `SharedObservable` starts with a `None` value to indicate it has no yet
698        // received any update. We want the `Stream` to return `T`, not `Option<T>`. We
699        // then filter out all `None` value.
700        //
701        // Second, when a `None` value is met, we want to poll again (hence the `loop`).
702        // At best, there is a new value to return. At worst, the subscriber will return
703        // `Poll::Pending` and will register the wakers accordingly.
704
705        loop {
706            match this.subscriber.as_mut().poll_next(context) {
707                // Stream has been closed somehow.
708                Poll::Ready(None) => return Poll::Ready(None),
709
710                // The initial value (of the `SharedObservable` behind `self.subscriber`) has been
711                // polled. We want to filter it out.
712                Poll::Ready(Some(None)) => {
713                    // Loop over.
714                    continue;
715                }
716
717                // We have a new value!
718                Poll::Ready(Some(Some(value))) => return Poll::Ready(Some(value)),
719
720                // Classical pending.
721                Poll::Pending => return Poll::Pending,
722            }
723        }
724    }
725}
726
727#[cfg(test)]
728mod tests {
729    use matrix_sdk_test::{
730        DEFAULT_TEST_ROOM_ID, InvitedRoomBuilder, JoinedRoomBuilder, async_test,
731        event_factory::{EventFactory, PreviousMembership},
732    };
733    use serde::Serialize;
734    use stream_assert::{assert_closed, assert_pending, assert_ready};
735    #[cfg(target_family = "wasm")]
736    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
737    use std::{
738        future,
739        sync::{
740            Arc,
741            atomic::{AtomicU8, Ordering::SeqCst},
742        },
743    };
744
745    use assert_matches2::assert_let;
746    use matrix_sdk_common::{deserialized_responses::EncryptionInfo, locks::Mutex};
747    use matrix_sdk_test::{StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder};
748    use once_cell::sync::Lazy;
749    use ruma::{
750        event_id,
751        events::{
752            AnySyncStateEvent, AnySyncTimelineEvent, AnyToDeviceEvent,
753            macros::EventContent,
754            room::{
755                member::{MembershipState, OriginalSyncRoomMemberEvent, StrippedRoomMemberEvent},
756                name::OriginalSyncRoomNameEvent,
757                power_levels::OriginalSyncRoomPowerLevelsEvent,
758            },
759            secret_storage::key::SecretStorageKeyEvent,
760            typing::SyncTypingEvent,
761        },
762        room_id,
763        serde::Raw,
764        user_id,
765    };
766    use serde_json::json;
767
768    use crate::{
769        Client, Room,
770        event_handler::Ctx,
771        test_utils::{logged_in_client, no_retry_test_client},
772    };
773
    // A raw `m.room.member` timeline event for `@example:localhost`, with
    // membership `join` and a previous membership of `invite`, shared by the
    // tests of this module.
    static MEMBER_EVENT: Lazy<Raw<AnySyncTimelineEvent>> = Lazy::new(|| {
        EventFactory::new()
            .member(user_id!("@example:localhost"))
            .membership(MembershipState::Join)
            .display_name("example")
            .event_id(event_id!("$151800140517rfvjc:localhost"))
            .previous(PreviousMembership::new(MembershipState::Invite).display_name("example"))
            .into()
    });
783
784    #[async_test]
785    async fn test_add_event_handler() -> crate::Result<()> {
786        let client = logged_in_client(None).await;
787
788        let member_count = Arc::new(AtomicU8::new(0));
789        let typing_count = Arc::new(AtomicU8::new(0));
790        let power_levels_count = Arc::new(AtomicU8::new(0));
791        let invited_member_count = Arc::new(AtomicU8::new(0));
792
793        client.add_event_handler({
794            let member_count = member_count.clone();
795            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| async move {
796                member_count.fetch_add(1, SeqCst);
797            }
798        });
799        client.add_event_handler({
800            let typing_count = typing_count.clone();
801            move |_ev: SyncTypingEvent| async move {
802                typing_count.fetch_add(1, SeqCst);
803            }
804        });
805        client.add_event_handler({
806            let power_levels_count = power_levels_count.clone();
807            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| async move {
808                power_levels_count.fetch_add(1, SeqCst);
809            }
810        });
811        client.add_event_handler({
812            let invited_member_count = invited_member_count.clone();
813            move |_ev: StrippedRoomMemberEvent| async move {
814                invited_member_count.fetch_add(1, SeqCst);
815            }
816        });
817
818        let f = EventFactory::new();
819        let response = SyncResponseBuilder::default()
820            .add_joined_room(
821                JoinedRoomBuilder::default()
822                    .add_timeline_event(MEMBER_EVENT.clone())
823                    .add_typing(
824                        f.typing(vec![user_id!("@alice:matrix.org"), user_id!("@bob:example.com")]),
825                    )
826                    .add_state_event(StateTestEvent::PowerLevels),
827            )
828            .add_invited_room(
829                InvitedRoomBuilder::new(room_id!("!test_invited:example.org")).add_state_event(
830                    StrippedStateTestEvent::Custom(json!({
831                        "content": {
832                            "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
833                            "displayname": "Alice",
834                            "membership": "invite",
835                        },
836                        "event_id": "$143273582443PhrSn:example.org",
837                        "origin_server_ts": 1432735824653u64,
838                        "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
839                        "sender": "@example:example.org",
840                        "state_key": "@alice:example.org",
841                        "type": "m.room.member",
842                        "unsigned": {
843                            "age": 1234,
844                            "invite_room_state": [
845                                {
846                                    "content": {
847                                        "name": "Example Room"
848                                    },
849                                    "sender": "@bob:example.org",
850                                    "state_key": "",
851                                    "type": "m.room.name"
852                                },
853                                {
854                                    "content": {
855                                        "join_rule": "invite"
856                                    },
857                                    "sender": "@bob:example.org",
858                                    "state_key": "",
859                                    "type": "m.room.join_rules"
860                                }
861                            ]
862                        }
863                    })),
864                ),
865            )
866            .build_sync_response();
867        client.process_sync(response).await?;
868
869        assert_eq!(member_count.load(SeqCst), 1);
870        assert_eq!(typing_count.load(SeqCst), 1);
871        assert_eq!(power_levels_count.load(SeqCst), 1);
872        assert_eq!(invited_member_count.load(SeqCst), 1);
873
874        Ok(())
875    }
876
877    #[async_test]
878    async fn test_add_to_device_event_handler() -> crate::Result<()> {
879        let client = logged_in_client(None).await;
880
881        let captured_event: Arc<Mutex<Option<AnyToDeviceEvent>>> = Arc::new(Mutex::new(None));
882        let captured_info: Arc<Mutex<Option<EncryptionInfo>>> = Arc::new(Mutex::new(None));
883
884        client.add_event_handler({
885            let captured = captured_event.clone();
886            let captured_info = captured_info.clone();
887            move |ev: AnyToDeviceEvent, encryption_info: Option<EncryptionInfo>| {
888                let mut captured_lock = captured.lock();
889                *captured_lock = Some(ev);
890                let mut captured_info_lock = captured_info.lock();
891                *captured_info_lock = encryption_info;
892                future::ready(())
893            }
894        });
895
896        let response = SyncResponseBuilder::default()
897            .add_to_device_event(json!({
898              "sender": "@alice:example.com",
899              "type": "m.custom.to.device.type",
900              "content": {
901                "a": "test",
902              }
903            }))
904            .build_sync_response();
905        client.process_sync(response).await?;
906
907        let captured = captured_event.lock().clone();
908        assert_let!(Some(received_event) = captured);
909        assert_eq!(received_event.event_type().to_string(), "m.custom.to.device.type");
910        let info = captured_info.lock().clone();
911        assert!(info.is_none());
912        Ok(())
913    }
914
915    #[async_test]
916    async fn test_add_room_event_handler() -> crate::Result<()> {
917        let client = logged_in_client(None).await;
918
919        let room_id_a = room_id!("!foo:example.org");
920        let room_id_b = room_id!("!bar:matrix.org");
921
922        let member_count = Arc::new(AtomicU8::new(0));
923        let power_levels_count = Arc::new(AtomicU8::new(0));
924
925        // Room event handlers for member events in both rooms
926        client.add_room_event_handler(room_id_a, {
927            let member_count = member_count.clone();
928            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
929                member_count.fetch_add(1, SeqCst);
930                future::ready(())
931            }
932        });
933        client.add_room_event_handler(room_id_b, {
934            let member_count = member_count.clone();
935            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
936                member_count.fetch_add(1, SeqCst);
937                future::ready(())
938            }
939        });
940
941        // Power levels event handlers for member events in room A
942        client.add_room_event_handler(room_id_a, {
943            let power_levels_count = power_levels_count.clone();
944            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| {
945                power_levels_count.fetch_add(1, SeqCst);
946                future::ready(())
947            }
948        });
949
950        // Room name event handler for room name events in room B
951        client.add_room_event_handler(
952            room_id_b,
953            // lint is buggy: rustc wants the explicit conversion from ! to () here, but clippy
954            // thinks it's useless.
955            #[allow(clippy::unused_unit)]
956            async move |_ev: OriginalSyncRoomNameEvent| -> () {
957                unreachable!("No room event in room B")
958            },
959        );
960
961        let response = SyncResponseBuilder::default()
962            .add_joined_room(
963                JoinedRoomBuilder::new(room_id_a)
964                    .add_timeline_event(MEMBER_EVENT.clone())
965                    .add_state_event(StateTestEvent::PowerLevels)
966                    .add_state_event(StateTestEvent::RoomName),
967            )
968            .add_joined_room(
969                JoinedRoomBuilder::new(room_id_b)
970                    .add_timeline_event(MEMBER_EVENT.clone())
971                    .add_state_event(StateTestEvent::PowerLevels),
972            )
973            .build_sync_response();
974        client.process_sync(response).await?;
975
976        assert_eq!(member_count.load(SeqCst), 2);
977        assert_eq!(power_levels_count.load(SeqCst), 1);
978
979        Ok(())
980    }
981
982    #[async_test]
983    async fn test_add_event_handler_with_tuples() -> crate::Result<()> {
984        let client = logged_in_client(None).await;
985
986        client.add_event_handler(
987            |_ev: OriginalSyncRoomMemberEvent, (_room, _client): (Room, Client)| future::ready(()),
988        );
989
990        // If it compiles, it works. No need to assert anything.
991
992        Ok(())
993    }
994
995    #[async_test]
996    async fn test_remove_event_handler() -> crate::Result<()> {
997        let client = logged_in_client(None).await;
998
999        let member_count = Arc::new(AtomicU8::new(0));
1000
1001        client.add_event_handler({
1002            let member_count = member_count.clone();
1003            move |_ev: OriginalSyncRoomMemberEvent| async move {
1004                member_count.fetch_add(1, SeqCst);
1005            }
1006        });
1007
1008        let handle_a = client.add_event_handler(
1009            // lint is buggy: rustc wants the explicit conversion from ! to () here, but clippy
1010            // thinks it's useless.
1011            #[allow(clippy::unused_unit)]
1012            async move |_ev: OriginalSyncRoomMemberEvent| -> () {
1013                panic!("handler should have been removed");
1014            },
1015        );
1016        let handle_b = client.add_room_event_handler(
1017            #[allow(unknown_lints, clippy::explicit_auto_deref)] // lint is buggy
1018            *DEFAULT_TEST_ROOM_ID,
1019            // lint is buggy: rustc wants the explicit conversion from ! to () here, but clippy
1020            // thinks it's useless.
1021            #[allow(clippy::unused_unit)]
1022            async move |_ev: OriginalSyncRoomMemberEvent| -> () {
1023                panic!("handler should have been removed");
1024            },
1025        );
1026
1027        client.add_event_handler({
1028            let member_count = member_count.clone();
1029            move |_ev: OriginalSyncRoomMemberEvent| async move {
1030                member_count.fetch_add(1, SeqCst);
1031            }
1032        });
1033
1034        let response = SyncResponseBuilder::default()
1035            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1036            .build_sync_response();
1037
1038        client.remove_event_handler(handle_a);
1039        client.remove_event_handler(handle_b);
1040
1041        client.process_sync(response).await?;
1042
1043        assert_eq!(member_count.load(SeqCst), 2);
1044
1045        Ok(())
1046    }
1047
1048    #[async_test]
1049    async fn test_event_handler_drop_guard() {
1050        let client = no_retry_test_client(None).await;
1051
1052        let handle = client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent| async {});
1053        assert_eq!(client.inner.event_handlers.len(), 1);
1054
1055        {
1056            let _guard = client.event_handler_drop_guard(handle);
1057            assert_eq!(client.inner.event_handlers.len(), 1);
1058            // guard dropped here
1059        }
1060
1061        assert_eq!(client.inner.event_handlers.len(), 0);
1062    }
1063
1064    #[async_test]
1065    async fn test_use_client_in_handler() {
1066        // This used to not work because we were requiring `Send` of event
1067        // handler futures even on WASM, where practically all futures that do
1068        // I/O aren't.
1069        let client = no_retry_test_client(None).await;
1070
1071        client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent, client: Client| async move {
1072            // All of Client's async methods that do network requests (and
1073            // possibly some that don't) are `!Send` on wasm. We obviously want
1074            // to be able to use them in event handlers.
1075            let _caps = client.get_capabilities().await.map_err(|e| anyhow::anyhow!("{}", e))?;
1076            anyhow::Ok(())
1077        });
1078    }
1079
1080    #[async_test]
1081    async fn test_raw_event_handler() -> crate::Result<()> {
1082        let client = logged_in_client(None).await;
1083        let counter = Arc::new(AtomicU8::new(0));
1084        client.add_event_handler_context(counter.clone());
1085        client.add_event_handler(
1086            |_ev: Raw<OriginalSyncRoomMemberEvent>, counter: Ctx<Arc<AtomicU8>>| async move {
1087                counter.fetch_add(1, SeqCst);
1088            },
1089        );
1090
1091        let response = SyncResponseBuilder::default()
1092            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1093            .build_sync_response();
1094        client.process_sync(response).await?;
1095
1096        assert_eq!(counter.load(SeqCst), 1);
1097        Ok(())
1098    }
1099
1100    #[async_test]
1101    async fn test_enum_event_handler() -> crate::Result<()> {
1102        let client = logged_in_client(None).await;
1103        let counter = Arc::new(AtomicU8::new(0));
1104        client.add_event_handler_context(counter.clone());
1105        client.add_event_handler(
1106            |_ev: AnySyncStateEvent, counter: Ctx<Arc<AtomicU8>>| async move {
1107                counter.fetch_add(1, SeqCst);
1108            },
1109        );
1110
1111        let response = SyncResponseBuilder::default()
1112            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1113            .build_sync_response();
1114        client.process_sync(response).await?;
1115
1116        assert_eq!(counter.load(SeqCst), 1);
1117        Ok(())
1118    }
1119
1120    #[async_test]
1121    async fn test_observe_events() -> crate::Result<()> {
1122        let client = logged_in_client(None).await;
1123
1124        let room_id_0 = room_id!("!r0.matrix.org");
1125        let room_id_1 = room_id!("!r1.matrix.org");
1126
1127        let observable = client.observe_events::<OriginalSyncRoomNameEvent, Room>();
1128
1129        let mut subscriber = observable.subscribe();
1130
1131        assert_pending!(subscriber);
1132
1133        let mut response_builder = SyncResponseBuilder::new();
1134        let response = response_builder
1135            .add_joined_room(JoinedRoomBuilder::new(room_id_0).add_state_event(
1136                StateTestEvent::Custom(json!({
1137                    "content": {
1138                        "name": "Name 0"
1139                    },
1140                    "event_id": "$ev0",
1141                    "origin_server_ts": 1,
1142                    "sender": "@mnt_io:matrix.org",
1143                    "state_key": "",
1144                    "type": "m.room.name",
1145                    "unsigned": {
1146                        "age": 1,
1147                    }
1148                })),
1149            ))
1150            .build_sync_response();
1151        client.process_sync(response).await?;
1152
1153        let (room_name, room) = assert_ready!(subscriber);
1154
1155        assert_eq!(room_name.event_id.as_str(), "$ev0");
1156        assert_eq!(room.room_id(), room_id_0);
1157        assert_eq!(room.name().unwrap(), "Name 0");
1158
1159        assert_pending!(subscriber);
1160
1161        let response = response_builder
1162            .add_joined_room(JoinedRoomBuilder::new(room_id_1).add_state_event(
1163                StateTestEvent::Custom(json!({
1164                    "content": {
1165                        "name": "Name 1"
1166                    },
1167                    "event_id": "$ev1",
1168                    "origin_server_ts": 2,
1169                    "sender": "@mnt_io:matrix.org",
1170                    "state_key": "",
1171                    "type": "m.room.name",
1172                    "unsigned": {
1173                        "age": 2,
1174                    }
1175                })),
1176            ))
1177            .build_sync_response();
1178        client.process_sync(response).await?;
1179
1180        let (room_name, room) = assert_ready!(subscriber);
1181
1182        assert_eq!(room_name.event_id.as_str(), "$ev1");
1183        assert_eq!(room.room_id(), room_id_1);
1184        assert_eq!(room.name().unwrap(), "Name 1");
1185
1186        assert_pending!(subscriber);
1187
1188        drop(observable);
1189        assert_closed!(subscriber);
1190
1191        Ok(())
1192    }
1193
1194    #[async_test]
1195    async fn test_observe_room_events() -> crate::Result<()> {
1196        let client = logged_in_client(None).await;
1197
1198        let room_id = room_id!("!r0.matrix.org");
1199
1200        let observable_for_room =
1201            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1202
1203        let mut subscriber_for_room = observable_for_room.subscribe();
1204
1205        assert_pending!(subscriber_for_room);
1206
1207        let mut response_builder = SyncResponseBuilder::new();
1208        let response = response_builder
1209            .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1210                StateTestEvent::Custom(json!({
1211                    "content": {
1212                        "name": "Name 0"
1213                    },
1214                    "event_id": "$ev0",
1215                    "origin_server_ts": 1,
1216                    "sender": "@mnt_io:matrix.org",
1217                    "state_key": "",
1218                    "type": "m.room.name",
1219                    "unsigned": {
1220                        "age": 1,
1221                    }
1222                })),
1223            ))
1224            .build_sync_response();
1225        client.process_sync(response).await?;
1226
1227        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1228
1229        assert_eq!(room_name.event_id.as_str(), "$ev0");
1230        assert_eq!(room.name().unwrap(), "Name 0");
1231
1232        assert_pending!(subscriber_for_room);
1233
1234        let response = response_builder
1235            .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1236                StateTestEvent::Custom(json!({
1237                    "content": {
1238                        "name": "Name 1"
1239                    },
1240                    "event_id": "$ev1",
1241                    "origin_server_ts": 2,
1242                    "sender": "@mnt_io:matrix.org",
1243                    "state_key": "",
1244                    "type": "m.room.name",
1245                    "unsigned": {
1246                        "age": 2,
1247                    }
1248                })),
1249            ))
1250            .build_sync_response();
1251        client.process_sync(response).await?;
1252
1253        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1254
1255        assert_eq!(room_name.event_id.as_str(), "$ev1");
1256        assert_eq!(room.name().unwrap(), "Name 1");
1257
1258        assert_pending!(subscriber_for_room);
1259
1260        drop(observable_for_room);
1261        assert_closed!(subscriber_for_room);
1262
1263        Ok(())
1264    }
1265
1266    #[async_test]
1267    async fn test_observe_several_room_events() -> crate::Result<()> {
1268        let client = logged_in_client(None).await;
1269
1270        let room_id = room_id!("!r0.matrix.org");
1271
1272        let observable_for_room =
1273            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1274
1275        let mut subscriber_for_room = observable_for_room.subscribe();
1276
1277        assert_pending!(subscriber_for_room);
1278
1279        let mut response_builder = SyncResponseBuilder::new();
1280        let response = response_builder
1281            .add_joined_room(
1282                JoinedRoomBuilder::new(room_id)
1283                    .add_state_event(StateTestEvent::Custom(json!({
1284                        "content": {
1285                            "name": "Name 0"
1286                        },
1287                        "event_id": "$ev0",
1288                        "origin_server_ts": 1,
1289                        "sender": "@mnt_io:matrix.org",
1290                        "state_key": "",
1291                        "type": "m.room.name",
1292                        "unsigned": {
1293                            "age": 1,
1294                        }
1295                    })))
1296                    .add_state_event(StateTestEvent::Custom(json!({
1297                        "content": {
1298                            "name": "Name 1"
1299                        },
1300                        "event_id": "$ev1",
1301                        "origin_server_ts": 2,
1302                        "sender": "@mnt_io:matrix.org",
1303                        "state_key": "",
1304                        "type": "m.room.name",
1305                        "unsigned": {
1306                            "age": 1,
1307                        }
1308                    })))
1309                    .add_state_event(StateTestEvent::Custom(json!({
1310                        "content": {
1311                            "name": "Name 2"
1312                        },
1313                        "event_id": "$ev2",
1314                        "origin_server_ts": 3,
1315                        "sender": "@mnt_io:matrix.org",
1316                        "state_key": "",
1317                        "type": "m.room.name",
1318                        "unsigned": {
1319                            "age": 1,
1320                        }
1321                    }))),
1322            )
1323            .build_sync_response();
1324        client.process_sync(response).await?;
1325
1326        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1327
1328        // Check we only get notified about the latest received event
1329        assert_eq!(room_name.event_id.as_str(), "$ev2");
1330        assert_eq!(room.name().unwrap(), "Name 2");
1331
1332        assert_pending!(subscriber_for_room);
1333
1334        drop(observable_for_room);
1335        assert_closed!(subscriber_for_room);
1336
1337        Ok(())
1338    }
1339
1340    #[async_test]
1341    async fn test_observe_events_with_type_prefix() -> crate::Result<()> {
1342        let client = logged_in_client(None).await;
1343
1344        let observable = client.observe_events::<SecretStorageKeyEvent, ()>();
1345
1346        let mut subscriber = observable.subscribe();
1347
1348        assert_pending!(subscriber);
1349
1350        let mut response_builder = SyncResponseBuilder::new();
1351        let response = response_builder
1352            .add_custom_global_account_data(json!({
1353                "content": {
1354                    "algorithm": "m.secret_storage.v1.aes-hmac-sha2",
1355                    "iv": "gH2iNpiETFhApvW6/FFEJQ",
1356                    "mac": "9Lw12m5SKDipNghdQXKjgpfdj1/K7HFI2brO+UWAGoM",
1357                    "passphrase": {
1358                        "algorithm": "m.pbkdf2",
1359                        "salt": "IuLnH7S85YtZmkkBJKwNUKxWF42g9O1H",
1360                        "iterations": 10,
1361                    },
1362                },
1363                "type": "m.secret_storage.key.foobar",
1364            }))
1365            .build_sync_response();
1366        client.process_sync(response).await?;
1367
1368        let (secret_storage_key, ()) = assert_ready!(subscriber);
1369
1370        assert_eq!(secret_storage_key.content.key_id, "foobar");
1371
1372        assert_pending!(subscriber);
1373
1374        drop(observable);
1375        assert_closed!(subscriber);
1376
1377        Ok(())
1378    }
1379
1380    #[async_test]
1381    async fn test_observe_room_events_with_type_prefix() -> crate::Result<()> {
1382        // To create an event handler for a room account data event type with prefix, we
1383        // need to create a custom event type, none exist in the Matrix specification
1384        // yet.
1385        #[derive(Debug, Clone, EventContent, Serialize)]
1386        #[ruma_event(type = "fake.event.*", kind = RoomAccountData)]
1387        struct AccountDataWithPrefixEventContent {
1388            #[ruma_event(type_fragment)]
1389            #[serde(skip)]
1390            key_id: String,
1391        }
1392
1393        let room_id = room_id!("!r0.matrix.org");
1394        let client = logged_in_client(None).await;
1395
1396        let observable = client.observe_room_events::<AccountDataWithPrefixEvent, Room>(room_id);
1397
1398        let mut subscriber = observable.subscribe();
1399
1400        assert_pending!(subscriber);
1401
1402        let mut response_builder = SyncResponseBuilder::new();
1403        let response = response_builder
1404            .add_joined_room(
1405                JoinedRoomBuilder::new(room_id).add_account_data_bulk([Raw::new(&json!({
1406                    "content": {},
1407                    "type": "fake.event.foobar",
1408                }))
1409                .unwrap()
1410                .cast_unchecked()]),
1411            )
1412            .build_sync_response();
1413        client.process_sync(response).await?;
1414
1415        let (secret_storage_key, _room) = assert_ready!(subscriber);
1416
1417        assert_eq!(secret_storage_key.content.key_id, "foobar");
1418
1419        assert_pending!(subscriber);
1420
1421        drop(observable);
1422        assert_closed!(subscriber);
1423
1424        Ok(())
1425    }
1426}