matrix_sdk/latest_events/
mod.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The Latest Events API provides a lazy, reactive and efficient way to compute
16//! the latest event for a room or a thread.
17//!
18//! The latest event represents the last displayable and relevant event a room
19//! or a thread has been received. It is usually displayed in a _summary_, e.g.
20//! below the room title in a room list.
21//!
22//! The entry point is [`LatestEvents`]. It is preferable to get a reference to
23//! it from [`Client::latest_events`][crate::Client::latest_events], which
24//! already plugs everything to build it. [`LatestEvents`] is using the
25//! [`EventCache`] and the [`SendQueue`] to respectively get known remote events
26//! (i.e. synced from the server), or local events (i.e. ones being sent).
27//!
28//! ## Laziness
29//!
30//! [`LatestEvents`] is lazy, it means that, despite [`LatestEvents`] is
31//! listening to all [`EventCache`] or [`SendQueue`] updates, it will only do
32//! something if one is expected to get the latest event for a particular room
33//! or a particular thread. Concretely, it means that until
34//! [`LatestEvents::listen_to_room`] is called for a particular room, no latest
35//! event will ever be computed for that room (and similarly with
36//! [`LatestEvents::listen_to_thread`]).
37//!
38//! If one is no longer interested to get the latest event for a particular room
39//! or thread, the [`LatestEvents::forget_room`] and
40//! [`LatestEvents::forget_thread`] methods must be used.
41//!
42//! ## Reactive
43//!
44//! [`LatestEvents`] is designed to be reactive. Using
45//! [`LatestEvents::listen_and_subscribe_to_room`] will provide a
46//! [`Subscriber`], which brings all the tooling to get the current value or the
47//! future values with a stream.
48
49mod error;
50mod latest_event;
51mod room_latest_events;
52
53use std::{
54    collections::{HashMap, HashSet},
55    ops::{ControlFlow, Not},
56    sync::Arc,
57};
58
59pub use error::LatestEventsError;
60use eyeball::{AsyncLock, Subscriber};
61use futures_util::FutureExt;
62use latest_event::LatestEvent;
63pub use latest_event::{LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue};
64use matrix_sdk_common::executor::{AbortOnDrop, JoinHandleExt as _, spawn};
65use room_latest_events::RoomLatestEvents;
66use ruma::{EventId, OwnedRoomId, RoomId};
67use tokio::{
68    select,
69    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, broadcast, mpsc},
70};
71use tracing::{error, warn};
72
73use crate::{
74    client::WeakClient,
75    event_cache::{EventCache, RoomEventCacheGenericUpdate},
76    room::WeakRoom,
77    send_queue::{RoomSendQueueUpdate, SendQueue, SendQueueUpdate},
78};
79
80/// The entry point to fetch the [`LatestEventValue`] for rooms or threads.
81#[derive(Clone, Debug)]
82pub struct LatestEvents {
83    state: Arc<LatestEventsState>,
84}
85
86/// The state of [`LatestEvents`].
87#[derive(Debug)]
88struct LatestEventsState {
89    /// All the registered rooms, i.e. rooms the latest events are computed for.
90    registered_rooms: Arc<RegisteredRooms>,
91
92    /// The task handle of the
93    /// [`listen_to_event_cache_and_send_queue_updates_task`].
94    _listen_task_handle: AbortOnDrop<()>,
95
96    /// The task handle of the [`compute_latest_events_task`].
97    _computation_task_handle: AbortOnDrop<()>,
98}
99
100impl LatestEvents {
101    /// Create a new [`LatestEvents`].
102    pub(crate) fn new(
103        weak_client: WeakClient,
104        event_cache: EventCache,
105        send_queue: SendQueue,
106    ) -> Self {
107        let (room_registration_sender, room_registration_receiver) = mpsc::channel(32);
108        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
109
110        let registered_rooms =
111            Arc::new(RegisteredRooms::new(room_registration_sender, weak_client, &event_cache));
112
113        // The task listening to the event cache and the send queue updates.
114        let listen_task_handle = spawn(listen_to_event_cache_and_send_queue_updates_task(
115            registered_rooms.clone(),
116            room_registration_receiver,
117            event_cache,
118            send_queue,
119            latest_event_queue_sender,
120        ))
121        .abort_on_drop();
122
123        // The task computing the new latest events.
124        let computation_task_handle = spawn(compute_latest_events_task(
125            registered_rooms.clone(),
126            latest_event_queue_receiver,
127        ))
128        .abort_on_drop();
129
130        Self {
131            state: Arc::new(LatestEventsState {
132                registered_rooms,
133                _listen_task_handle: listen_task_handle,
134                _computation_task_handle: computation_task_handle,
135            }),
136        }
137    }
138
139    /// Start listening to updates (if not already) for a particular room.
140    ///
141    /// It returns `true` if the room exists, `false` otherwise.
142    pub async fn listen_to_room(&self, room_id: &RoomId) -> Result<bool, LatestEventsError> {
143        Ok(self.state.registered_rooms.for_room(room_id).await?.is_some())
144    }
145
146    /// Check whether the system listens to a particular room.
147    ///
148    /// Note: It's a test only method.
149    #[cfg(test)]
150    pub async fn is_listening_to_room(&self, room_id: &RoomId) -> bool {
151        self.state.registered_rooms.rooms.read().await.contains_key(room_id)
152    }
153
154    /// Start listening to updates (if not already) for a particular room, and
155    /// return a [`Subscriber`] to get the current and future
156    /// [`LatestEventValue`]s.
157    ///
158    /// It returns `Some` if the room exists, `None` otherwise.
159    pub async fn listen_and_subscribe_to_room(
160        &self,
161        room_id: &RoomId,
162    ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
163        let Some(room_latest_events) = self.state.registered_rooms.for_room(room_id).await? else {
164            return Ok(None);
165        };
166
167        let room_latest_events = room_latest_events.read().await;
168        let latest_event = room_latest_events.for_room();
169
170        Ok(Some(latest_event.subscribe().await))
171    }
172
173    /// Start listening to updates (if not already) for a particular room and a
174    /// particular thread in this room.
175    ///
176    /// It returns `true` if the room and the thread exists, `false` otherwise.
177    pub async fn listen_to_thread(
178        &self,
179        room_id: &RoomId,
180        thread_id: &EventId,
181    ) -> Result<bool, LatestEventsError> {
182        Ok(self.state.registered_rooms.for_thread(room_id, thread_id).await?.is_some())
183    }
184
185    /// Start listening to updates (if not already) for a particular room and a
186    /// particular thread in this room, and return a [`Subscriber`] to get the
187    /// current and future [`LatestEventValue`]s.
188    ///
189    /// It returns `Some` if the room and the thread exists, `None` otherwise.
190    pub async fn listen_and_subscribe_to_thread(
191        &self,
192        room_id: &RoomId,
193        thread_id: &EventId,
194    ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
195        let Some(room_latest_events) =
196            self.state.registered_rooms.for_thread(room_id, thread_id).await?
197        else {
198            return Ok(None);
199        };
200
201        let room_latest_events = room_latest_events.read().await;
202        let latest_event = room_latest_events
203            .for_thread(thread_id)
204            .expect("The `LatestEvent` for the thread must have been created");
205
206        Ok(Some(latest_event.subscribe().await))
207    }
208
209    /// Forget a room.
210    ///
211    /// It means that [`LatestEvents`] will stop listening to updates for the
212    /// `LatestEvent`s of the room and all its threads.
213    ///
214    /// If [`LatestEvents`] is not listening for `room_id`, nothing happens.
215    pub async fn forget_room(&self, room_id: &RoomId) {
216        self.state.registered_rooms.forget_room(room_id).await;
217    }
218
219    /// Forget a thread.
220    ///
221    /// It means that [`LatestEvents`] will stop listening to updates for the
222    /// `LatestEvent` of the thread.
223    ///
224    /// If [`LatestEvents`] is not listening for `room_id` or `thread_id`,
225    /// nothing happens.
226    pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
227        self.state.registered_rooms.forget_thread(room_id, thread_id).await;
228    }
229}
230
231#[derive(Debug)]
232struct RegisteredRooms {
233    /// All the registered [`RoomLatestEvents`].
234    rooms: RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
235
236    /// The sender part of the channel about room registration.
237    ///
238    /// When a room is registered (with [`LatestEvents::listen_to_room`] or
239    /// [`LatestEvents::listen_to_thread`]) or unregistered (with
240    /// [`LatestEvents::forget_room`] or [`LatestEvents::forget_thread`]), a
241    /// room registration message is passed on this channel.
242    ///
243    /// The receiver part of the channel is in the
244    /// [`listen_to_event_cache_and_send_queue_updates_task`].
245    room_registration_sender: mpsc::Sender<RoomRegistration>,
246
247    /// The (weak) client.
248    weak_client: WeakClient,
249
250    /// The event cache.
251    event_cache: EventCache,
252}
253
254impl RegisteredRooms {
255    fn new(
256        room_registration_sender: mpsc::Sender<RoomRegistration>,
257        weak_client: WeakClient,
258        event_cache: &EventCache,
259    ) -> Self {
260        Self {
261            rooms: RwLock::new(HashMap::default()),
262            room_registration_sender,
263            weak_client,
264            event_cache: event_cache.clone(),
265        }
266    }
267
268    /// Get a read lock guard to a [`RoomLatestEvents`] given a room ID and an
269    /// optional thread ID.
270    ///
271    /// The [`RoomLatestEvents`], and the associated [`LatestEvent`], will be
272    /// created if missing. It means that write lock is taken if necessary, but
273    /// it's always downgraded to a read lock at the end.
274    async fn room_latest_event(
275        &self,
276        room_id: &RoomId,
277        thread_id: Option<&EventId>,
278    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
279        Ok(match thread_id {
280            // Get the room latest event with the aim of fetching the latest event for a particular
281            // thread.
282            //
283            // We need to take a write lock immediately, in case the thead latest event doesn't
284            // exist.
285            Some(thread_id) => {
286                let mut rooms = self.rooms.write().await;
287
288                // The `RoomLatestEvents` doesn't exist. Let's create and insert it.
289                if rooms.contains_key(room_id).not() {
290                    // Insert the room if it's been successfully created.
291                    if let Some(room_latest_event) = RoomLatestEvents::new(
292                        WeakRoom::new(self.weak_client.clone(), room_id.to_owned()),
293                        &self.event_cache,
294                    )
295                    .await?
296                    {
297                        rooms.insert(room_id.to_owned(), room_latest_event);
298
299                        let _ = self
300                            .room_registration_sender
301                            .send(RoomRegistration::Add(room_id.to_owned()))
302                            .await;
303                    }
304                }
305
306                if let Some(room_latest_event) = rooms.get(room_id) {
307                    let mut room_latest_event = room_latest_event.write().await;
308
309                    // In `RoomLatestEvents`, the `LatestEvent` for this thread doesn't exist. Let's
310                    // create and insert it.
311                    if room_latest_event.has_thread(thread_id).not() {
312                        room_latest_event
313                            .create_and_insert_latest_event_for_thread(thread_id)
314                            .await;
315                    }
316                }
317
318                RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
319            }
320
321            // Get the room latest event with the aim of fetching the latest event for a particular
322            // room.
323            None => {
324                match RwLockReadGuard::try_map(self.rooms.read().await, |rooms| rooms.get(room_id))
325                    .ok()
326                {
327                    value @ Some(_) => value,
328                    None => {
329                        let mut rooms = self.rooms.write().await;
330
331                        if rooms.contains_key(room_id).not() {
332                            // Insert the room if it's been successfully created.
333                            if let Some(room_latest_event) = RoomLatestEvents::new(
334                                WeakRoom::new(self.weak_client.clone(), room_id.to_owned()),
335                                &self.event_cache,
336                            )
337                            .await?
338                            {
339                                rooms.insert(room_id.to_owned(), room_latest_event);
340
341                                let _ = self
342                                    .room_registration_sender
343                                    .send(RoomRegistration::Add(room_id.to_owned()))
344                                    .await;
345                            }
346                        }
347
348                        RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
349                    }
350                }
351            }
352        })
353    }
354
355    /// Start listening to updates (if not already) for a particular room.
356    ///
357    /// It returns `None` if the room doesn't exist.
358    pub async fn for_room(
359        &self,
360        room_id: &RoomId,
361    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
362        self.room_latest_event(room_id, None).await
363    }
364
365    /// Start listening to updates (if not already) for a particular room.
366    ///
367    /// It returns `None` if the room or the thread doesn't exist.
368    pub async fn for_thread(
369        &self,
370        room_id: &RoomId,
371        thread_id: &EventId,
372    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
373        self.room_latest_event(room_id, Some(thread_id)).await
374    }
375
376    /// Forget a room.
377    ///
378    /// It means that [`LatestEvents`] will stop listening to updates for the
379    /// `LatestEvent`s of the room and all its threads.
380    ///
381    /// If [`LatestEvents`] is not listening for `room_id`, nothing happens.
382    pub async fn forget_room(&self, room_id: &RoomId) {
383        {
384            let mut rooms = self.rooms.write().await;
385
386            // Remove the whole `RoomLatestEvents`.
387            rooms.remove(room_id);
388        }
389
390        let _ =
391            self.room_registration_sender.send(RoomRegistration::Remove(room_id.to_owned())).await;
392    }
393
394    /// Forget a thread.
395    ///
396    /// It means that [`LatestEvents`] will stop listening to updates for the
397    /// `LatestEvent` of the thread.
398    ///
399    /// If [`LatestEvents`] is not listening for `room_id` or `thread_id`,
400    /// nothing happens.
401    pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
402        let rooms = self.rooms.read().await;
403
404        // If the `RoomLatestEvents`, remove the `LatestEvent` in `per_thread`.
405        if let Some(room_latest_event) = rooms.get(room_id) {
406            let mut room_latest_event = room_latest_event.write().await;
407
408            // Release the lock on `self.rooms`.
409            drop(rooms);
410
411            room_latest_event.forget_thread(thread_id);
412        }
413    }
414}
415
416/// Represents whether a room has been registered or forgotten.
417///
418/// This is used by [`RegisteredRooms::for_room`],
419/// [`RegisteredRooms::for_thread`], [`RegisteredRooms::forget_room`] and
420/// [`RegisteredRooms::forget_thread`].
421#[derive(Debug)]
422enum RoomRegistration {
423    /// [`LatestEvents`] wants to listen to this room.
424    Add(OwnedRoomId),
425
426    /// [`LatestEvents`] wants to no longer listen to this room.
427    Remove(OwnedRoomId),
428}
429
430/// Represents the kind of updates the [`compute_latest_events_task`] will have
431/// to deal with.
432enum LatestEventQueueUpdate {
433    /// An update from the [`EventCache`] happened.
434    EventCache {
435        /// The ID of the room that has triggered the update.
436        room_id: OwnedRoomId,
437    },
438
439    /// An update from the [`SendQueue`] happened.
440    SendQueue {
441        /// The ID of the room that has triggered the update.
442        room_id: OwnedRoomId,
443
444        /// The update itself.
445        update: RoomSendQueueUpdate,
446    },
447}
448
449/// The task responsible to listen to the [`EventCache`] and the [`SendQueue`].
450/// When an update is received and is considered relevant, a message is sent to
451/// the [`compute_latest_events_task`] to compute a new [`LatestEvent`].
452///
453/// This task also listens to [`RoomRegistration`] (see
454/// [`LatestEventsState::room_registration_sender`] for the sender part). It
455/// keeps an internal list of registered rooms, which helps to filter out
456/// updates we aren't interested by.
457///
458/// When an update is considered relevant, a message is sent over the
459/// `latest_event_queue_sender` channel. See [`compute_latest_events_task`].
460async fn listen_to_event_cache_and_send_queue_updates_task(
461    registered_rooms: Arc<RegisteredRooms>,
462    mut room_registration_receiver: mpsc::Receiver<RoomRegistration>,
463    event_cache: EventCache,
464    send_queue: SendQueue,
465    latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
466) {
467    let mut event_cache_generic_updates_subscriber =
468        event_cache.subscribe_to_room_generic_updates();
469    let mut send_queue_generic_updates_subscriber = send_queue.subscribe();
470
471    // Initialise the list of rooms that are listened.
472    //
473    // Technically, we can use `registered_rooms.rooms` every time to get this
474    // information, but it would involve a read-lock. In order to reduce the
475    // pressure on this lock, we use this intermediate structure.
476    let mut listened_rooms =
477        HashSet::from_iter(registered_rooms.rooms.read().await.keys().cloned());
478
479    loop {
480        if listen_to_event_cache_and_send_queue_updates(
481            &mut room_registration_receiver,
482            &mut event_cache_generic_updates_subscriber,
483            &mut send_queue_generic_updates_subscriber,
484            &mut listened_rooms,
485            &latest_event_queue_sender,
486        )
487        .await
488        .is_break()
489        {
490            warn!("`listen_to_event_cache_and_send_queue_updates_task` has stopped");
491
492            break;
493        }
494    }
495}
496
497/// The core of [`listen_to_event_cache_and_send_queue_updates_task`].
498///
499/// Having this function detached from its task is helpful for testing and for
500/// state isolation.
501async fn listen_to_event_cache_and_send_queue_updates(
502    room_registration_receiver: &mut mpsc::Receiver<RoomRegistration>,
503    event_cache_generic_updates_subscriber: &mut broadcast::Receiver<RoomEventCacheGenericUpdate>,
504    send_queue_generic_updates_subscriber: &mut broadcast::Receiver<SendQueueUpdate>,
505    listened_rooms: &mut HashSet<OwnedRoomId>,
506    latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
507) -> ControlFlow<()> {
508    // We need a biased select here: `room_registration_receiver` must have the
509    // priority over other futures.
510    select! {
511        biased;
512
513        update = room_registration_receiver.recv().fuse() => {
514            match update {
515                Some(RoomRegistration::Add(room_id)) => {
516                    listened_rooms.insert(room_id);
517                }
518                Some(RoomRegistration::Remove(room_id)) => {
519                    listened_rooms.remove(&room_id);
520                }
521                None => {
522                    error!("`room_registration` channel has been closed");
523
524                    return ControlFlow::Break(());
525                }
526            }
527        }
528
529        room_event_cache_generic_update = event_cache_generic_updates_subscriber.recv().fuse() => {
530            if let Ok(RoomEventCacheGenericUpdate { room_id }) = room_event_cache_generic_update {
531                if listened_rooms.contains(&room_id) {
532                    let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache {
533                        room_id
534                    });
535                }
536            } else {
537                warn!("`event_cache_generic_updates` channel has been closed");
538
539                return ControlFlow::Break(());
540            }
541        }
542
543        send_queue_generic_update = send_queue_generic_updates_subscriber.recv().fuse() => {
544            if let Ok(SendQueueUpdate { room_id, update }) = send_queue_generic_update {
545                if listened_rooms.contains(&room_id) {
546                    let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue {
547                        room_id,
548                        update
549                    });
550                }
551            } else {
552                warn!("`send_queue_generic_updates` channel has been closed");
553
554                return ControlFlow::Break(());
555            }
556        }
557    }
558
559    ControlFlow::Continue(())
560}
561
562/// The task responsible to compute new [`LatestEvent`] for a particular room or
563/// thread.
564///
565/// The messages are coming from
566/// [`listen_to_event_cache_and_send_queue_updates_task`].
567async fn compute_latest_events_task(
568    registered_rooms: Arc<RegisteredRooms>,
569    mut latest_event_queue_receiver: mpsc::UnboundedReceiver<LatestEventQueueUpdate>,
570) {
571    const BUFFER_SIZE: usize = 16;
572
573    let mut buffer = Vec::with_capacity(BUFFER_SIZE);
574
575    while latest_event_queue_receiver.recv_many(&mut buffer, BUFFER_SIZE).await > 0 {
576        compute_latest_events(&registered_rooms, &buffer).await;
577        buffer.clear();
578    }
579
580    warn!("`compute_latest_events_task` has stopped");
581}
582
583async fn compute_latest_events(
584    registered_rooms: &RegisteredRooms,
585    latest_event_queue_updates: &[LatestEventQueueUpdate],
586) {
587    for latest_event_queue_update in latest_event_queue_updates {
588        match latest_event_queue_update {
589            LatestEventQueueUpdate::EventCache { room_id } => {
590                let rooms = registered_rooms.rooms.read().await;
591
592                if let Some(room_latest_events) = rooms.get(room_id) {
593                    let mut room_latest_events = room_latest_events.write().await;
594
595                    // Release the lock on `registered_rooms`.
596                    // It is possible because `room_latest_events` is an owned lock guard.
597                    drop(rooms);
598
599                    room_latest_events.update_with_event_cache().await;
600                } else {
601                    error!(?room_id, "Failed to find the room");
602
603                    continue;
604                }
605            }
606
607            LatestEventQueueUpdate::SendQueue { room_id, update } => {
608                let rooms = registered_rooms.rooms.read().await;
609
610                if let Some(room_latest_events) = rooms.get(room_id) {
611                    let mut room_latest_events = room_latest_events.write().await;
612
613                    // Release the lock on `registered_rooms`.
614                    // It is possible because `room_latest_events` is an owned lock guard.
615                    drop(rooms);
616
617                    room_latest_events.update_with_send_queue(update).await;
618                } else {
619                    error!(?room_id, "Failed to find the room");
620
621                    continue;
622                }
623            }
624        }
625    }
626}
627
628#[cfg(all(test, not(target_family = "wasm")))]
629mod tests {
630    use std::ops::Not;
631
632    use assert_matches::assert_matches;
633    use matrix_sdk_base::{
634        RoomState,
635        deserialized_responses::TimelineEventKind,
636        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
637    };
638    use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
639    use ruma::{
640        OwnedTransactionId, event_id,
641        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncMessageLikeEvent},
642        owned_room_id, room_id, user_id,
643    };
644    use stream_assert::assert_pending;
645
646    use super::{
647        HashSet, LatestEventValue, RemoteLatestEventValue, RoomEventCacheGenericUpdate,
648        RoomRegistration, RoomSendQueueUpdate, SendQueueUpdate, broadcast,
649        listen_to_event_cache_and_send_queue_updates, mpsc,
650    };
651    use crate::test_utils::mocks::MatrixMockServer;
652
653    #[async_test]
654    async fn test_latest_events_are_lazy() {
655        let room_id_0 = room_id!("!r0");
656        let room_id_1 = room_id!("!r1");
657        let room_id_2 = room_id!("!r2");
658        let thread_id_1_0 = event_id!("$ev1.0");
659        let thread_id_2_0 = event_id!("$ev2.0");
660
661        let server = MatrixMockServer::new().await;
662        let client = server.client_builder().build().await;
663
664        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
665        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
666        client.base_client().get_or_create_room(room_id_2, RoomState::Joined);
667
668        client.event_cache().subscribe().unwrap();
669
670        let latest_events = client.latest_events().await;
671
672        // Despites there are many rooms, zero `RoomLatestEvents` are created.
673        assert!(latest_events.state.registered_rooms.rooms.read().await.is_empty());
674
675        // Now let's listen to two rooms.
676        assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
677        assert!(latest_events.listen_to_room(room_id_1).await.unwrap());
678
679        {
680            let rooms = latest_events.state.registered_rooms.rooms.read().await;
681            // There are two rooms…
682            assert_eq!(rooms.len(), 2);
683            // … which are room 0 and room 1.
684            assert!(rooms.contains_key(room_id_0));
685            assert!(rooms.contains_key(room_id_1));
686
687            // Room 0 contains zero thread latest events.
688            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
689            // Room 1 contains zero thread latest events.
690            assert!(rooms.get(room_id_1).unwrap().read().await.per_thread().is_empty());
691        }
692
693        // Now let's listen to one thread respectively for two rooms.
694        assert!(latest_events.listen_to_thread(room_id_1, thread_id_1_0).await.unwrap());
695        assert!(latest_events.listen_to_thread(room_id_2, thread_id_2_0).await.unwrap());
696
697        {
698            let rooms = latest_events.state.registered_rooms.rooms.read().await;
699            // There are now three rooms…
700            assert_eq!(rooms.len(), 3);
701            // … yup, room 2 is now created.
702            assert!(rooms.contains_key(room_id_0));
703            assert!(rooms.contains_key(room_id_1));
704            assert!(rooms.contains_key(room_id_2));
705
706            // Room 0 contains zero thread latest events.
707            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
708            // Room 1 contains one thread latest event…
709            let room_1 = rooms.get(room_id_1).unwrap().read().await;
710            assert_eq!(room_1.per_thread().len(), 1);
711            // … which is thread 1.0.
712            assert!(room_1.per_thread().contains_key(thread_id_1_0));
713            // Room 2 contains one thread latest event…
714            let room_2 = rooms.get(room_id_2).unwrap().read().await;
715            assert_eq!(room_2.per_thread().len(), 1);
716            // … which is thread 2.0.
717            assert!(room_2.per_thread().contains_key(thread_id_2_0));
718        }
719    }
720
721    #[async_test]
722    async fn test_forget_room() {
723        let room_id_0 = room_id!("!r0");
724        let room_id_1 = room_id!("!r1");
725
726        let server = MatrixMockServer::new().await;
727        let client = server.client_builder().build().await;
728
729        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
730        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
731
732        client.event_cache().subscribe().unwrap();
733
734        let latest_events = client.latest_events().await;
735
736        // Now let's fetch one room.
737        assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
738
739        {
740            let rooms = latest_events.state.registered_rooms.rooms.read().await;
741            // There are one room…
742            assert_eq!(rooms.len(), 1);
743            // … which is room 0.
744            assert!(rooms.contains_key(room_id_0));
745
746            // Room 0 contains zero thread latest events.
747            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
748        }
749
750        // Now let's forget about room 0.
751        latest_events.forget_room(room_id_0).await;
752
753        {
754            let rooms = latest_events.state.registered_rooms.rooms.read().await;
755            // There are now zero rooms.
756            assert!(rooms.is_empty());
757        }
758    }
759
760    #[async_test]
761    async fn test_forget_thread() {
762        let room_id_0 = room_id!("!r0");
763        let room_id_1 = room_id!("!r1");
764        let thread_id_0_0 = event_id!("$ev0.0");
765
766        let server = MatrixMockServer::new().await;
767        let client = server.client_builder().build().await;
768
769        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
770        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
771
772        client.event_cache().subscribe().unwrap();
773
774        let latest_events = client.latest_events().await;
775
776        // Now let's fetch one thread .
777        assert!(latest_events.listen_to_thread(room_id_0, thread_id_0_0).await.unwrap());
778
779        {
780            let rooms = latest_events.state.registered_rooms.rooms.read().await;
781            // There is one room…
782            assert_eq!(rooms.len(), 1);
783            // … which is room 0.
784            assert!(rooms.contains_key(room_id_0));
785
786            // Room 0 contains one thread latest event…
787            let room_0 = rooms.get(room_id_0).unwrap().read().await;
788            assert_eq!(room_0.per_thread().len(), 1);
789            // … which is thread 0.0.
790            assert!(room_0.per_thread().contains_key(thread_id_0_0));
791        }
792
793        // Now let's forget about the thread.
794        latest_events.forget_thread(room_id_0, thread_id_0_0).await;
795
796        {
797            let rooms = latest_events.state.registered_rooms.rooms.read().await;
798            // There is still one room…
799            assert_eq!(rooms.len(), 1);
800            // … which is room 0.
801            assert!(rooms.contains_key(room_id_0));
802
803            // But the thread has been removed.
804            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
805        }
806    }
807
808    #[async_test]
809    async fn test_inputs_task_can_listen_to_room_registration() {
810        let room_id_0 = owned_room_id!("!r0");
811        let room_id_1 = owned_room_id!("!r1");
812
813        let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
814        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
815            broadcast::channel(1);
816        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
817            broadcast::channel(1);
818        let mut listened_rooms = HashSet::new();
819        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
820
821        // Send a _room update_ for the first time.
822        {
823            // It mimics `LatestEvents::for_room`.
824            room_registration_sender.send(RoomRegistration::Add(room_id_0.clone())).await.unwrap();
825
826            // Run the task.
827            assert!(
828                listen_to_event_cache_and_send_queue_updates(
829                    &mut room_registration_receiver,
830                    &mut room_event_cache_generic_update_receiver,
831                    &mut send_queue_generic_update_receiver,
832                    &mut listened_rooms,
833                    &latest_event_queue_sender,
834                )
835                .await
836                .is_continue()
837            );
838
839            assert_eq!(listened_rooms.len(), 1);
840            assert!(listened_rooms.contains(&room_id_0));
841            assert!(latest_event_queue_receiver.is_empty());
842        }
843
844        // Send a _room update_ for the second time. It's the same room.
845        {
846            room_registration_sender.send(RoomRegistration::Add(room_id_0.clone())).await.unwrap();
847
848            // Run the task.
849            assert!(
850                listen_to_event_cache_and_send_queue_updates(
851                    &mut room_registration_receiver,
852                    &mut room_event_cache_generic_update_receiver,
853                    &mut send_queue_generic_update_receiver,
854                    &mut listened_rooms,
855                    &latest_event_queue_sender,
856                )
857                .await
858                .is_continue()
859            );
860
861            // This is the second time this room is added. Nothing happens.
862            assert_eq!(listened_rooms.len(), 1);
863            assert!(listened_rooms.contains(&room_id_0));
864            assert!(latest_event_queue_receiver.is_empty());
865        }
866
867        // Send another _room update_. It's a different room.
868        {
869            room_registration_sender
870                .send(RoomRegistration::Add(room_id_1.to_owned()))
871                .await
872                .unwrap();
873
874            // Run the task.
875            assert!(
876                listen_to_event_cache_and_send_queue_updates(
877                    &mut room_registration_receiver,
878                    &mut room_event_cache_generic_update_receiver,
879                    &mut send_queue_generic_update_receiver,
880                    &mut listened_rooms,
881                    &latest_event_queue_sender,
882                )
883                .await
884                .is_continue()
885            );
886
887            // This is the first time this room is added. It must appear.
888            assert_eq!(listened_rooms.len(), 2);
889            assert!(listened_rooms.contains(&room_id_0));
890            assert!(listened_rooms.contains(&room_id_1));
891            assert!(latest_event_queue_receiver.is_empty());
892        }
893    }
894
895    #[async_test]
896    async fn test_inputs_task_stops_when_room_registration_channel_is_closed() {
897        let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
898        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
899            broadcast::channel(1);
900        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
901            broadcast::channel(1);
902        let mut listened_rooms = HashSet::new();
903        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
904
905        // Close the receiver to close the channel.
906        room_registration_receiver.close();
907
908        // Run the task.
909        assert!(
910            listen_to_event_cache_and_send_queue_updates(
911                &mut room_registration_receiver,
912                &mut room_event_cache_generic_update_receiver,
913                &mut send_queue_generic_update_receiver,
914                &mut listened_rooms,
915                &latest_event_queue_sender,
916            )
917            .await
918            // It breaks!
919            .is_break()
920        );
921
922        assert_eq!(listened_rooms.len(), 0);
923        assert!(latest_event_queue_receiver.is_empty());
924    }
925
926    #[async_test]
927    async fn test_inputs_task_can_listen_to_room_event_cache() {
928        let room_id = owned_room_id!("!r0");
929
930        let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
931        let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
932            broadcast::channel(1);
933        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
934            broadcast::channel(1);
935        let mut listened_rooms = HashSet::new();
936        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
937
938        // New event cache update, but the `LatestEvents` isn't listening to it.
939        {
940            room_event_cache_generic_update_sender
941                .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
942                .unwrap();
943
944            // Run the task.
945            assert!(
946                listen_to_event_cache_and_send_queue_updates(
947                    &mut room_registration_receiver,
948                    &mut room_event_cache_generic_update_receiver,
949                    &mut send_queue_generic_update_receiver,
950                    &mut listened_rooms,
951                    &latest_event_queue_sender,
952                )
953                .await
954                .is_continue()
955            );
956
957            assert!(listened_rooms.is_empty());
958
959            // No latest event computation has been triggered.
960            assert!(latest_event_queue_receiver.is_empty());
961        }
962
963        // New event cache update, but this time, the `LatestEvents` is listening to it.
964        {
965            room_registration_sender.send(RoomRegistration::Add(room_id.clone())).await.unwrap();
966            room_event_cache_generic_update_sender
967                .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
968                .unwrap();
969
970            // Run the task to handle the `RoomRegistration` and the
971            // `RoomEventCacheGenericUpdate`.
972            for _ in 0..2 {
973                assert!(
974                    listen_to_event_cache_and_send_queue_updates(
975                        &mut room_registration_receiver,
976                        &mut room_event_cache_generic_update_receiver,
977                        &mut send_queue_generic_update_receiver,
978                        &mut listened_rooms,
979                        &latest_event_queue_sender,
980                    )
981                    .await
982                    .is_continue()
983                );
984            }
985
986            assert_eq!(listened_rooms.len(), 1);
987            assert!(listened_rooms.contains(&room_id));
988
989            // A latest event computation has been triggered!
990            assert!(latest_event_queue_receiver.is_empty().not());
991        }
992    }
993
994    #[async_test]
995    async fn test_inputs_task_can_listen_to_send_queue() {
996        let room_id = owned_room_id!("!r0");
997
998        let (room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
999        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1000            broadcast::channel(1);
1001        let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1002            broadcast::channel(1);
1003        let mut listened_rooms = HashSet::new();
1004        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1005
1006        // New send queue update, but the `LatestEvents` isn't listening to it.
1007        {
1008            send_queue_generic_update_sender
1009                .send(SendQueueUpdate {
1010                    room_id: room_id.clone(),
1011                    update: RoomSendQueueUpdate::SentEvent {
1012                        transaction_id: OwnedTransactionId::from("txnid0"),
1013                        event_id: event_id!("$ev0").to_owned(),
1014                    },
1015                })
1016                .unwrap();
1017
1018            // Run the task.
1019            assert!(
1020                listen_to_event_cache_and_send_queue_updates(
1021                    &mut room_registration_receiver,
1022                    &mut room_event_cache_generic_update_receiver,
1023                    &mut send_queue_generic_update_receiver,
1024                    &mut listened_rooms,
1025                    &latest_event_queue_sender,
1026                )
1027                .await
1028                .is_continue()
1029            );
1030
1031            assert!(listened_rooms.is_empty());
1032
1033            // No latest event computation has been triggered.
1034            assert!(latest_event_queue_receiver.is_empty());
1035        }
1036
1037        // New send queue update, but this time, the `LatestEvents` is listening to it.
1038        {
1039            room_registration_sender.send(RoomRegistration::Add(room_id.clone())).await.unwrap();
1040            send_queue_generic_update_sender
1041                .send(SendQueueUpdate {
1042                    room_id: room_id.clone(),
1043                    update: RoomSendQueueUpdate::SentEvent {
1044                        transaction_id: OwnedTransactionId::from("txnid1"),
1045                        event_id: event_id!("$ev1").to_owned(),
1046                    },
1047                })
1048                .unwrap();
1049
1050            // Run the task to handle the `RoomRegistration` and the `SendQueueUpdate`.
1051            for _ in 0..2 {
1052                assert!(
1053                    listen_to_event_cache_and_send_queue_updates(
1054                        &mut room_registration_receiver,
1055                        &mut room_event_cache_generic_update_receiver,
1056                        &mut send_queue_generic_update_receiver,
1057                        &mut listened_rooms,
1058                        &latest_event_queue_sender,
1059                    )
1060                    .await
1061                    .is_continue()
1062                );
1063            }
1064
1065            assert_eq!(listened_rooms.len(), 1);
1066            assert!(listened_rooms.contains(&room_id));
1067
1068            // A latest event computation has been triggered!
1069            assert!(latest_event_queue_receiver.is_empty().not());
1070        }
1071    }
1072
1073    #[async_test]
1074    async fn test_inputs_task_stops_when_event_cache_channel_is_closed() {
1075        let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
1076        let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1077            broadcast::channel(1);
1078        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1079            broadcast::channel(1);
1080        let mut listened_rooms = HashSet::new();
1081        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1082
1083        // Drop the sender to close the channel.
1084        drop(room_event_cache_generic_update_sender);
1085
1086        // Run the task.
1087        assert!(
1088            listen_to_event_cache_and_send_queue_updates(
1089                &mut room_registration_receiver,
1090                &mut room_event_cache_generic_update_receiver,
1091                &mut send_queue_generic_update_receiver,
1092                &mut listened_rooms,
1093                &latest_event_queue_sender,
1094            )
1095            .await
1096            // It breaks!
1097            .is_break()
1098        );
1099
1100        assert_eq!(listened_rooms.len(), 0);
1101        assert!(latest_event_queue_receiver.is_empty());
1102    }
1103
1104    #[async_test]
1105    async fn test_inputs_task_stops_when_send_queue_channel_is_closed() {
1106        let (_room_registration_sender, mut room_registration_receiver) = mpsc::channel(1);
1107        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1108            broadcast::channel(1);
1109        let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1110            broadcast::channel(1);
1111        let mut listened_rooms = HashSet::new();
1112        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1113
1114        // Drop the sender to close the channel.
1115        drop(send_queue_generic_update_sender);
1116
1117        // Run the task.
1118        assert!(
1119            listen_to_event_cache_and_send_queue_updates(
1120                &mut room_registration_receiver,
1121                &mut room_event_cache_generic_update_receiver,
1122                &mut send_queue_generic_update_receiver,
1123                &mut listened_rooms,
1124                &latest_event_queue_sender,
1125            )
1126            .await
1127            // It breaks!
1128            .is_break()
1129        );
1130
1131        assert_eq!(listened_rooms.len(), 0);
1132        assert!(latest_event_queue_receiver.is_empty());
1133    }
1134
1135    #[async_test]
1136    async fn test_latest_event_value_is_updated_via_event_cache() {
1137        let room_id = owned_room_id!("!r0");
1138        let user_id = user_id!("@mnt_io:matrix.org");
1139        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
1140        let event_id_0 = event_id!("$ev0");
1141        let event_id_1 = event_id!("$ev1");
1142        let event_id_2 = event_id!("$ev2");
1143
1144        let server = MatrixMockServer::new().await;
1145        let client = server.client_builder().build().await;
1146
1147        // Prelude.
1148        {
1149            // Create the room.
1150            client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1151
1152            // Initialise the event cache store.
1153            client
1154                .event_cache_store()
1155                .lock()
1156                .await
1157                .unwrap()
1158                .handle_linked_chunk_updates(
1159                    LinkedChunkId::Room(&room_id),
1160                    vec![
1161                        Update::NewItemsChunk {
1162                            previous: None,
1163                            new: ChunkIdentifier::new(0),
1164                            next: None,
1165                        },
1166                        Update::PushItems {
1167                            at: Position::new(ChunkIdentifier::new(0), 0),
1168                            items: vec![
1169                                event_factory.text_msg("hello").event_id(event_id_0).into(),
1170                                event_factory.text_msg("world").event_id(event_id_1).into(),
1171                            ],
1172                        },
1173                    ],
1174                )
1175                .await
1176                .unwrap();
1177        }
1178
1179        let event_cache = client.event_cache();
1180        event_cache.subscribe().unwrap();
1181
1182        let latest_events = client.latest_events().await;
1183
1184        // Subscribe to the latest event values for this room.
1185        let mut latest_event_stream =
1186            latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
1187
1188        // The initial latest event value is set to `event_id_1` because it's… the…
1189        // latest event!
1190        assert_matches!(
1191            latest_event_stream.get().await,
1192            LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. }) => {
1193                assert_matches!(
1194                    event.deserialize().unwrap(),
1195                    AnySyncTimelineEvent::MessageLike(
1196                        AnySyncMessageLikeEvent::RoomMessage(
1197                            SyncMessageLikeEvent::Original(message_content)
1198                        )
1199                    ) => {
1200                        assert_eq!(message_content.content.body(), "world");
1201                    }
1202                );
1203            }
1204        );
1205
1206        // The stream is pending: no new latest event for the moment.
1207        assert_pending!(latest_event_stream);
1208
1209        // Update the event cache with a sync.
1210        server
1211            .sync_room(
1212                &client,
1213                JoinedRoomBuilder::new(&room_id)
1214                    .add_timeline_event(event_factory.text_msg("raclette !").event_id(event_id_2)),
1215            )
1216            .await;
1217
1218        // The event cache has received its update from the sync. It has emitted a
1219        // generic update, which has been received by `LatestEvents` tasks, up to the
1220        // `compute_latest_events` which has updated the latest event value.
1221        assert_matches!(
1222            latest_event_stream.next().await,
1223            Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => {
1224                assert_matches!(
1225                    event.deserialize().unwrap(),
1226                    AnySyncTimelineEvent::MessageLike(
1227                        AnySyncMessageLikeEvent::RoomMessage(
1228                            SyncMessageLikeEvent::Original(message_content)
1229                        )
1230                    ) => {
1231                        assert_eq!(message_content.content.body(), "raclette !");
1232                    }
1233                );
1234            }
1235        );
1236    }
1237}