matrix_sdk/latest_events/
mod.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The Latest Events API provides a lazy, reactive and efficient way to compute
16//! the latest event for a room or a thread.
17//!
18//! The latest event represents the last displayable and relevant event a room
19//! or a thread has been received. It is usually displayed in a _summary_, e.g.
20//! below the room title in a room list.
21//!
22//! The entry point is [`LatestEvents`]. It is preferable to get a reference to
23//! it from [`Client::latest_events`][crate::Client::latest_events], which
24//! already plugs everything to build it. [`LatestEvents`] is using the
25//! [`EventCache`] and the [`SendQueue`] to respectively get known remote events
26//! (i.e. synced from the server), or local events (i.e. ones being sent).
27//!
28//! ## Laziness
29//!
30//! [`LatestEvents`] is lazy, it means that, despite [`LatestEvents`] is
31//! listening to all [`EventCache`] or [`SendQueue`] updates, it will only do
32//! something if one is expected to get the latest event for a particular room
33//! or a particular thread. Concretely, it means that until
34//! [`LatestEvents::listen_to_room`] is called for a particular room, no latest
35//! event will ever be computed for that room (and similarly with
36//! [`LatestEvents::listen_to_thread`]).
37//!
38//! If one is no longer interested to get the latest event for a particular room
39//! or thread, the [`LatestEvents::forget_room`] and
40//! [`LatestEvents::forget_thread`] methods must be used.
41//!
42//! ## Reactive
43//!
44//! [`LatestEvents`] is designed to be reactive. Using
45//! [`LatestEvents::listen_and_subscribe_to_room`] will provide a
46//! [`Subscriber`], which brings all the tooling to get the current value or the
47//! future values with a stream.
48
49mod error;
50mod latest_event;
51
52use std::{
53    collections::{HashMap, HashSet},
54    ops::{ControlFlow, Not},
55    sync::Arc,
56};
57
58pub use error::LatestEventsError;
59use eyeball::{AsyncLock, Subscriber};
60use futures_util::FutureExt;
61use latest_event::LatestEvent;
62pub use latest_event::{LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue};
63use matrix_sdk_common::executor::{spawn, AbortOnDrop, JoinHandleExt as _};
64use ruma::{EventId, OwnedEventId, OwnedRoomId, RoomId};
65use tokio::{
66    select,
67    sync::{broadcast, mpsc, RwLock, RwLockReadGuard, RwLockWriteGuard},
68};
69use tracing::{error, warn};
70
71use crate::{
72    client::WeakClient,
73    event_cache::{EventCache, EventCacheError, RoomEventCache, RoomEventCacheGenericUpdate},
74    room::WeakRoom,
75    send_queue::{RoomSendQueueUpdate, SendQueue, SendQueueUpdate},
76};
77
78/// The entry point to fetch the [`LatestEventValue`] for rooms or threads.
79#[derive(Clone, Debug)]
80pub struct LatestEvents {
81    state: Arc<LatestEventsState>,
82}
83
84/// The state of [`LatestEvents`].
85#[derive(Debug)]
86struct LatestEventsState {
87    /// All the registered rooms, i.e. rooms the latest events are computed for.
88    registered_rooms: Arc<RegisteredRooms>,
89
90    /// The task handle of the
91    /// [`listen_to_event_cache_and_send_queue_updates_task`].
92    _listen_task_handle: AbortOnDrop<()>,
93
94    /// The task handle of the [`compute_latest_events_task`].
95    _computation_task_handle: AbortOnDrop<()>,
96}
97
98impl LatestEvents {
99    /// Create a new [`LatestEvents`].
100    pub(crate) fn new(
101        weak_client: WeakClient,
102        event_cache: EventCache,
103        send_queue: SendQueue,
104    ) -> Self {
105        let (room_registration_sender, room_registration_receiver) = mpsc::channel(32);
106        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
107
108        let registered_rooms =
109            Arc::new(RegisteredRooms::new(room_registration_sender, weak_client, &event_cache));
110
111        // The task listening to the event cache and the send queue updates.
112        let listen_task_handle = spawn(listen_to_event_cache_and_send_queue_updates_task(
113            registered_rooms.clone(),
114            room_registration_receiver,
115            event_cache,
116            send_queue,
117            latest_event_queue_sender,
118        ))
119        .abort_on_drop();
120
121        // The task computing the new latest events.
122        let computation_task_handle = spawn(compute_latest_events_task(
123            registered_rooms.clone(),
124            latest_event_queue_receiver,
125        ))
126        .abort_on_drop();
127
128        Self {
129            state: Arc::new(LatestEventsState {
130                registered_rooms,
131                _listen_task_handle: listen_task_handle,
132                _computation_task_handle: computation_task_handle,
133            }),
134        }
135    }
136
137    /// Start listening to updates (if not already) for a particular room.
138    ///
139    /// It returns `true` if the room exists, `false` otherwise.
140    pub async fn listen_to_room(&self, room_id: &RoomId) -> Result<bool, LatestEventsError> {
141        Ok(self.state.registered_rooms.for_room(room_id).await?.is_some())
142    }
143
144    /// Start listening to updates (if not already) for a particular room, and
145    /// return a [`Subscriber`] to get the current and future
146    /// [`LatestEventValue`]s.
147    ///
148    /// It returns `Some` if the room exists, `None` otherwise.
149    pub async fn listen_and_subscribe_to_room(
150        &self,
151        room_id: &RoomId,
152    ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
153        let Some(latest_event) = self.state.registered_rooms.for_room(room_id).await? else {
154            return Ok(None);
155        };
156
157        Ok(Some(latest_event.subscribe().await))
158    }
159
160    /// Start listening to updates (if not already) for a particular room and a
161    /// particular thread in this room.
162    ///
163    /// It returns `true` if the room and the thread exists, `false` otherwise.
164    pub async fn listen_to_thread(
165        &self,
166        room_id: &RoomId,
167        thread_id: &EventId,
168    ) -> Result<bool, LatestEventsError> {
169        Ok(self.state.registered_rooms.for_thread(room_id, thread_id).await?.is_some())
170    }
171
172    /// Start listening to updates (if not already) for a particular room and a
173    /// particular thread in this room, and return a [`Subscriber`] to get the
174    /// current and future [`LatestEventValue`]s.
175    ///
176    /// It returns `Some` if the room and the thread exists, `None` otherwise.
177    pub async fn listen_and_subscribe_to_thread(
178        &self,
179        room_id: &RoomId,
180        thread_id: &EventId,
181    ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
182        let Some(latest_event) = self.state.registered_rooms.for_thread(room_id, thread_id).await?
183        else {
184            return Ok(None);
185        };
186
187        Ok(Some(latest_event.subscribe().await))
188    }
189
190    /// Forget a room.
191    ///
192    /// It means that [`LatestEvents`] will stop listening to updates for the
193    /// `LatestEvent`s of the room and all its threads.
194    ///
195    /// If [`LatestEvents`] is not listening for `room_id`, nothing happens.
196    pub async fn forget_room(&self, room_id: &RoomId) {
197        self.state.registered_rooms.forget_room(room_id).await;
198    }
199
200    /// Forget a thread.
201    ///
202    /// It means that [`LatestEvents`] will stop listening to updates for the
203    /// `LatestEvent` of the thread.
204    ///
205    /// If [`LatestEvents`] is not listening for `room_id` or `thread_id`,
206    /// nothing happens.
207    pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
208        self.state.registered_rooms.forget_thread(room_id, thread_id).await;
209    }
210}
211
212#[derive(Debug)]
213struct RegisteredRooms {
214    /// All the registered [`RoomLatestEvents`].
215    rooms: RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
216
217    /// The sender part of the channel about room registration.
218    ///
219    /// When a room is registered (with [`LatestEvents::listen_to_room`] or
220    /// [`LatestEvents::listen_to_thread`]) or unregistered (with
221    /// [`LatestEvents::forget_room`] or [`LatestEvents::forget_thread`]), a
222    /// room registration message is passed on this channel.
223    ///
224    /// The receiver part of the channel is in the
225    /// [`listen_to_event_cache_and_send_queue_updates_task`].
226    room_registration_sender: mpsc::Sender<RoomRegistration>,
227
228    /// The (weak) client.
229    weak_client: WeakClient,
230
231    /// The event cache.
232    event_cache: EventCache,
233}
234
235impl RegisteredRooms {
236    fn new(
237        room_registration_sender: mpsc::Sender<RoomRegistration>,
238        weak_client: WeakClient,
239        event_cache: &EventCache,
240    ) -> Self {
241        Self {
242            rooms: RwLock::new(HashMap::default()),
243            room_registration_sender,
244            weak_client,
245            event_cache: event_cache.clone(),
246        }
247    }
248
249    /// Get a read lock guard to a [`RoomLatestEvents`] given a room ID and an
250    /// optional thread ID.
251    ///
252    /// The [`RoomLatestEvents`], and the associated [`LatestEvent`], will be
253    /// created if missing. It means that write lock is taken if necessary, but
254    /// it's always downgraded to a read lock at the end.
255    async fn room_latest_event(
256        &self,
257        room_id: &RoomId,
258        thread_id: Option<&EventId>,
259    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
260        Ok(match thread_id {
261            // Get the room latest event with the aim of fetching the latest event for a particular
262            // thread.
263            //
264            // We need to take a write lock immediately, in case the thead latest event doesn't
265            // exist.
266            Some(thread_id) => {
267                let mut rooms = self.rooms.write().await;
268
269                // The `RoomLatestEvents` doesn't exist. Let's create and insert it.
270                if rooms.contains_key(room_id).not() {
271                    // Insert the room if it's been successfully created.
272                    if let Some(room_latest_event) = RoomLatestEvents::new(
273                        WeakRoom::new(self.weak_client.clone(), room_id.to_owned()),
274                        &self.event_cache,
275                    )
276                    .await?
277                    {
278                        rooms.insert(room_id.to_owned(), room_latest_event);
279
280                        let _ = self
281                            .room_registration_sender
282                            .send(RoomRegistration::Add(room_id.to_owned()))
283                            .await;
284                    }
285                }
286
287                if let Some(room_latest_event) = rooms.get_mut(room_id) {
288                    // In `RoomLatestEvents`, the `LatestEvent` for this thread doesn't exist. Let's
289                    // create and insert it.
290                    if room_latest_event.per_thread.contains_key(thread_id).not() {
291                        room_latest_event.per_thread.insert(
292                            thread_id.to_owned(),
293                            room_latest_event.create_latest_event_for(Some(thread_id)).await,
294                        );
295                    }
296                }
297
298                RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
299            }
300
301            // Get the room latest event with the aim of fetching the latest event for a particular
302            // room.
303            None => {
304                match RwLockReadGuard::try_map(self.rooms.read().await, |rooms| rooms.get(room_id))
305                    .ok()
306                {
307                    value @ Some(_) => value,
308                    None => {
309                        let mut rooms = self.rooms.write().await;
310
311                        if rooms.contains_key(room_id).not() {
312                            // Insert the room if it's been successfully created.
313                            if let Some(room_latest_event) = RoomLatestEvents::new(
314                                WeakRoom::new(self.weak_client.clone(), room_id.to_owned()),
315                                &self.event_cache,
316                            )
317                            .await?
318                            {
319                                rooms.insert(room_id.to_owned(), room_latest_event);
320
321                                let _ = self
322                                    .room_registration_sender
323                                    .send(RoomRegistration::Add(room_id.to_owned()))
324                                    .await;
325                            }
326                        }
327
328                        RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
329                    }
330                }
331            }
332        })
333    }
334
335    /// Start listening to updates (if not already) for a particular room, and
336    /// fetch the [`LatestEvent`] for this room.
337    ///
338    /// It returns `None` if the room doesn't exist.
339    pub async fn for_room(
340        &self,
341        room_id: &RoomId,
342    ) -> Result<Option<RwLockReadGuard<'_, LatestEvent>>, LatestEventsError> {
343        Ok(self.room_latest_event(room_id, None).await?.map(|lock_guard| {
344            RwLockReadGuard::map(lock_guard, |room_latest_event| room_latest_event.for_room())
345        }))
346    }
347
348    /// Start listening to updates (if not already) for a particular room, and
349    /// fetch the [`LatestEvent`] for a particular thread in this room.
350    ///
351    /// It returns `None` if the room or the thread doesn't exist.
352    pub async fn for_thread(
353        &self,
354        room_id: &RoomId,
355        thread_id: &EventId,
356    ) -> Result<Option<RwLockReadGuard<'_, LatestEvent>>, LatestEventsError> {
357        Ok(self.room_latest_event(room_id, Some(thread_id)).await?.and_then(|lock_guard| {
358            RwLockReadGuard::try_map(lock_guard, |room_latest_event| {
359                room_latest_event.for_thread(thread_id)
360            })
361            .ok()
362        }))
363    }
364
365    /// Forget a room.
366    ///
367    /// It means that [`LatestEvents`] will stop listening to updates for the
368    /// `LatestEvent`s of the room and all its threads.
369    ///
370    /// If [`LatestEvents`] is not listening for `room_id`, nothing happens.
371    pub async fn forget_room(&self, room_id: &RoomId) {
372        let mut rooms = self.rooms.write().await;
373
374        // Remove the whole `RoomLatestEvents`.
375        rooms.remove(room_id);
376
377        let _ =
378            self.room_registration_sender.send(RoomRegistration::Remove(room_id.to_owned())).await;
379    }
380
381    /// Forget a thread.
382    ///
383    /// It means that [`LatestEvents`] will stop listening to updates for the
384    /// `LatestEvent` of the thread.
385    ///
386    /// If [`LatestEvents`] is not listening for `room_id` or `thread_id`,
387    /// nothing happens.
388    pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
389        let mut rooms = self.rooms.write().await;
390
391        // If the `RoomLatestEvents`, remove the `LatestEvent` in `per_thread`.
392        if let Some(room_latest_event) = rooms.get_mut(room_id) {
393            room_latest_event.per_thread.remove(thread_id);
394        }
395    }
396}
397
398/// Represents whether a room has been registered or forgotten.
399///
400/// This is used by [`RegisteredRooms::for_room`],
401/// [`RegisteredRooms::for_thread`], [`RegisteredRooms::forget_room`] and
402/// [`RegisteredRooms::forget_thread`].
403#[derive(Debug)]
404enum RoomRegistration {
405    /// [`LatestEvents`] wants to listen to this room.
406    Add(OwnedRoomId),
407
408    /// [`LatestEvents`] wants to no longer listen to this room.
409    Remove(OwnedRoomId),
410}
411
412/// Represents the kind of updates the [`compute_latest_events_task`] will have
413/// to deal with.
414enum LatestEventQueueUpdate {
415    /// An update from the [`EventCache`] happened.
416    EventCache {
417        /// The ID of the room that has triggered the update.
418        room_id: OwnedRoomId,
419    },
420
421    /// An update from the [`SendQueue`] happened.
422    SendQueue {
423        /// The ID of the room that has triggered the update.
424        room_id: OwnedRoomId,
425
426        /// The update itself.
427        update: RoomSendQueueUpdate,
428    },
429}
430
431/// Type holding the [`LatestEvent`] for a room and for all its threads.
432#[derive(Debug)]
433struct RoomLatestEvents {
434    /// The latest event of the room.
435    for_the_room: LatestEvent,
436
437    /// The latest events for each thread.
438    per_thread: HashMap<OwnedEventId, LatestEvent>,
439
440    /// The room event cache associated to this room.
441    room_event_cache: RoomEventCache,
442
443    /// The (weak) room.
444    ///
445    /// It used to to get the power-levels of the user for this room when
446    /// computing the latest events.
447    weak_room: WeakRoom,
448}
449
450impl RoomLatestEvents {
451    async fn new(
452        weak_room: WeakRoom,
453        event_cache: &EventCache,
454    ) -> Result<Option<Self>, LatestEventsError> {
455        let room_id = weak_room.room_id();
456        let room_event_cache = match event_cache.for_room(room_id).await {
457            // It's fine to drop the `EventCacheDropHandles` here as the caller
458            // (`LatestEventState`) owns a clone of the `EventCache`.
459            Ok((room_event_cache, _drop_handles)) => room_event_cache,
460            Err(EventCacheError::RoomNotFound { .. }) => return Ok(None),
461            Err(err) => return Err(LatestEventsError::EventCache(err)),
462        };
463
464        Ok(Some(Self {
465            for_the_room: Self::create_latest_event_for_inner(&weak_room, None, &room_event_cache)
466                .await,
467            per_thread: HashMap::new(),
468            weak_room,
469            room_event_cache,
470        }))
471    }
472
473    async fn create_latest_event_for(&self, thread_id: Option<&EventId>) -> LatestEvent {
474        Self::create_latest_event_for_inner(&self.weak_room, thread_id, &self.room_event_cache)
475            .await
476    }
477
478    async fn create_latest_event_for_inner(
479        weak_room: &WeakRoom,
480        thread_id: Option<&EventId>,
481        room_event_cache: &RoomEventCache,
482    ) -> LatestEvent {
483        LatestEvent::new(weak_room, thread_id, room_event_cache).await
484    }
485
486    /// Get the [`LatestEvent`] for the room.
487    fn for_room(&self) -> &LatestEvent {
488        &self.for_the_room
489    }
490
491    /// Get the [`LatestEvent`] for the thread if it exists.
492    fn for_thread(&self, thread_id: &EventId) -> Option<&LatestEvent> {
493        self.per_thread.get(thread_id)
494    }
495
496    /// Update the latest events for the room and its threads, based on the
497    /// event cache data.
498    async fn update_with_event_cache(&mut self) {
499        // Get the power levels of the user for the current room if the `WeakRoom` is
500        // still valid.
501        //
502        // Get it once for all the updates of all the latest events for this room (be
503        // the room and its threads).
504        let room = self.weak_room.get();
505        let power_levels = match &room {
506            Some(room) => {
507                let power_levels = room.power_levels().await.ok();
508
509                Some(room.own_user_id()).zip(power_levels)
510            }
511
512            None => None,
513        };
514
515        self.for_the_room.update_with_event_cache(&self.room_event_cache, &power_levels).await;
516
517        for latest_event in self.per_thread.values_mut() {
518            latest_event.update_with_event_cache(&self.room_event_cache, &power_levels).await;
519        }
520    }
521
522    /// Update the latest events for the room and its threads, based on the
523    /// send queue update.
524    async fn update_with_send_queue(&mut self, send_queue_update: &RoomSendQueueUpdate) {
525        // Get the power levels of the user for the current room if the `WeakRoom` is
526        // still valid.
527        //
528        // Get it once for all the updates of all the latest events for this room (be
529        // the room and its threads).
530        let room = self.weak_room.get();
531        let power_levels = match &room {
532            Some(room) => {
533                let power_levels = room.power_levels().await.ok();
534
535                Some(room.own_user_id()).zip(power_levels)
536            }
537
538            None => None,
539        };
540
541        self.for_the_room
542            .update_with_send_queue(send_queue_update, &self.room_event_cache, &power_levels)
543            .await;
544
545        for latest_event in self.per_thread.values_mut() {
546            latest_event
547                .update_with_send_queue(send_queue_update, &self.room_event_cache, &power_levels)
548                .await;
549        }
550    }
551}
552
553/// The task responsible to listen to the [`EventCache`] and the [`SendQueue`].
554/// When an update is received and is considered relevant, a message is sent to
555/// the [`compute_latest_events_task`] to compute a new [`LatestEvent`].
556///
557/// This task also listens to [`RoomRegistration`] (see
558/// [`LatestEventsState::room_registration_sender`] for the sender part). It
559/// keeps an internal list of registered rooms, which helps to filter out
560/// updates we aren't interested by.
561///
562/// When an update is considered relevant, a message is sent over the
563/// `latest_event_queue_sender` channel. See [`compute_latest_events_task`].
564async fn listen_to_event_cache_and_send_queue_updates_task(
565    registered_rooms: Arc<RegisteredRooms>,
566    mut room_registration_receiver: mpsc::Receiver<RoomRegistration>,
567    event_cache: EventCache,
568    send_queue: SendQueue,
569    latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
570) {
571    let mut event_cache_generic_updates_subscriber =
572        event_cache.subscribe_to_room_generic_updates();
573    let mut send_queue_generic_updates_subscriber = send_queue.subscribe();
574
575    // Initialise the list of rooms that are listened.
576    //
577    // Technically, we can use `registered_rooms.rooms` every time to get this
578    // information, but it would involve a read-lock. In order to reduce the
579    // pressure on this lock, we use this intermediate structure.
580    let mut listened_rooms =
581        HashSet::from_iter(registered_rooms.rooms.read().await.keys().cloned());
582
583    loop {
584        if listen_to_event_cache_and_send_queue_updates(
585            &mut room_registration_receiver,
586            &mut event_cache_generic_updates_subscriber,
587            &mut send_queue_generic_updates_subscriber,
588            &mut listened_rooms,
589            &latest_event_queue_sender,
590        )
591        .await
592        .is_break()
593        {
594            warn!("`listen_to_event_cache_and_send_queue_updates_task` has stopped");
595
596            break;
597        }
598    }
599}
600
601/// The core of [`listen_to_event_cache_and_send_queue_updates_task`].
602///
603/// Having this function detached from its task is helpful for testing and for
604/// state isolation.
605async fn listen_to_event_cache_and_send_queue_updates(
606    room_registration_receiver: &mut mpsc::Receiver<RoomRegistration>,
607    event_cache_generic_updates_subscriber: &mut broadcast::Receiver<RoomEventCacheGenericUpdate>,
608    send_queue_generic_updates_subscriber: &mut broadcast::Receiver<SendQueueUpdate>,
609    listened_rooms: &mut HashSet<OwnedRoomId>,
610    latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
611) -> ControlFlow<()> {
612    // We need a biased select here: `room_registration_receiver` must have the
613    // priority over other futures.
614    select! {
615        biased;
616
617        update = room_registration_receiver.recv().fuse() => {
618            match update {
619                Some(RoomRegistration::Add(room_id)) => {
620                    listened_rooms.insert(room_id);
621                }
622                Some(RoomRegistration::Remove(room_id)) => {
623                    listened_rooms.remove(&room_id);
624                }
625                None => {
626                    error!("`room_registration` channel has been closed");
627
628                    return ControlFlow::Break(());
629                }
630            }
631        }
632
633        room_event_cache_generic_update = event_cache_generic_updates_subscriber.recv().fuse() => {
634            if let Ok(room_event_cache_generic_update) = room_event_cache_generic_update {
635                let room_id = room_event_cache_generic_update.room_id;
636
637                if listened_rooms.contains(&room_id) {
638                    let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache {
639                        room_id
640                    });
641                }
642            } else {
643                warn!("`event_cache_generic_updates` channel has been closed");
644
645                return ControlFlow::Break(());
646            }
647        }
648
649        send_queue_generic_update = send_queue_generic_updates_subscriber.recv().fuse() => {
650            if let Ok(SendQueueUpdate { room_id, update }) = send_queue_generic_update {
651                if listened_rooms.contains(&room_id) {
652                    let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue {
653                        room_id,
654                        update
655                    });
656                }
657            } else {
658                warn!("`send_queue_generic_updates` channel has been closed");
659
660                return ControlFlow::Break(());
661            }
662        }
663    }
664
665    ControlFlow::Continue(())
666}
667
668/// The task responsible to compute new [`LatestEvent`] for a particular room or
669/// thread.
670///
671/// The messages are coming from
672/// [`listen_to_event_cache_and_send_queue_updates_task`].
673async fn compute_latest_events_task(
674    registered_rooms: Arc<RegisteredRooms>,
675    mut latest_event_queue_receiver: mpsc::UnboundedReceiver<LatestEventQueueUpdate>,
676) {
677    const BUFFER_SIZE: usize = 16;
678
679    let mut buffer = Vec::with_capacity(BUFFER_SIZE);
680
681    while latest_event_queue_receiver.recv_many(&mut buffer, BUFFER_SIZE).await > 0 {
682        compute_latest_events(&registered_rooms, &buffer).await;
683        buffer.clear();
684    }
685
686    warn!("`compute_latest_events_task` has stopped");
687}
688
689async fn compute_latest_events(
690    registered_rooms: &RegisteredRooms,
691    latest_event_queue_updates: &[LatestEventQueueUpdate],
692) {
693    for latest_event_queue_update in latest_event_queue_updates {
694        match latest_event_queue_update {
695            LatestEventQueueUpdate::EventCache { room_id } => {
696                let mut rooms = registered_rooms.rooms.write().await;
697
698                if let Some(room_latest_events) = rooms.get_mut(room_id) {
699                    room_latest_events.update_with_event_cache().await;
700                } else {
701                    error!(?room_id, "Failed to find the room");
702
703                    continue;
704                }
705            }
706
707            LatestEventQueueUpdate::SendQueue { room_id, update } => {
708                let mut rooms = registered_rooms.rooms.write().await;
709
710                if let Some(room_latest_events) = rooms.get_mut(room_id) {
711                    room_latest_events.update_with_send_queue(update).await;
712                } else {
713                    error!(?room_id, "Failed to find the room");
714
715                    continue;
716                }
717            }
718        }
719    }
720}
721
722#[cfg(all(test, not(target_family = "wasm")))]
723mod tests {
724    use std::ops::Not;
725
726    use assert_matches::assert_matches;
727    use matrix_sdk_base::{
728        deserialized_responses::TimelineEventKind,
729        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
730        RoomState,
731    };
732    use matrix_sdk_test::{async_test, event_factory::EventFactory, JoinedRoomBuilder};
733    use ruma::{
734        event_id,
735        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncMessageLikeEvent},
736        owned_room_id, room_id, user_id, OwnedTransactionId,
737    };
738    use stream_assert::assert_pending;
739
740    use super::{
741        broadcast, listen_to_event_cache_and_send_queue_updates, mpsc, HashSet, LatestEventValue,
742        RemoteLatestEventValue, RoomEventCacheGenericUpdate, RoomRegistration, RoomSendQueueUpdate,
743        SendQueueUpdate,
744    };
745    use crate::test_utils::mocks::MatrixMockServer;
746
747    #[async_test]
748    async fn test_latest_events_are_lazy() {
749        let room_id_0 = room_id!("!r0");
750        let room_id_1 = room_id!("!r1");
751        let room_id_2 = room_id!("!r2");
752        let thread_id_1_0 = event_id!("$ev1.0");
753        let thread_id_2_0 = event_id!("$ev2.0");
754
755        let server = MatrixMockServer::new().await;
756        let client = server.client_builder().build().await;
757
758        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
759        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
760        client.base_client().get_or_create_room(room_id_2, RoomState::Joined);
761
762        client.event_cache().subscribe().unwrap();
763
764        let latest_events = client.latest_events().await;
765
766        // Despites there are many rooms, zero `RoomLatestEvents` are created.
767        assert!(latest_events.state.registered_rooms.rooms.read().await.is_empty());
768
769        // Now let's listen to two rooms.
770        assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
771        assert!(latest_events.listen_to_room(room_id_1).await.unwrap());
772
773        {
774            let rooms = latest_events.state.registered_rooms.rooms.read().await;
775            // There are two rooms…
776            assert_eq!(rooms.len(), 2);
777            // … which are room 0 and room 1.
778            assert!(rooms.contains_key(room_id_0));
779            assert!(rooms.contains_key(room_id_1));
780
781            // Room 0 contains zero thread latest events.
782            assert!(rooms.get(room_id_0).unwrap().per_thread.is_empty());
783            // Room 1 contains zero thread latest events.
784            assert!(rooms.get(room_id_1).unwrap().per_thread.is_empty());
785        }
786
787        // Now let's listen to one thread respectively for two rooms.
788        assert!(latest_events.listen_to_thread(room_id_1, thread_id_1_0).await.unwrap());
789        assert!(latest_events.listen_to_thread(room_id_2, thread_id_2_0).await.unwrap());
790
791        {
792            let rooms = latest_events.state.registered_rooms.rooms.read().await;
793            // There are now three rooms…
794            assert_eq!(rooms.len(), 3);
795            // … yup, room 2 is now created.
796            assert!(rooms.contains_key(room_id_0));
797            assert!(rooms.contains_key(room_id_1));
798            assert!(rooms.contains_key(room_id_2));
799
800            // Room 0 contains zero thread latest events.
801            assert!(rooms.get(room_id_0).unwrap().per_thread.is_empty());
802            // Room 1 contains one thread latest event…
803            assert_eq!(rooms.get(room_id_1).unwrap().per_thread.len(), 1);
804            // … which is thread 1.0.
805            assert!(rooms.get(room_id_1).unwrap().per_thread.contains_key(thread_id_1_0));
806            // Room 2 contains one thread latest event…
807            assert_eq!(rooms.get(room_id_2).unwrap().per_thread.len(), 1);
808            // … which is thread 2.0.
809            assert!(rooms.get(room_id_2).unwrap().per_thread.contains_key(thread_id_2_0));
810        }
811    }
812
813    #[async_test]
814    async fn test_forget_room() {
815        let room_id_0 = room_id!("!r0");
816        let room_id_1 = room_id!("!r1");
817
818        let server = MatrixMockServer::new().await;
819        let client = server.client_builder().build().await;
820
821        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
822        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
823
824        client.event_cache().subscribe().unwrap();
825
826        let latest_events = client.latest_events().await;
827
828        // Now let's fetch one room.
829        assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
830
831        {
832            let rooms = latest_events.state.registered_rooms.rooms.read().await;
833            // There are one room…
834            assert_eq!(rooms.len(), 1);
835            // … which is room 0.
836            assert!(rooms.contains_key(room_id_0));
837
838            // Room 0 contains zero thread latest events.
839            assert!(rooms.get(room_id_0).unwrap().per_thread.is_empty());
840        }
841
842        // Now let's forget about room 0.
843        latest_events.forget_room(room_id_0).await;
844
845        {
846            let rooms = latest_events.state.registered_rooms.rooms.read().await;
847            // There are now zero rooms.
848            assert!(rooms.is_empty());
849        }
850    }
851
852    #[async_test]
853    async fn test_forget_thread() {
854        let room_id_0 = room_id!("!r0");
855        let room_id_1 = room_id!("!r1");
856        let thread_id_0_0 = event_id!("$ev0.0");
857
858        let server = MatrixMockServer::new().await;
859        let client = server.client_builder().build().await;
860
861        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
862        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
863
864        client.event_cache().subscribe().unwrap();
865
866        let latest_events = client.latest_events().await;
867
868        // Now let's fetch one thread .
869        assert!(latest_events.listen_to_thread(room_id_0, thread_id_0_0).await.unwrap());
870
871        {
872            let rooms = latest_events.state.registered_rooms.rooms.read().await;
873            // There is one room…
874            assert_eq!(rooms.len(), 1);
875            // … which is room 0.
876            assert!(rooms.contains_key(room_id_0));
877
878            // Room 0 contains one thread latest event…
879            assert_eq!(rooms.get(room_id_0).unwrap().per_thread.len(), 1);
880            // … which is thread 0.0.
881            assert!(rooms.get(room_id_0).unwrap().per_thread.contains_key(thread_id_0_0));
882        }
883
884        // Now let's forget about the thread.
885        latest_events.forget_thread(room_id_0, thread_id_0_0).await;
886
887        {
888            let rooms = latest_events.state.registered_rooms.rooms.read().await;
889            // There is still one room…
890            assert_eq!(rooms.len(), 1);
891            // … which is room 0.
892            assert!(rooms.contains_key(room_id_0));
893
894            // But the thread has been removed.
895            assert!(rooms.get(room_id_0).unwrap().per_thread.is_empty());
896        }
897    }
898
899    #[async_test]
900    async fn test_inputs_task_can_listen_to_room_registration() {
901        let room_id_0 = owned_room_id!("!r0");
902        let room_id_1 = owned_room_id!("!r1");
903
904        let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
905        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
906            broadcast::channel(1);
907        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
908            broadcast::channel(1);
909        let mut listened_rooms = HashSet::new();
910        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
911
912        // Send a _room update_ for the first time.
913        {
914            // It mimics `LatestEvents::for_room`.
915            room_registration_sender.send(RoomRegistration::Add(room_id_0.clone())).await.unwrap();
916
917            // Run the task.
918            assert!(listen_to_event_cache_and_send_queue_updates(
919                &mut room_registration_receiver,
920                &mut room_event_cache_generic_update_receiver,
921                &mut send_queue_generic_update_receiver,
922                &mut listened_rooms,
923                &latest_event_queue_sender,
924            )
925            .await
926            .is_continue());
927
928            assert_eq!(listened_rooms.len(), 1);
929            assert!(listened_rooms.contains(&room_id_0));
930            assert!(latest_event_queue_receiver.is_empty());
931        }
932
933        // Send a _room update_ for the second time. It's the same room.
934        {
935            room_registration_sender.send(RoomRegistration::Add(room_id_0.clone())).await.unwrap();
936
937            // Run the task.
938            assert!(listen_to_event_cache_and_send_queue_updates(
939                &mut room_registration_receiver,
940                &mut room_event_cache_generic_update_receiver,
941                &mut send_queue_generic_update_receiver,
942                &mut listened_rooms,
943                &latest_event_queue_sender,
944            )
945            .await
946            .is_continue());
947
948            // This is the second time this room is added. Nothing happens.
949            assert_eq!(listened_rooms.len(), 1);
950            assert!(listened_rooms.contains(&room_id_0));
951            assert!(latest_event_queue_receiver.is_empty());
952        }
953
954        // Send another _room update_. It's a different room.
955        {
956            room_registration_sender
957                .send(RoomRegistration::Add(room_id_1.to_owned()))
958                .await
959                .unwrap();
960
961            // Run the task.
962            assert!(listen_to_event_cache_and_send_queue_updates(
963                &mut room_registration_receiver,
964                &mut room_event_cache_generic_update_receiver,
965                &mut send_queue_generic_update_receiver,
966                &mut listened_rooms,
967                &latest_event_queue_sender,
968            )
969            .await
970            .is_continue());
971
972            // This is the first time this room is added. It must appear.
973            assert_eq!(listened_rooms.len(), 2);
974            assert!(listened_rooms.contains(&room_id_0));
975            assert!(listened_rooms.contains(&room_id_1));
976            assert!(latest_event_queue_receiver.is_empty());
977        }
978    }
979
980    #[async_test]
981    async fn test_inputs_task_stops_when_room_registration_channel_is_closed() {
982        let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
983        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
984            broadcast::channel(1);
985        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
986            broadcast::channel(1);
987        let mut listened_rooms = HashSet::new();
988        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
989
990        // Close the receiver to close the channel.
991        room_registration_receiver.close();
992
993        // Run the task.
994        assert!(listen_to_event_cache_and_send_queue_updates(
995            &mut room_registration_receiver,
996            &mut room_event_cache_generic_update_receiver,
997            &mut send_queue_generic_update_receiver,
998            &mut listened_rooms,
999            &latest_event_queue_sender,
1000        )
1001        .await
1002        // It breaks!
1003        .is_break());
1004
1005        assert_eq!(listened_rooms.len(), 0);
1006        assert!(latest_event_queue_receiver.is_empty());
1007    }
1008
1009    #[async_test]
1010    async fn test_inputs_task_can_listen_to_room_event_cache() {
1011        let room_id = owned_room_id!("!r0");
1012
1013        let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
1014        let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1015            broadcast::channel(1);
1016        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1017            broadcast::channel(1);
1018        let mut listened_rooms = HashSet::new();
1019        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1020
1021        // New event cache update, but the `LatestEvents` isn't listening to it.
1022        {
1023            room_event_cache_generic_update_sender
1024                .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
1025                .unwrap();
1026
1027            // Run the task.
1028            assert!(listen_to_event_cache_and_send_queue_updates(
1029                &mut room_registration_receiver,
1030                &mut room_event_cache_generic_update_receiver,
1031                &mut send_queue_generic_update_receiver,
1032                &mut listened_rooms,
1033                &latest_event_queue_sender,
1034            )
1035            .await
1036            .is_continue());
1037
1038            assert!(listened_rooms.is_empty());
1039
1040            // No latest event computation has been triggered.
1041            assert!(latest_event_queue_receiver.is_empty());
1042        }
1043
1044        // New event cache update, but this time, the `LatestEvents` is listening to it.
1045        {
1046            room_registration_sender.send(RoomRegistration::Add(room_id.clone())).await.unwrap();
1047            room_event_cache_generic_update_sender
1048                .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
1049                .unwrap();
1050
1051            // Run the task to handle the `RoomRegistration` and the
1052            // `RoomEventCacheGenericUpdate`.
1053            for _ in 0..2 {
1054                assert!(listen_to_event_cache_and_send_queue_updates(
1055                    &mut room_registration_receiver,
1056                    &mut room_event_cache_generic_update_receiver,
1057                    &mut send_queue_generic_update_receiver,
1058                    &mut listened_rooms,
1059                    &latest_event_queue_sender,
1060                )
1061                .await
1062                .is_continue());
1063            }
1064
1065            assert_eq!(listened_rooms.len(), 1);
1066            assert!(listened_rooms.contains(&room_id));
1067
1068            // A latest event computation has been triggered!
1069            assert!(latest_event_queue_receiver.is_empty().not());
1070        }
1071    }
1072
1073    #[async_test]
1074    async fn test_inputs_task_can_listen_to_send_queue() {
1075        let room_id = owned_room_id!("!r0");
1076
1077        let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
1078        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1079            broadcast::channel(1);
1080        let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1081            broadcast::channel(1);
1082        let mut listened_rooms = HashSet::new();
1083        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1084
1085        // New send queue update, but the `LatestEvents` isn't listening to it.
1086        {
1087            send_queue_generic_update_sender
1088                .send(SendQueueUpdate {
1089                    room_id: room_id.clone(),
1090                    update: RoomSendQueueUpdate::SentEvent {
1091                        transaction_id: OwnedTransactionId::from("txnid0"),
1092                        event_id: event_id!("$ev0").to_owned(),
1093                    },
1094                })
1095                .unwrap();
1096
1097            // Run the task.
1098            assert!(listen_to_event_cache_and_send_queue_updates(
1099                &mut room_registration_receiver,
1100                &mut room_event_cache_generic_update_receiver,
1101                &mut send_queue_generic_update_receiver,
1102                &mut listened_rooms,
1103                &latest_event_queue_sender,
1104            )
1105            .await
1106            .is_continue());
1107
1108            assert!(listened_rooms.is_empty());
1109
1110            // No latest event computation has been triggered.
1111            assert!(latest_event_queue_receiver.is_empty());
1112        }
1113
1114        // New send queue update, but this time, the `LatestEvents` is listening to it.
1115        {
1116            room_registration_sender.send(RoomRegistration::Add(room_id.clone())).await.unwrap();
1117            send_queue_generic_update_sender
1118                .send(SendQueueUpdate {
1119                    room_id: room_id.clone(),
1120                    update: RoomSendQueueUpdate::SentEvent {
1121                        transaction_id: OwnedTransactionId::from("txnid1"),
1122                        event_id: event_id!("$ev1").to_owned(),
1123                    },
1124                })
1125                .unwrap();
1126
1127            // Run the task to handle the `RoomRegistration` and the `SendQueueUpdate`.
1128            for _ in 0..2 {
1129                assert!(listen_to_event_cache_and_send_queue_updates(
1130                    &mut room_registration_receiver,
1131                    &mut room_event_cache_generic_update_receiver,
1132                    &mut send_queue_generic_update_receiver,
1133                    &mut listened_rooms,
1134                    &latest_event_queue_sender,
1135                )
1136                .await
1137                .is_continue());
1138            }
1139
1140            assert_eq!(listened_rooms.len(), 1);
1141            assert!(listened_rooms.contains(&room_id));
1142
1143            // A latest event computation has been triggered!
1144            assert!(latest_event_queue_receiver.is_empty().not());
1145        }
1146    }
1147
1148    #[async_test]
1149    async fn test_inputs_task_stops_when_event_cache_channel_is_closed() {
1150        let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
1151        let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1152            broadcast::channel(1);
1153        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1154            broadcast::channel(1);
1155        let mut listened_rooms = HashSet::new();
1156        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1157
1158        // Drop the sender to close the channel.
1159        drop(room_event_cache_generic_update_sender);
1160
1161        // Run the task.
1162        assert!(listen_to_event_cache_and_send_queue_updates(
1163            &mut room_registration_receiver,
1164            &mut room_event_cache_generic_update_receiver,
1165            &mut send_queue_generic_update_receiver,
1166            &mut listened_rooms,
1167            &latest_event_queue_sender,
1168        )
1169        .await
1170        // It breaks!
1171        .is_break());
1172
1173        assert_eq!(listened_rooms.len(), 0);
1174        assert!(latest_event_queue_receiver.is_empty());
1175    }
1176
1177    #[async_test]
1178    async fn test_inputs_task_stops_when_send_queue_channel_is_closed() {
1179        let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
1180        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1181            broadcast::channel(1);
1182        let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1183            broadcast::channel(1);
1184        let mut listened_rooms = HashSet::new();
1185        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1186
1187        // Drop the sender to close the channel.
1188        drop(send_queue_generic_update_sender);
1189
1190        // Run the task.
1191        assert!(listen_to_event_cache_and_send_queue_updates(
1192            &mut room_registration_receiver,
1193            &mut room_event_cache_generic_update_receiver,
1194            &mut send_queue_generic_update_receiver,
1195            &mut listened_rooms,
1196            &latest_event_queue_sender,
1197        )
1198        .await
1199        // It breaks!
1200        .is_break());
1201
1202        assert_eq!(listened_rooms.len(), 0);
1203        assert!(latest_event_queue_receiver.is_empty());
1204    }
1205
1206    #[async_test]
1207    async fn test_latest_event_value_is_updated_via_event_cache() {
1208        let room_id = owned_room_id!("!r0");
1209        let user_id = user_id!("@mnt_io:matrix.org");
1210        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
1211        let event_id_0 = event_id!("$ev0");
1212        let event_id_1 = event_id!("$ev1");
1213        let event_id_2 = event_id!("$ev2");
1214
1215        let server = MatrixMockServer::new().await;
1216        let client = server.client_builder().build().await;
1217
1218        // Prelude.
1219        {
1220            // Create the room.
1221            client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1222
1223            // Initialise the event cache store.
1224            client
1225                .event_cache_store()
1226                .lock()
1227                .await
1228                .unwrap()
1229                .handle_linked_chunk_updates(
1230                    LinkedChunkId::Room(&room_id),
1231                    vec![
1232                        Update::NewItemsChunk {
1233                            previous: None,
1234                            new: ChunkIdentifier::new(0),
1235                            next: None,
1236                        },
1237                        Update::PushItems {
1238                            at: Position::new(ChunkIdentifier::new(0), 0),
1239                            items: vec![
1240                                event_factory.text_msg("hello").event_id(event_id_0).into(),
1241                                event_factory.text_msg("world").event_id(event_id_1).into(),
1242                            ],
1243                        },
1244                    ],
1245                )
1246                .await
1247                .unwrap();
1248        }
1249
1250        let event_cache = client.event_cache();
1251        event_cache.subscribe().unwrap();
1252
1253        let latest_events = client.latest_events().await;
1254
1255        // Subscribe to the latest event values for this room.
1256        let mut latest_event_stream =
1257            latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
1258
1259        // The initial latest event value is set to `event_id_1` because it's… the…
1260        // latest event!
1261        assert_matches!(
1262            latest_event_stream.get().await,
1263            LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. }) => {
1264                assert_matches!(
1265                    event.deserialize().unwrap(),
1266                    AnySyncTimelineEvent::MessageLike(
1267                        AnySyncMessageLikeEvent::RoomMessage(
1268                            SyncMessageLikeEvent::Original(message_content)
1269                        )
1270                    ) => {
1271                        assert_eq!(message_content.content.body(), "world");
1272                    }
1273                );
1274            }
1275        );
1276
1277        // The stream is pending: no new latest event for the moment.
1278        assert_pending!(latest_event_stream);
1279
1280        // Update the event cache with a sync.
1281        server
1282            .sync_room(
1283                &client,
1284                JoinedRoomBuilder::new(&room_id).add_timeline_event(
1285                    event_factory.text_msg("raclette !").event_id(event_id_2).into_raw(),
1286                ),
1287            )
1288            .await;
1289
1290        // The event cache has received its update from the sync. It has emitted a
1291        // generic update, which has been received by `LatestEvents` tasks, up to the
1292        // `compute_latest_events` which has updated the latest event value.
1293        assert_matches!(
1294            latest_event_stream.next().await,
1295            Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => {
1296                assert_matches!(
1297                    event.deserialize().unwrap(),
1298                    AnySyncTimelineEvent::MessageLike(
1299                        AnySyncMessageLikeEvent::RoomMessage(
1300                            SyncMessageLikeEvent::Original(message_content)
1301                        )
1302                    ) => {
1303                        assert_eq!(message_content.content.body(), "raclette !");
1304                    }
1305                );
1306            }
1307        );
1308    }
1309}