matrix_sdk/event_handler/
mod.rs

1// Copyright 2021 Jonas Platte
2// Copyright 2022 Famedly GmbH
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
//! Types and traits related to event handlers. For usage, see
17//! [`Client::add_event_handler`].
18//!
19//! ### How it works
20//!
21//! The `add_event_handler` method registers event handlers of different
22//! signatures by actually storing boxed closures that all have the same
23//! signature of `async (EventHandlerData) -> ()` where `EventHandlerData` is a
24//! private type that contains all of the data an event handler *might* need.
25//!
26//! The stored closure takes care of deserializing the event which the
27//! `EventHandlerData` contains as a (borrowed) [`serde_json::value::RawValue`],
28//! extracting the context arguments from other fields of `EventHandlerData` and
29//! calling / `.await`ing the event handler if the previous steps succeeded.
30//! It also logs any errors from the above chain of function calls.
31//!
32//! For more details, see the [`EventHandler`] trait.
33
34#[cfg(any(feature = "anyhow", feature = "eyre"))]
35use std::any::TypeId;
36use std::{
37    borrow::Cow,
38    fmt,
39    future::Future,
40    pin::Pin,
41    sync::{
42        atomic::{AtomicU64, Ordering::SeqCst},
43        Arc, RwLock, Weak,
44    },
45    task::{Context, Poll},
46};
47
48#[cfg(target_family = "wasm")]
49use anymap2::any::CloneAny;
50#[cfg(not(target_family = "wasm"))]
51use anymap2::any::CloneAnySendSync;
52use eyeball::{SharedObservable, Subscriber};
53use futures_core::Stream;
54use futures_util::stream::{FuturesUnordered, StreamExt};
55use matrix_sdk_base::{
56    deserialized_responses::{EncryptionInfo, TimelineEvent},
57    sync::State,
58    SendOutsideWasm, SyncOutsideWasm,
59};
60use matrix_sdk_common::deserialized_responses::ProcessedToDeviceEvent;
61use pin_project_lite::pin_project;
62use ruma::{events::BooleanType, push::Action, serde::Raw, OwnedRoomId};
63use serde::{de::DeserializeOwned, Deserialize};
64use serde_json::value::RawValue as RawJsonValue;
65use tracing::{debug, error, field::debug, instrument, warn};
66
67use self::maps::EventHandlerMaps;
68use crate::{Client, Room};
69
70mod context;
71mod maps;
72mod static_events;
73
74pub use self::context::{Ctx, EventHandlerContext, RawEvent};
75
// Boxed, pinned future returned by every stored handler closure. It always
// resolves to `()`; the `Send` bound is only required outside of wasm.
#[cfg(not(target_family = "wasm"))]
type EventHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
#[cfg(target_family = "wasm")]
type EventHandlerFut = Pin<Box<dyn Future<Output = ()>>>;

// Type-erased signature shared by all stored handler closures: they take the
// `EventHandlerData` of one event and return an `EventHandlerFut`. Outside of
// wasm the closure itself must additionally be `Send + Sync`.
#[cfg(not(target_family = "wasm"))]
type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut + Send + Sync;
#[cfg(target_family = "wasm")]
type EventHandlerFn = dyn Fn(EventHandlerData<'_>) -> EventHandlerFut;

// Type-keyed map of cloneable values, used by `EventHandlerStore` for the
// handler context. The `Send + Sync` variant is used outside of wasm.
#[cfg(not(target_family = "wasm"))]
type AnyMap = anymap2::Map<dyn CloneAnySendSync + Send + Sync>;
#[cfg(target_family = "wasm")]
type AnyMap = anymap2::Map<dyn CloneAny>;
90
91#[derive(Default)]
92pub(crate) struct EventHandlerStore {
93    handlers: RwLock<EventHandlerMaps>,
94    context: RwLock<AnyMap>,
95    counter: AtomicU64,
96}
97
98impl EventHandlerStore {
99    pub fn add_handler(&self, handle: EventHandlerHandle, handler_fn: Box<EventHandlerFn>) {
100        self.handlers.write().unwrap().add(handle, handler_fn);
101    }
102
103    pub fn add_context<T>(&self, ctx: T)
104    where
105        T: Clone + Send + Sync + 'static,
106    {
107        self.context.write().unwrap().insert(ctx);
108    }
109
110    pub fn remove(&self, handle: EventHandlerHandle) {
111        self.handlers.write().unwrap().remove(handle);
112    }
113
114    #[cfg(test)]
115    fn len(&self) -> usize {
116        self.handlers.read().unwrap().len()
117    }
118}
119
/// The category an event handler is registered under and dispatched by.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum HandlerKind {
    GlobalAccountData,
    RoomAccountData,
    EphemeralRoomData,
    Timeline,
    MessageLike,
    OriginalMessageLike,
    RedactedMessageLike,
    State,
    OriginalState,
    RedactedState,
    StrippedState,
    ToDevice,
    Presence,
}

impl HandlerKind {
    /// Pick the message-like kind matching the redaction status of an event.
    fn message_like_redacted(redacted: bool) -> Self {
        match redacted {
            true => Self::RedactedMessageLike,
            false => Self::OriginalMessageLike,
        }
    }

    /// Pick the state kind matching the redaction status of an event.
    fn state_redacted(redacted: bool) -> Self {
        match redacted {
            true => Self::RedactedState,
            false => Self::OriginalState,
        }
    }
}
155
/// A statically-known event kind/type that can be retrieved from an event sync.
pub trait SyncEvent {
    /// The handler category events of this type are registered under.
    #[doc(hidden)]
    const KIND: HandlerKind;
    /// The statically-known event type string, or `None` when there is none.
    #[doc(hidden)]
    const TYPE: Option<&'static str>;
    /// Whether `TYPE` is only a prefix of the event type rather than the full
    /// type; this decides between `StaticEventTypePart::Prefix` and
    /// `StaticEventTypePart::Full` when a handler is registered.
    #[doc(hidden)]
    type IsPrefix: BooleanType;
}
165
/// A stored handler closure together with the id it was registered with.
///
/// NOTE(review): not used in this file; presumably consumed by the `maps`
/// submodule — confirm there.
pub(crate) struct EventHandlerWrapper {
    /// The type-erased handler closure.
    handler_fn: Box<EventHandlerFn>,
    /// Id of the handler; presumably matches `EventHandlerHandle::handler_id`.
    pub handler_id: u64,
}
170
/// Handle to remove a registered event handler by passing it to
/// [`Client::remove_event_handler`].
#[derive(Clone, Debug)]
pub struct EventHandlerHandle {
    /// The handler category, taken from `SyncEvent::KIND` at registration.
    pub(crate) ev_kind: HandlerKind,
    /// The static (full or prefix) event type, if the event type has one.
    pub(crate) ev_type: Option<StaticEventTypePart>,
    /// The room id passed at registration, if any (presumably restricts the
    /// handler to that room — confirm in the `maps` submodule).
    pub(crate) room_id: Option<OwnedRoomId>,
    /// Id obtained from the store's counter when the handler was registered.
    pub(crate) handler_id: u64,
}
180
/// The static part of an event type.
///
/// Which variant is used for a handler is decided at registration time from
/// `SyncEvent::IsPrefix`.
#[derive(Clone, Copy, Debug)]
pub(crate) enum StaticEventTypePart {
    /// The full event type is static.
    Full(&'static str),
    /// Only the prefix of the event type is static.
    Prefix(&'static str),
}
189
190/// Interface for event handlers.
191///
192/// This trait is an abstraction for a certain kind of functions / closures,
193/// specifically:
194///
195/// * They must have at least one argument, which is the event itself, a type
196///   that implements [`SyncEvent`]. Any additional arguments need to implement
197///   the [`EventHandlerContext`] trait.
198/// * Their return type has to be one of: `()`, `Result<(), impl Display + Debug
199///   + 'static>` (if you are using `anyhow::Result` or `eyre::Result` you can
200///   additionally enable the `anyhow` / `eyre` feature to get the verbose
201///   `Debug` output printed on error)
202///
203/// ### How it works
204///
205/// This trait is basically a very constrained version of `Fn`: It requires at
206/// least one argument, which is represented as its own generic parameter `Ev`
207/// with the remaining parameter types being represented by the second generic
208/// parameter `Ctx`; they have to be stuffed into one generic parameter as a
209/// tuple because Rust doesn't have variadic generics.
210///
211/// `Ev` and `Ctx` are generic parameters rather than associated types because
212/// the argument list is a generic parameter for the `Fn` traits too, so a
213/// single type could implement `Fn` multiple times with different argument
214/// lists¹. Luckily, when calling [`Client::add_event_handler`] with a
215/// closure argument the trait solver takes into account that only a single one
216/// of the implementations applies (even though this could theoretically change
217/// through a dependency upgrade) and uses that rather than raising an ambiguity
218/// error. This is the same trick used by web frameworks like actix-web and
219/// axum.
220///
221/// ¹ the only thing stopping such types from existing in stable Rust is that
222/// all manual implementations of the `Fn` traits require a Nightly feature
pub trait EventHandler<Ev, Ctx>: Clone + SendOutsideWasm + SyncOutsideWasm + 'static {
    /// The future returned by `handle_event`.
    #[doc(hidden)]
    type Future: EventHandlerFuture;

    /// Create a future for handling the given event.
    ///
    /// `data` provides additional data about the event, for example the room it
    /// appeared in.
    ///
    /// Returns `None` if one of the context extractors failed.
    ///
    /// The handler is taken by value, so callers that need to invoke it more
    /// than once clone it first (the trait requires `Clone`).
    #[doc(hidden)]
    fn handle_event(self, ev: Ev, data: EventHandlerData<'_>) -> Option<Self::Future>;
}
237
/// Helper trait for the futures returned by event handlers.
///
/// It is a `Future` whose output implements [`EventHandlerResult`], with the
/// output re-exposed as an associated type so it can be named in bounds.
#[doc(hidden)]
pub trait EventHandlerFuture:
    Future<Output = <Self as EventHandlerFuture>::Output> + SendOutsideWasm + 'static
{
    /// The future's output type.
    type Output: EventHandlerResult;
}

// Blanket implementation: every `'static` future (also `Send` outside of
// wasm) whose output is an `EventHandlerResult` is an `EventHandlerFuture`.
impl<T> EventHandlerFuture for T
where
    T: Future + SendOutsideWasm + 'static,
    <T as Future>::Output: EventHandlerResult,
{
    type Output = <T as Future>::Output;
}
252
/// Everything a stored handler closure receives for one event: the raw event
/// plus the values context arguments can be extracted from.
#[doc(hidden)]
#[derive(Debug)]
pub struct EventHandlerData<'a> {
    /// A clone of the client that dispatches the event.
    client: Client,
    /// The room the event belongs to, if it was dispatched with one.
    room: Option<Room>,
    /// The raw, still-serialized JSON of the event.
    raw: &'a RawJsonValue,
    /// Encryption info for the event, when the dispatcher has some.
    encryption_info: Option<&'a EncryptionInfo>,
    /// Push actions for the event; empty when the dispatcher has none.
    push_actions: &'a [Action],
    /// Handle of the handler this data is being passed to.
    handle: EventHandlerHandle,
}
263
264/// Return types supported for event handlers implement this trait.
265///
266/// It is not meant to be implemented outside of matrix-sdk.
267pub trait EventHandlerResult: Sized {
268    #[doc(hidden)]
269    fn print_error(&self, event_type: Option<&str>);
270}
271
272impl EventHandlerResult for () {
273    fn print_error(&self, _event_type: Option<&str>) {}
274}
275
276impl<E: fmt::Debug + fmt::Display + 'static> EventHandlerResult for Result<(), E> {
277    fn print_error(&self, event_type: Option<&str>) {
278        let msg_fragment = match event_type {
279            Some(event_type) => format!(" for `{event_type}`"),
280            None => "".to_owned(),
281        };
282
283        match self {
284            #[cfg(feature = "anyhow")]
285            Err(e) if TypeId::of::<E>() == TypeId::of::<anyhow::Error>() => {
286                error!("Event handler{msg_fragment} failed: {e:?}");
287            }
288            #[cfg(feature = "eyre")]
289            Err(e) if TypeId::of::<E>() == TypeId::of::<eyre::Report>() => {
290                error!("Event handler{msg_fragment} failed: {e:?}");
291            }
292            Err(e) => {
293                error!("Event handler{msg_fragment} failed: {e}");
294            }
295            Ok(_) => {}
296        }
297    }
298}
299
/// The part of an event's `unsigned` object needed to tell whether the event
/// was redacted.
#[derive(Deserialize)]
struct UnsignedDetails {
    /// Present when the event has been redacted; the value itself is ignored.
    redacted_because: Option<serde::de::IgnoredAny>,
}
304
305/// Event handling internals.
306impl Client {
307    pub(crate) fn add_event_handler_impl<Ev, Ctx, H>(
308        &self,
309        handler: H,
310        room_id: Option<OwnedRoomId>,
311    ) -> EventHandlerHandle
312    where
313        Ev: SyncEvent + DeserializeOwned + SendOutsideWasm + 'static,
314        H: EventHandler<Ev, Ctx>,
315    {
316        let handler_fn: Box<EventHandlerFn> = Box::new(move |data| {
317            let maybe_fut = serde_json::from_str(data.raw.get())
318                .map(|ev| handler.clone().handle_event(ev, data));
319
320            Box::pin(async move {
321                match maybe_fut {
322                    Ok(Some(fut)) => {
323                        fut.await.print_error(Ev::TYPE);
324                    }
325                    Ok(None) => {
326                        error!(
327                            event_type = Ev::TYPE, event_kind = ?Ev::KIND,
328                            "Event handler has an invalid context argument",
329                        );
330                    }
331                    Err(e) => {
332                        warn!(
333                            event_type = Ev::TYPE, event_kind = ?Ev::KIND,
334                            "Failed to deserialize event, skipping event handler.\n
335                             Deserialization error: {e}",
336                        );
337                    }
338                }
339            })
340        });
341
342        let handler_id = self.inner.event_handlers.counter.fetch_add(1, SeqCst);
343        let ev_type = Ev::TYPE.map(|ev_type| {
344            if Ev::IsPrefix::as_bool() {
345                StaticEventTypePart::Prefix(ev_type)
346            } else {
347                StaticEventTypePart::Full(ev_type)
348            }
349        });
350        let handle = EventHandlerHandle { ev_kind: Ev::KIND, ev_type, room_id, handler_id };
351
352        self.inner.event_handlers.add_handler(handle.clone(), handler_fn);
353
354        handle
355    }
356
357    pub(crate) async fn handle_sync_events<T>(
358        &self,
359        kind: HandlerKind,
360        room: Option<&Room>,
361        events: &[Raw<T>],
362    ) -> serde_json::Result<()> {
363        #[derive(Deserialize)]
364        struct ExtractType<'a> {
365            #[serde(borrow, rename = "type")]
366            event_type: Cow<'a, str>,
367        }
368
369        for raw_event in events {
370            let event_type = raw_event.deserialize_as_unchecked::<ExtractType<'_>>()?.event_type;
371            self.call_event_handlers(room, raw_event.json(), kind, &event_type, None, &[]).await;
372        }
373
374        Ok(())
375    }
376
377    pub(crate) async fn handle_sync_to_device_events(
378        &self,
379        events: &[ProcessedToDeviceEvent],
380    ) -> serde_json::Result<()> {
381        #[derive(Deserialize)]
382        struct ExtractType<'a> {
383            #[serde(borrow, rename = "type")]
384            event_type: Cow<'a, str>,
385        }
386
387        for processed_to_device in events {
388            let (raw_event, encryption_info) = match processed_to_device {
389                ProcessedToDeviceEvent::Decrypted { raw, encryption_info } => {
390                    (raw, Some(encryption_info))
391                }
392                other => (&other.to_raw(), None),
393            };
394            let event_type = raw_event.deserialize_as_unchecked::<ExtractType<'_>>()?.event_type;
395            self.call_event_handlers(
396                None,
397                raw_event.json(),
398                HandlerKind::ToDevice,
399                &event_type,
400                encryption_info,
401                &[],
402            )
403            .await;
404        }
405
406        Ok(())
407    }
408
409    pub(crate) async fn handle_sync_state_events(
410        &self,
411        room: Option<&Room>,
412        state: &State,
413    ) -> serde_json::Result<()> {
414        #[derive(Deserialize)]
415        struct StateEventDetails<'a> {
416            #[serde(borrow, rename = "type")]
417            event_type: Cow<'a, str>,
418            unsigned: Option<UnsignedDetails>,
419        }
420
421        let state_events = match state {
422            State::Before(events) => events,
423            State::After(events) => events,
424        };
425
426        // Event handlers for possibly-redacted state events
427        self.handle_sync_events(HandlerKind::State, room, state_events).await?;
428
429        // Event handlers specifically for redacted OR unredacted state events
430        for raw_event in state_events {
431            let StateEventDetails { event_type, unsigned } =
432                raw_event.deserialize_as_unchecked()?;
433            let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
434            let handler_kind = HandlerKind::state_redacted(redacted);
435
436            self.call_event_handlers(room, raw_event.json(), handler_kind, &event_type, None, &[])
437                .await;
438        }
439
440        Ok(())
441    }
442
443    pub(crate) async fn handle_sync_timeline_events(
444        &self,
445        room: Option<&Room>,
446        timeline_events: &[TimelineEvent],
447    ) -> serde_json::Result<()> {
448        #[derive(Deserialize)]
449        struct TimelineEventDetails<'a> {
450            #[serde(borrow, rename = "type")]
451            event_type: Cow<'a, str>,
452            state_key: Option<serde::de::IgnoredAny>,
453            unsigned: Option<UnsignedDetails>,
454        }
455
456        for item in timeline_events {
457            let TimelineEventDetails { event_type, state_key, unsigned } =
458                item.raw().deserialize_as_unchecked()?;
459
460            let redacted = unsigned.and_then(|u| u.redacted_because).is_some();
461            let (handler_kind_g, handler_kind_r) = match state_key {
462                Some(_) => (HandlerKind::State, HandlerKind::state_redacted(redacted)),
463                None => (HandlerKind::MessageLike, HandlerKind::message_like_redacted(redacted)),
464            };
465
466            let raw_event = item.raw().json();
467            let encryption_info = item.encryption_info().map(|i| &**i);
468            let push_actions = item.push_actions().unwrap_or(&[]);
469
470            // Event handlers for possibly-redacted timeline events
471            self.call_event_handlers(
472                room,
473                raw_event,
474                handler_kind_g,
475                &event_type,
476                encryption_info,
477                push_actions,
478            )
479            .await;
480
481            // Event handlers specifically for redacted OR unredacted timeline events
482            self.call_event_handlers(
483                room,
484                raw_event,
485                handler_kind_r,
486                &event_type,
487                encryption_info,
488                push_actions,
489            )
490            .await;
491
492            // Event handlers for `AnySyncTimelineEvent`
493            let kind = HandlerKind::Timeline;
494            self.call_event_handlers(
495                room,
496                raw_event,
497                kind,
498                &event_type,
499                encryption_info,
500                push_actions,
501            )
502            .await;
503        }
504
505        Ok(())
506    }
507
508    #[instrument(skip_all, fields(?event_kind, ?event_type, room_id))]
509    async fn call_event_handlers(
510        &self,
511        room: Option<&Room>,
512        raw: &RawJsonValue,
513        event_kind: HandlerKind,
514        event_type: &str,
515        encryption_info: Option<&EncryptionInfo>,
516        push_actions: &[Action],
517    ) {
518        let room_id = room.map(|r| r.room_id());
519        if let Some(room_id) = room_id {
520            tracing::Span::current().record("room_id", debug(room_id));
521        }
522
523        // Construct event handler futures
524        let mut futures: FuturesUnordered<_> = self
525            .inner
526            .event_handlers
527            .handlers
528            .read()
529            .unwrap()
530            .get_handlers(event_kind, event_type, room_id)
531            .map(|(handle, handler_fn)| {
532                let data = EventHandlerData {
533                    client: self.clone(),
534                    room: room.cloned(),
535                    raw,
536                    encryption_info,
537                    push_actions,
538                    handle,
539                };
540
541                (handler_fn)(data)
542            })
543            .collect();
544
545        if !futures.is_empty() {
546            debug!(amount = futures.len(), "Calling event handlers");
547
548            // Run the event handler futures with the `self.event_handlers.handlers`
549            // lock no longer being held.
550            while let Some(()) = futures.next().await {}
551        }
552    }
553}
554
/// A guard type that removes an event handler when it drops (goes out of
/// scope).
///
/// Created with [`Client::event_handler_drop_guard`].
#[derive(Debug)]
pub struct EventHandlerDropGuard {
    /// Handle of the event handler to remove on drop.
    handle: EventHandlerHandle,
    /// Client the handler is removed from.
    client: Client,
}

impl EventHandlerDropGuard {
    /// Create a guard that removes the handler identified by `handle` from
    /// `client` when dropped.
    pub(crate) fn new(handle: EventHandlerHandle, client: Client) -> Self {
        Self { handle, client }
    }
}

impl Drop for EventHandlerDropGuard {
    /// Unregister the guarded event handler.
    fn drop(&mut self) {
        // `remove_event_handler` takes the handle by value and `drop` only has
        // `&mut self`, hence the clone.
        self.client.remove_event_handler(self.handle.clone());
    }
}
576
// Implements `EventHandler<Ev, (A, B, ...)>` for every cloneable function or
// closure taking the event followed by the listed context argument types.
//
// Each context argument is extracted from the `EventHandlerData` through
// `EventHandlerContext::from_data`; if any extraction yields `None`,
// `handle_event` returns `None` without calling the handler.
macro_rules! impl_event_handler {
    ($($ty:ident),* $(,)?) => {
        impl<Ev, Fun, Fut, $($ty),*> EventHandler<Ev, ($($ty,)*)> for Fun
        where
            Ev: SyncEvent,
            Fun: FnOnce(Ev, $($ty),*) -> Fut + Clone + SendOutsideWasm + SyncOutsideWasm + 'static,
            Fut: EventHandlerFuture,
            $($ty: EventHandlerContext),*
        {
            type Future = Fut;

            // `_d` carries a leading underscore because it is unused in the
            // expansion without any context argument.
            fn handle_event(self, ev: Ev, _d: EventHandlerData<'_>) -> Option<Self::Future> {
                Some((self)(ev, $($ty::from_data(&_d)?),*))
            }
        }
    };
}

// Handlers may take anywhere from zero to eight context arguments.
impl_event_handler!();
impl_event_handler!(A);
impl_event_handler!(A, B);
impl_event_handler!(A, B, C);
impl_event_handler!(A, B, C, D);
impl_event_handler!(A, B, C, D, E);
impl_event_handler!(A, B, C, D, E, F);
impl_event_handler!(A, B, C, D, E, F, G);
impl_event_handler!(A, B, C, D, E, F, G, H);
604
605/// An observer of events (may be tailored to a room).
606///
607/// Only the most recent value can be observed. Subscribers are notified when a
608/// new value is sent, but there is no guarantee that they will see all values.
609///
610/// To create such observer, use [`Client::observe_events`] or
611/// [`Client::observe_room_events`].
612#[derive(Debug)]
613pub struct ObservableEventHandler<T> {
614    /// This type is actually nothing more than a thin glue layer between the
615    /// [`EventHandler`] mechanism and the reactive programming types from
616    /// [`eyeball`]. Here, we use a [`SharedObservable`] that is updated by the
617    /// [`EventHandler`].
618    shared_observable: SharedObservable<Option<T>>,
619
620    /// This type owns the [`EventHandlerDropGuard`]. As soon as this type goes
621    /// out of scope, the event handler is unregistered/removed.
622    ///
623    /// [`EventHandlerSubscriber`] holds a weak, non-owning reference, to this
624    /// guard. It is useful to detect when to close the [`Stream`]: as soon as
625    /// this type goes out of scope, the subscriber will close itself on poll.
626    event_handler_guard: Arc<EventHandlerDropGuard>,
627}
628
629impl<T> ObservableEventHandler<T> {
630    pub(crate) fn new(
631        shared_observable: SharedObservable<Option<T>>,
632        event_handler_guard: EventHandlerDropGuard,
633    ) -> Self {
634        Self { shared_observable, event_handler_guard: Arc::new(event_handler_guard) }
635    }
636
637    /// Subscribe to this observer.
638    ///
639    /// It returns an [`EventHandlerSubscriber`], which implements [`Stream`].
640    /// See its documentation to learn more.
641    pub fn subscribe(&self) -> EventHandlerSubscriber<T> {
642        EventHandlerSubscriber::new(
643            self.shared_observable.subscribe(),
644            // The subscriber holds a weak non-owning reference to the event handler guard, so that
645            // it can detect when this observer is dropped, and can close the subscriber's stream.
646            Arc::downgrade(&self.event_handler_guard),
647        )
648    }
649}
650
pin_project! {
    /// The subscriber of an [`ObservableEventHandler`].
    ///
    /// To create such subscriber, use [`ObservableEventHandler::subscribe`].
    ///
    /// This type implements [`Stream`], which means it is possible to poll the
    /// next value asynchronously. In other terms, polling this type will return
    /// the new events as soon as they are synced. See [`Client::observe_events`]
    /// to learn more.
    #[derive(Debug)]
    pub struct EventHandlerSubscriber<T> {
        // The `Subscriber` associated with the `SharedObservable` inside
        // `ObservableEventHandler`.
        //
        // Keep in mind that this whole API is just a thin glue layer between
        // `EventHandler` and `SharedObservable`.
        #[pin]
        subscriber: Subscriber<Option<T>>,

        // A weak non-owning reference to the event handler guard from
        // `ObservableEventHandler`. When this type is polled (via its `Stream`
        // implementation), it is possible to detect whether the observable has
        // been dropped by upgrading this weak reference, and close the `Stream`
        // if it needs to.
        event_handler_guard: Weak<EventHandlerDropGuard>,
    }
}
678
679impl<T> EventHandlerSubscriber<T> {
680    fn new(
681        subscriber: Subscriber<Option<T>>,
682        event_handler_handle: Weak<EventHandlerDropGuard>,
683    ) -> Self {
684        Self { subscriber, event_handler_guard: event_handler_handle }
685    }
686}
687
688impl<T> Stream for EventHandlerSubscriber<T>
689where
690    T: Clone,
691{
692    type Item = T;
693
694    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
695        let mut this = self.project();
696
697        let Some(_) = this.event_handler_guard.upgrade() else {
698            // The `EventHandlerHandle` has been dropped via `EventHandlerDropGuard`. It
699            // means the `ObservableEventHandler` has been dropped. It's time to
700            // close this stream.
701            return Poll::Ready(None);
702        };
703
704        // First off, the subscriber is of type `Subscriber<Option<T>>` because the
705        // `SharedObservable` starts with a `None` value to indicate it has no yet
706        // received any update. We want the `Stream` to return `T`, not `Option<T>`. We
707        // then filter out all `None` value.
708        //
709        // Second, when a `None` value is met, we want to poll again (hence the `loop`).
710        // At best, there is a new value to return. At worst, the subscriber will return
711        // `Poll::Pending` and will register the wakers accordingly.
712
713        loop {
714            match this.subscriber.as_mut().poll_next(context) {
715                // Stream has been closed somehow.
716                Poll::Ready(None) => return Poll::Ready(None),
717
718                // The initial value (of the `SharedObservable` behind `self.subscriber`) has been
719                // polled. We want to filter it out.
720                Poll::Ready(Some(None)) => {
721                    // Loop over.
722                    continue;
723                }
724
725                // We have a new value!
726                Poll::Ready(Some(Some(value))) => return Poll::Ready(Some(value)),
727
728                // Classical pending.
729                Poll::Pending => return Poll::Pending,
730            }
731        }
732    }
733}
734
735#[cfg(test)]
736mod tests {
737    use matrix_sdk_test::{
738        async_test,
739        event_factory::{EventFactory, PreviousMembership},
740        InvitedRoomBuilder, JoinedRoomBuilder, DEFAULT_TEST_ROOM_ID,
741    };
742    use serde::Serialize;
743    use stream_assert::{assert_closed, assert_pending, assert_ready};
744    #[cfg(target_family = "wasm")]
745    wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
746    use std::{
747        future,
748        sync::{
749            atomic::{AtomicU8, Ordering::SeqCst},
750            Arc,
751        },
752    };
753
754    use assert_matches2::assert_let;
755    use matrix_sdk_common::{deserialized_responses::EncryptionInfo, locks::Mutex};
756    use matrix_sdk_test::{StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder};
757    use once_cell::sync::Lazy;
758    use ruma::{
759        event_id,
760        events::{
761            macros::EventContent,
762            room::{
763                member::{MembershipState, OriginalSyncRoomMemberEvent, StrippedRoomMemberEvent},
764                name::OriginalSyncRoomNameEvent,
765                power_levels::OriginalSyncRoomPowerLevelsEvent,
766            },
767            secret_storage::key::SecretStorageKeyEvent,
768            typing::SyncTypingEvent,
769            AnySyncStateEvent, AnySyncTimelineEvent, AnyToDeviceEvent,
770        },
771        room_id,
772        serde::Raw,
773        user_id,
774    };
775    use serde_json::json;
776
777    use crate::{
778        event_handler::Ctx,
779        test_utils::{logged_in_client, no_retry_test_client},
780        Client, Room,
781    };
782
    // Lazily built raw `m.room.member` timeline event: `@example:localhost`
    // joining, with a previous membership of `invite`.
    static MEMBER_EVENT: Lazy<Raw<AnySyncTimelineEvent>> = Lazy::new(|| {
        EventFactory::new()
            .member(user_id!("@example:localhost"))
            .membership(MembershipState::Join)
            .display_name("example")
            .event_id(event_id!("$151800140517rfvjc:localhost"))
            .previous(PreviousMembership::new(MembershipState::Invite).display_name("example"))
            .into()
    });
792
    // Registers four handlers (room member, typing, power levels and stripped
    // room member), processes one sync response containing one event for each
    // of them, and checks that every handler ran exactly once.
    #[async_test]
    async fn test_add_event_handler() -> crate::Result<()> {
        let client = logged_in_client(None).await;

        // One counter per handler, incremented every time the handler runs.
        let member_count = Arc::new(AtomicU8::new(0));
        let typing_count = Arc::new(AtomicU8::new(0));
        let power_levels_count = Arc::new(AtomicU8::new(0));
        let invited_member_count = Arc::new(AtomicU8::new(0));

        client.add_event_handler({
            let member_count = member_count.clone();
            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| async move {
                member_count.fetch_add(1, SeqCst);
            }
        });
        client.add_event_handler({
            let typing_count = typing_count.clone();
            move |_ev: SyncTypingEvent| async move {
                typing_count.fetch_add(1, SeqCst);
            }
        });
        client.add_event_handler({
            let power_levels_count = power_levels_count.clone();
            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| async move {
                power_levels_count.fetch_add(1, SeqCst);
            }
        });
        client.add_event_handler({
            let invited_member_count = invited_member_count.clone();
            move |_ev: StrippedRoomMemberEvent| async move {
                invited_member_count.fetch_add(1, SeqCst);
            }
        });

        // Joined room: a member timeline event, a typing notification and a
        // power levels state event. Invited room: a stripped member event.
        let f = EventFactory::new();
        let response = SyncResponseBuilder::default()
            .add_joined_room(
                JoinedRoomBuilder::default()
                    .add_timeline_event(MEMBER_EVENT.clone())
                    .add_typing(
                        f.typing(vec![user_id!("@alice:matrix.org"), user_id!("@bob:example.com")]),
                    )
                    .add_state_event(StateTestEvent::PowerLevels),
            )
            .add_invited_room(
                InvitedRoomBuilder::new(room_id!("!test_invited:example.org")).add_state_event(
                    StrippedStateTestEvent::Custom(json!({
                        "content": {
                            "avatar_url": "mxc://example.org/SEsfnsuifSDFSSEF",
                            "displayname": "Alice",
                            "membership": "invite",
                        },
                        "event_id": "$143273582443PhrSn:example.org",
                        "origin_server_ts": 1432735824653u64,
                        "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
                        "sender": "@example:example.org",
                        "state_key": "@alice:example.org",
                        "type": "m.room.member",
                        "unsigned": {
                            "age": 1234,
                            "invite_room_state": [
                                {
                                    "content": {
                                        "name": "Example Room"
                                    },
                                    "sender": "@bob:example.org",
                                    "state_key": "",
                                    "type": "m.room.name"
                                },
                                {
                                    "content": {
                                        "join_rule": "invite"
                                    },
                                    "sender": "@bob:example.org",
                                    "state_key": "",
                                    "type": "m.room.join_rules"
                                }
                            ]
                        }
                    })),
                ),
            )
            .build_sync_response();
        client.process_sync(response).await?;

        // Each handler must have been called exactly once.
        assert_eq!(member_count.load(SeqCst), 1);
        assert_eq!(typing_count.load(SeqCst), 1);
        assert_eq!(power_levels_count.load(SeqCst), 1);
        assert_eq!(invited_member_count.load(SeqCst), 1);

        Ok(())
    }
885
886    #[async_test]
887    #[allow(dependency_on_unit_never_type_fallback)]
888    async fn test_add_to_device_event_handler() -> crate::Result<()> {
889        let client = logged_in_client(None).await;
890
891        let captured_event: Arc<Mutex<Option<AnyToDeviceEvent>>> = Arc::new(Mutex::new(None));
892        let captured_info: Arc<Mutex<Option<EncryptionInfo>>> = Arc::new(Mutex::new(None));
893
894        client.add_event_handler({
895            let captured = captured_event.clone();
896            let captured_info = captured_info.clone();
897            move |ev: AnyToDeviceEvent, encryption_info: Option<EncryptionInfo>| {
898                let mut captured_lock = captured.lock();
899                *captured_lock = Some(ev);
900                let mut captured_info_lock = captured_info.lock();
901                *captured_info_lock = encryption_info;
902                future::ready(())
903            }
904        });
905
906        let response = SyncResponseBuilder::default()
907            .add_to_device_event(json!({
908              "sender": "@alice:example.com",
909              "type": "m.custom.to.device.type",
910              "content": {
911                "a": "test",
912              }
913            }))
914            .build_sync_response();
915        client.process_sync(response).await?;
916
917        let captured = captured_event.lock().clone();
918        assert_let!(Some(received_event) = captured);
919        assert_eq!(received_event.event_type().to_string(), "m.custom.to.device.type");
920        let info = captured_info.lock().clone();
921        assert!(info.is_none());
922        Ok(())
923    }
924
925    #[async_test]
926    #[allow(dependency_on_unit_never_type_fallback)]
927    async fn test_add_room_event_handler() -> crate::Result<()> {
928        let client = logged_in_client(None).await;
929
930        let room_id_a = room_id!("!foo:example.org");
931        let room_id_b = room_id!("!bar:matrix.org");
932
933        let member_count = Arc::new(AtomicU8::new(0));
934        let power_levels_count = Arc::new(AtomicU8::new(0));
935
936        // Room event handlers for member events in both rooms
937        client.add_room_event_handler(room_id_a, {
938            let member_count = member_count.clone();
939            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
940                member_count.fetch_add(1, SeqCst);
941                future::ready(())
942            }
943        });
944        client.add_room_event_handler(room_id_b, {
945            let member_count = member_count.clone();
946            move |_ev: OriginalSyncRoomMemberEvent, _room: Room| {
947                member_count.fetch_add(1, SeqCst);
948                future::ready(())
949            }
950        });
951
952        // Power levels event handlers for member events in room A
953        client.add_room_event_handler(room_id_a, {
954            let power_levels_count = power_levels_count.clone();
955            move |_ev: OriginalSyncRoomPowerLevelsEvent, _client: Client, _room: Room| {
956                power_levels_count.fetch_add(1, SeqCst);
957                future::ready(())
958            }
959        });
960
961        // Room name event handler for room name events in room B
962        client.add_room_event_handler(room_id_b, move |_ev: OriginalSyncRoomNameEvent| async {
963            unreachable!("No room event in room B")
964        });
965
966        let response = SyncResponseBuilder::default()
967            .add_joined_room(
968                JoinedRoomBuilder::new(room_id_a)
969                    .add_timeline_event(MEMBER_EVENT.clone())
970                    .add_state_event(StateTestEvent::PowerLevels)
971                    .add_state_event(StateTestEvent::RoomName),
972            )
973            .add_joined_room(
974                JoinedRoomBuilder::new(room_id_b)
975                    .add_timeline_event(MEMBER_EVENT.clone())
976                    .add_state_event(StateTestEvent::PowerLevels),
977            )
978            .build_sync_response();
979        client.process_sync(response).await?;
980
981        assert_eq!(member_count.load(SeqCst), 2);
982        assert_eq!(power_levels_count.load(SeqCst), 1);
983
984        Ok(())
985    }
986
987    #[async_test]
988    #[allow(dependency_on_unit_never_type_fallback)]
989    async fn test_add_event_handler_with_tuples() -> crate::Result<()> {
990        let client = logged_in_client(None).await;
991
992        client.add_event_handler(
993            |_ev: OriginalSyncRoomMemberEvent, (_room, _client): (Room, Client)| future::ready(()),
994        );
995
996        // If it compiles, it works. No need to assert anything.
997
998        Ok(())
999    }
1000
1001    #[async_test]
1002    #[allow(dependency_on_unit_never_type_fallback)]
1003    async fn test_remove_event_handler() -> crate::Result<()> {
1004        let client = logged_in_client(None).await;
1005
1006        let member_count = Arc::new(AtomicU8::new(0));
1007
1008        client.add_event_handler({
1009            let member_count = member_count.clone();
1010            move |_ev: OriginalSyncRoomMemberEvent| async move {
1011                member_count.fetch_add(1, SeqCst);
1012            }
1013        });
1014
1015        let handle_a = client.add_event_handler(move |_ev: OriginalSyncRoomMemberEvent| async {
1016            panic!("handler should have been removed");
1017        });
1018        let handle_b = client.add_room_event_handler(
1019            #[allow(unknown_lints, clippy::explicit_auto_deref)] // lint is buggy
1020            *DEFAULT_TEST_ROOM_ID,
1021            move |_ev: OriginalSyncRoomMemberEvent| async {
1022                panic!("handler should have been removed");
1023            },
1024        );
1025
1026        client.add_event_handler({
1027            let member_count = member_count.clone();
1028            move |_ev: OriginalSyncRoomMemberEvent| async move {
1029                member_count.fetch_add(1, SeqCst);
1030            }
1031        });
1032
1033        let response = SyncResponseBuilder::default()
1034            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1035            .build_sync_response();
1036
1037        client.remove_event_handler(handle_a);
1038        client.remove_event_handler(handle_b);
1039
1040        client.process_sync(response).await?;
1041
1042        assert_eq!(member_count.load(SeqCst), 2);
1043
1044        Ok(())
1045    }
1046
1047    #[async_test]
1048    async fn test_event_handler_drop_guard() {
1049        let client = no_retry_test_client(None).await;
1050
1051        let handle = client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent| async {});
1052        assert_eq!(client.inner.event_handlers.len(), 1);
1053
1054        {
1055            let _guard = client.event_handler_drop_guard(handle);
1056            assert_eq!(client.inner.event_handlers.len(), 1);
1057            // guard dropped here
1058        }
1059
1060        assert_eq!(client.inner.event_handlers.len(), 0);
1061    }
1062
1063    #[async_test]
1064    async fn test_use_client_in_handler() {
1065        // This used to not work because we were requiring `Send` of event
1066        // handler futures even on WASM, where practically all futures that do
1067        // I/O aren't.
1068        let client = no_retry_test_client(None).await;
1069
1070        client.add_event_handler(|_ev: OriginalSyncRoomMemberEvent, client: Client| async move {
1071            // All of Client's async methods that do network requests (and
1072            // possibly some that don't) are `!Send` on wasm. We obviously want
1073            // to be able to use them in event handlers.
1074            let _caps = client.get_capabilities().await.map_err(|e| anyhow::anyhow!("{}", e))?;
1075            anyhow::Ok(())
1076        });
1077    }
1078
1079    #[async_test]
1080    async fn test_raw_event_handler() -> crate::Result<()> {
1081        let client = logged_in_client(None).await;
1082        let counter = Arc::new(AtomicU8::new(0));
1083        client.add_event_handler_context(counter.clone());
1084        client.add_event_handler(
1085            |_ev: Raw<OriginalSyncRoomMemberEvent>, counter: Ctx<Arc<AtomicU8>>| async move {
1086                counter.fetch_add(1, SeqCst);
1087            },
1088        );
1089
1090        let response = SyncResponseBuilder::default()
1091            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1092            .build_sync_response();
1093        client.process_sync(response).await?;
1094
1095        assert_eq!(counter.load(SeqCst), 1);
1096        Ok(())
1097    }
1098
1099    #[async_test]
1100    async fn test_enum_event_handler() -> crate::Result<()> {
1101        let client = logged_in_client(None).await;
1102        let counter = Arc::new(AtomicU8::new(0));
1103        client.add_event_handler_context(counter.clone());
1104        client.add_event_handler(
1105            |_ev: AnySyncStateEvent, counter: Ctx<Arc<AtomicU8>>| async move {
1106                counter.fetch_add(1, SeqCst);
1107            },
1108        );
1109
1110        let response = SyncResponseBuilder::default()
1111            .add_joined_room(JoinedRoomBuilder::default().add_timeline_event(MEMBER_EVENT.clone()))
1112            .build_sync_response();
1113        client.process_sync(response).await?;
1114
1115        assert_eq!(counter.load(SeqCst), 1);
1116        Ok(())
1117    }
1118
1119    #[async_test]
1120    #[allow(dependency_on_unit_never_type_fallback)]
1121    async fn test_observe_events() -> crate::Result<()> {
1122        let client = logged_in_client(None).await;
1123
1124        let room_id_0 = room_id!("!r0.matrix.org");
1125        let room_id_1 = room_id!("!r1.matrix.org");
1126
1127        let observable = client.observe_events::<OriginalSyncRoomNameEvent, Room>();
1128
1129        let mut subscriber = observable.subscribe();
1130
1131        assert_pending!(subscriber);
1132
1133        let mut response_builder = SyncResponseBuilder::new();
1134        let response = response_builder
1135            .add_joined_room(JoinedRoomBuilder::new(room_id_0).add_state_event(
1136                StateTestEvent::Custom(json!({
1137                    "content": {
1138                        "name": "Name 0"
1139                    },
1140                    "event_id": "$ev0",
1141                    "origin_server_ts": 1,
1142                    "sender": "@mnt_io:matrix.org",
1143                    "state_key": "",
1144                    "type": "m.room.name",
1145                    "unsigned": {
1146                        "age": 1,
1147                    }
1148                })),
1149            ))
1150            .build_sync_response();
1151        client.process_sync(response).await?;
1152
1153        let (room_name, room) = assert_ready!(subscriber);
1154
1155        assert_eq!(room_name.event_id.as_str(), "$ev0");
1156        assert_eq!(room.room_id(), room_id_0);
1157        assert_eq!(room.name().unwrap(), "Name 0");
1158
1159        assert_pending!(subscriber);
1160
1161        let response = response_builder
1162            .add_joined_room(JoinedRoomBuilder::new(room_id_1).add_state_event(
1163                StateTestEvent::Custom(json!({
1164                    "content": {
1165                        "name": "Name 1"
1166                    },
1167                    "event_id": "$ev1",
1168                    "origin_server_ts": 2,
1169                    "sender": "@mnt_io:matrix.org",
1170                    "state_key": "",
1171                    "type": "m.room.name",
1172                    "unsigned": {
1173                        "age": 2,
1174                    }
1175                })),
1176            ))
1177            .build_sync_response();
1178        client.process_sync(response).await?;
1179
1180        let (room_name, room) = assert_ready!(subscriber);
1181
1182        assert_eq!(room_name.event_id.as_str(), "$ev1");
1183        assert_eq!(room.room_id(), room_id_1);
1184        assert_eq!(room.name().unwrap(), "Name 1");
1185
1186        assert_pending!(subscriber);
1187
1188        drop(observable);
1189        assert_closed!(subscriber);
1190
1191        Ok(())
1192    }
1193
1194    #[async_test]
1195    #[allow(dependency_on_unit_never_type_fallback)]
1196    async fn test_observe_room_events() -> crate::Result<()> {
1197        let client = logged_in_client(None).await;
1198
1199        let room_id = room_id!("!r0.matrix.org");
1200
1201        let observable_for_room =
1202            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1203
1204        let mut subscriber_for_room = observable_for_room.subscribe();
1205
1206        assert_pending!(subscriber_for_room);
1207
1208        let mut response_builder = SyncResponseBuilder::new();
1209        let response = response_builder
1210            .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1211                StateTestEvent::Custom(json!({
1212                    "content": {
1213                        "name": "Name 0"
1214                    },
1215                    "event_id": "$ev0",
1216                    "origin_server_ts": 1,
1217                    "sender": "@mnt_io:matrix.org",
1218                    "state_key": "",
1219                    "type": "m.room.name",
1220                    "unsigned": {
1221                        "age": 1,
1222                    }
1223                })),
1224            ))
1225            .build_sync_response();
1226        client.process_sync(response).await?;
1227
1228        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1229
1230        assert_eq!(room_name.event_id.as_str(), "$ev0");
1231        assert_eq!(room.name().unwrap(), "Name 0");
1232
1233        assert_pending!(subscriber_for_room);
1234
1235        let response = response_builder
1236            .add_joined_room(JoinedRoomBuilder::new(room_id).add_state_event(
1237                StateTestEvent::Custom(json!({
1238                    "content": {
1239                        "name": "Name 1"
1240                    },
1241                    "event_id": "$ev1",
1242                    "origin_server_ts": 2,
1243                    "sender": "@mnt_io:matrix.org",
1244                    "state_key": "",
1245                    "type": "m.room.name",
1246                    "unsigned": {
1247                        "age": 2,
1248                    }
1249                })),
1250            ))
1251            .build_sync_response();
1252        client.process_sync(response).await?;
1253
1254        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1255
1256        assert_eq!(room_name.event_id.as_str(), "$ev1");
1257        assert_eq!(room.name().unwrap(), "Name 1");
1258
1259        assert_pending!(subscriber_for_room);
1260
1261        drop(observable_for_room);
1262        assert_closed!(subscriber_for_room);
1263
1264        Ok(())
1265    }
1266
1267    #[async_test]
1268    async fn test_observe_several_room_events() -> crate::Result<()> {
1269        let client = logged_in_client(None).await;
1270
1271        let room_id = room_id!("!r0.matrix.org");
1272
1273        let observable_for_room =
1274            client.observe_room_events::<OriginalSyncRoomNameEvent, (Room, Client)>(room_id);
1275
1276        let mut subscriber_for_room = observable_for_room.subscribe();
1277
1278        assert_pending!(subscriber_for_room);
1279
1280        let mut response_builder = SyncResponseBuilder::new();
1281        let response = response_builder
1282            .add_joined_room(
1283                JoinedRoomBuilder::new(room_id)
1284                    .add_state_event(StateTestEvent::Custom(json!({
1285                        "content": {
1286                            "name": "Name 0"
1287                        },
1288                        "event_id": "$ev0",
1289                        "origin_server_ts": 1,
1290                        "sender": "@mnt_io:matrix.org",
1291                        "state_key": "",
1292                        "type": "m.room.name",
1293                        "unsigned": {
1294                            "age": 1,
1295                        }
1296                    })))
1297                    .add_state_event(StateTestEvent::Custom(json!({
1298                        "content": {
1299                            "name": "Name 1"
1300                        },
1301                        "event_id": "$ev1",
1302                        "origin_server_ts": 2,
1303                        "sender": "@mnt_io:matrix.org",
1304                        "state_key": "",
1305                        "type": "m.room.name",
1306                        "unsigned": {
1307                            "age": 1,
1308                        }
1309                    })))
1310                    .add_state_event(StateTestEvent::Custom(json!({
1311                        "content": {
1312                            "name": "Name 2"
1313                        },
1314                        "event_id": "$ev2",
1315                        "origin_server_ts": 3,
1316                        "sender": "@mnt_io:matrix.org",
1317                        "state_key": "",
1318                        "type": "m.room.name",
1319                        "unsigned": {
1320                            "age": 1,
1321                        }
1322                    }))),
1323            )
1324            .build_sync_response();
1325        client.process_sync(response).await?;
1326
1327        let (room_name, (room, _client)) = assert_ready!(subscriber_for_room);
1328
1329        // Check we only get notified about the latest received event
1330        assert_eq!(room_name.event_id.as_str(), "$ev2");
1331        assert_eq!(room.name().unwrap(), "Name 2");
1332
1333        assert_pending!(subscriber_for_room);
1334
1335        drop(observable_for_room);
1336        assert_closed!(subscriber_for_room);
1337
1338        Ok(())
1339    }
1340
1341    #[async_test]
1342    #[allow(dependency_on_unit_never_type_fallback)]
1343    async fn test_observe_events_with_type_prefix() -> crate::Result<()> {
1344        let client = logged_in_client(None).await;
1345
1346        let observable = client.observe_events::<SecretStorageKeyEvent, ()>();
1347
1348        let mut subscriber = observable.subscribe();
1349
1350        assert_pending!(subscriber);
1351
1352        let mut response_builder = SyncResponseBuilder::new();
1353        let response = response_builder
1354            .add_custom_global_account_data(json!({
1355                "content": {
1356                    "algorithm": "m.secret_storage.v1.aes-hmac-sha2",
1357                    "iv": "gH2iNpiETFhApvW6/FFEJQ",
1358                    "mac": "9Lw12m5SKDipNghdQXKjgpfdj1/K7HFI2brO+UWAGoM",
1359                    "passphrase": {
1360                        "algorithm": "m.pbkdf2",
1361                        "salt": "IuLnH7S85YtZmkkBJKwNUKxWF42g9O1H",
1362                        "iterations": 10,
1363                    },
1364                },
1365                "type": "m.secret_storage.key.foobar",
1366            }))
1367            .build_sync_response();
1368        client.process_sync(response).await?;
1369
1370        let (secret_storage_key, ()) = assert_ready!(subscriber);
1371
1372        assert_eq!(secret_storage_key.content.key_id, "foobar");
1373
1374        assert_pending!(subscriber);
1375
1376        drop(observable);
1377        assert_closed!(subscriber);
1378
1379        Ok(())
1380    }
1381
1382    #[async_test]
1383    #[allow(dependency_on_unit_never_type_fallback)]
1384    async fn test_observe_room_events_with_type_prefix() -> crate::Result<()> {
1385        // To create an event handler for a room account data event type with prefix, we
1386        // need to create a custom event type, none exist in the Matrix specification
1387        // yet.
1388        #[derive(Debug, Clone, EventContent, Serialize)]
1389        #[ruma_event(type = "fake.event.*", kind = RoomAccountData)]
1390        struct AccountDataWithPrefixEventContent {
1391            #[ruma_event(type_fragment)]
1392            #[serde(skip)]
1393            key_id: String,
1394        }
1395
1396        let room_id = room_id!("!r0.matrix.org");
1397        let client = logged_in_client(None).await;
1398
1399        let observable = client.observe_room_events::<AccountDataWithPrefixEvent, Room>(room_id);
1400
1401        let mut subscriber = observable.subscribe();
1402
1403        assert_pending!(subscriber);
1404
1405        let mut response_builder = SyncResponseBuilder::new();
1406        let response = response_builder
1407            .add_joined_room(
1408                JoinedRoomBuilder::new(room_id).add_account_data_bulk([Raw::new(&json!({
1409                    "content": {},
1410                    "type": "fake.event.foobar",
1411                }))
1412                .unwrap()
1413                .cast_unchecked()]),
1414            )
1415            .build_sync_response();
1416        client.process_sync(response).await?;
1417
1418        let (secret_storage_key, _room) = assert_ready!(subscriber);
1419
1420        assert_eq!(secret_storage_key.content.key_id, "foobar");
1421
1422        assert_pending!(subscriber);
1423
1424        drop(observable);
1425        assert_closed!(subscriber);
1426
1427        Ok(())
1428    }
1429}