matrix_sdk/latest_events/
mod.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The Latest Events API provides a lazy, reactive and efficient way to compute
16//! the latest event for a room or a thread.
17//!
18//! The latest event represents the last displayable and relevant event a room
19//! or a thread has been received. It is usually displayed in a _summary_, e.g.
20//! below the room title in a room list.
21//!
22//! The entry point is [`LatestEvents`]. It is preferable to get a reference to
23//! it from [`Client::latest_events`][crate::Client::latest_events], which
24//! already plugs everything to build it. [`LatestEvents`] is using the
25//! [`EventCache`] and the [`SendQueue`] to respectively get known remote events
26//! (i.e. synced from the server), or local events (i.e. ones being sent).
27//!
28//! ## Laziness
29//!
30//! [`LatestEvents`] is lazy, it means that, despite [`LatestEvents`] is
31//! listening to all [`EventCache`] or [`SendQueue`] updates, it will only do
32//! something if one is expected to get the latest event for a particular room
33//! or a particular thread. Concretely, it means that until
34//! [`LatestEvents::listen_to_room`] is called for a particular room, no latest
35//! event will ever be computed for that room (and similarly with
36//! [`LatestEvents::listen_to_thread`]).
37//!
38//! If one is no longer interested to get the latest event for a particular room
39//! or thread, the [`LatestEvents::forget_room`] and
40//! [`LatestEvents::forget_thread`] methods must be used.
41//!
42//! ## Reactive
43//!
44//! [`LatestEvents`] is designed to be reactive. Using
45//! [`LatestEvents::listen_and_subscribe_to_room`] will provide a
46//! [`Subscriber`], which brings all the tooling to get the current value or the
47//! future values with a stream.
48
49mod error;
50mod latest_event;
51mod room_latest_events;
52
53use std::{
54    collections::HashMap,
55    ops::{ControlFlow, DerefMut, Not},
56    sync::Arc,
57};
58
59pub use error::LatestEventsError;
60use eyeball::{AsyncLock, Subscriber};
61use latest_event::{LatestEvent, With};
62pub use latest_event::{LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue};
63use matrix_sdk_base::timer;
64use matrix_sdk_common::executor::{AbortOnDrop, JoinHandleExt as _, spawn};
65use room_latest_events::RoomLatestEvents;
66use ruma::{EventId, OwnedRoomId, RoomId};
67use tokio::{
68    select,
69    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, broadcast, mpsc},
70};
71use tracing::{error, warn};
72
73use crate::{
74    client::WeakClient,
75    event_cache::{EventCache, RoomEventCacheGenericUpdate},
76    room::WeakRoom,
77    send_queue::{RoomSendQueueUpdate, SendQueue, SendQueueUpdate},
78};
79
80/// The entry point to fetch the [`LatestEventValue`] for rooms or threads.
81#[derive(Clone, Debug)]
82pub struct LatestEvents {
83    state: Arc<LatestEventsState>,
84}
85
86/// The state of [`LatestEvents`].
87#[derive(Debug)]
88struct LatestEventsState {
89    /// All the registered rooms, i.e. rooms the latest events are computed for.
90    registered_rooms: Arc<RegisteredRooms>,
91
92    /// The task handle of the
93    /// [`listen_to_event_cache_and_send_queue_updates_task`].
94    _listen_task_handle: AbortOnDrop<()>,
95
96    /// The task handle of the [`compute_latest_events_task`].
97    _computation_task_handle: AbortOnDrop<()>,
98}
99
100impl LatestEvents {
101    /// Create a new [`LatestEvents`].
102    pub(crate) fn new(
103        weak_client: WeakClient,
104        event_cache: EventCache,
105        send_queue: SendQueue,
106    ) -> Self {
107        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
108
109        let registered_rooms =
110            Arc::new(RegisteredRooms::new(weak_client, &event_cache, &latest_event_queue_sender));
111
112        // The task listening to the event cache and the send queue updates.
113        let listen_task_handle = spawn(listen_to_event_cache_and_send_queue_updates_task(
114            registered_rooms.clone(),
115            event_cache,
116            send_queue,
117            latest_event_queue_sender,
118        ))
119        .abort_on_drop();
120
121        // The task computing the new latest events.
122        let computation_task_handle = spawn(compute_latest_events_task(
123            registered_rooms.clone(),
124            latest_event_queue_receiver,
125        ))
126        .abort_on_drop();
127
128        Self {
129            state: Arc::new(LatestEventsState {
130                registered_rooms,
131                _listen_task_handle: listen_task_handle,
132                _computation_task_handle: computation_task_handle,
133            }),
134        }
135    }
136
137    /// Start listening to updates (if not already) for a particular room.
138    ///
139    /// It returns `true` if the room exists, `false` otherwise.
140    pub async fn listen_to_room(&self, room_id: &RoomId) -> Result<bool, LatestEventsError> {
141        Ok(self.state.registered_rooms.for_room(room_id).await?.is_some())
142    }
143
144    /// Check whether the system listens to a particular room.
145    ///
146    /// Note: It's a test only method.
147    #[cfg(test)]
148    pub async fn is_listening_to_room(&self, room_id: &RoomId) -> bool {
149        self.state.registered_rooms.rooms.read().await.contains_key(room_id)
150    }
151
152    /// Start listening to updates (if not already) for a particular room, and
153    /// return a [`Subscriber`] to get the current and future
154    /// [`LatestEventValue`]s.
155    ///
156    /// It returns `Some` if the room exists, `None` otherwise.
157    pub async fn listen_and_subscribe_to_room(
158        &self,
159        room_id: &RoomId,
160    ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
161        let Some(room_latest_events) = self.state.registered_rooms.for_room(room_id).await? else {
162            return Ok(None);
163        };
164
165        let room_latest_events = room_latest_events.read().await;
166        let latest_event = room_latest_events.for_room();
167
168        Ok(Some(latest_event.subscribe().await))
169    }
170
171    /// Start listening to updates (if not already) for a particular room and a
172    /// particular thread in this room.
173    ///
174    /// It returns `true` if the room and the thread exists, `false` otherwise.
175    pub async fn listen_to_thread(
176        &self,
177        room_id: &RoomId,
178        thread_id: &EventId,
179    ) -> Result<bool, LatestEventsError> {
180        Ok(self.state.registered_rooms.for_thread(room_id, thread_id).await?.is_some())
181    }
182
183    /// Start listening to updates (if not already) for a particular room and a
184    /// particular thread in this room, and return a [`Subscriber`] to get the
185    /// current and future [`LatestEventValue`]s.
186    ///
187    /// It returns `Some` if the room and the thread exists, `None` otherwise.
188    pub async fn listen_and_subscribe_to_thread(
189        &self,
190        room_id: &RoomId,
191        thread_id: &EventId,
192    ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
193        let Some(room_latest_events) =
194            self.state.registered_rooms.for_thread(room_id, thread_id).await?
195        else {
196            return Ok(None);
197        };
198
199        let room_latest_events = room_latest_events.read().await;
200        let latest_event = room_latest_events
201            .for_thread(thread_id)
202            .expect("The `LatestEvent` for the thread must have been created");
203
204        Ok(Some(latest_event.subscribe().await))
205    }
206
207    /// Forget a room.
208    ///
209    /// It means that [`LatestEvents`] will stop listening to updates for the
210    /// `LatestEvent`s of the room and all its threads.
211    ///
212    /// If [`LatestEvents`] is not listening for `room_id`, nothing happens.
213    pub async fn forget_room(&self, room_id: &RoomId) {
214        self.state.registered_rooms.forget_room(room_id).await;
215    }
216
217    /// Forget a thread.
218    ///
219    /// It means that [`LatestEvents`] will stop listening to updates for the
220    /// `LatestEvent` of the thread.
221    ///
222    /// If [`LatestEvents`] is not listening for `room_id` or `thread_id`,
223    /// nothing happens.
224    pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
225        self.state.registered_rooms.forget_thread(room_id, thread_id).await;
226    }
227}
228
229#[derive(Debug)]
230struct RegisteredRooms {
231    /// All the registered [`RoomLatestEvents`].
232    rooms: RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
233
234    /// The (weak) client.
235    weak_client: WeakClient,
236
237    /// The event cache.
238    event_cache: EventCache,
239
240    /// The sender part of the channel used by [`compute_latest_events_task`].
241    ///
242    /// This is used to _trigger_ a computation of a `LatestEventValue` if the
243    /// restored value is `None`.
244    latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
245}
246
247impl RegisteredRooms {
248    fn new(
249        weak_client: WeakClient,
250        event_cache: &EventCache,
251        latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
252    ) -> Self {
253        Self {
254            rooms: RwLock::new(HashMap::default()),
255            weak_client,
256            event_cache: event_cache.clone(),
257            latest_event_queue_sender: latest_event_queue_sender.clone(),
258        }
259    }
260
261    /// Get a read lock guard to a [`RoomLatestEvents`] given a room ID and an
262    /// optional thread ID.
263    ///
264    /// The [`RoomLatestEvents`], and the associated [`LatestEvent`], will be
265    /// created if missing. It means that write lock is taken if necessary, but
266    /// it's always downgraded to a read lock at the end.
267    async fn room_latest_event(
268        &self,
269        room_id: &RoomId,
270        thread_id: Option<&EventId>,
271    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
272        fn create_and_insert_room_latest_events(
273            room_id: &RoomId,
274            rooms: &mut HashMap<OwnedRoomId, RoomLatestEvents>,
275            weak_client: &WeakClient,
276            event_cache: &EventCache,
277            latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
278        ) {
279            let (room_latest_events, is_latest_event_value_none) =
280                With::unzip(RoomLatestEvents::new(
281                    WeakRoom::new(weak_client.clone(), room_id.to_owned()),
282                    event_cache,
283                ));
284
285            // Insert the new `RoomLatestEvents`.
286            rooms.insert(room_id.to_owned(), room_latest_events);
287
288            // If the `LatestEventValue` restored by `RoomLatestEvents` is of kind `None`,
289            // let's try to re-compute it without waiting on the Event Cache (so the sync
290            // usually) or the Send Queue. Maybe the system has migrated to a new version
291            // and the `LatestEventValue` has been erased, while it is still possible to
292            // compute a correct value.
293            if is_latest_event_value_none {
294                let _ = latest_event_queue_sender
295                    .send(LatestEventQueueUpdate::EventCache { room_id: room_id.to_owned() });
296            }
297        }
298
299        Ok(match thread_id {
300            // Get the room latest event with the aim of fetching the latest event for a particular
301            // thread.
302            //
303            // We need to take a write lock immediately, in case the thead latest event doesn't
304            // exist.
305            Some(thread_id) => {
306                let mut rooms = self.rooms.write().await;
307
308                // The `RoomLatestEvents` doesn't exist. Let's create and insert it.
309                if rooms.contains_key(room_id).not() {
310                    create_and_insert_room_latest_events(
311                        room_id,
312                        rooms.deref_mut(),
313                        &self.weak_client,
314                        &self.event_cache,
315                        &self.latest_event_queue_sender,
316                    );
317                }
318
319                if let Some(room_latest_event) = rooms.get(room_id) {
320                    let mut room_latest_event = room_latest_event.write().await;
321
322                    // In `RoomLatestEvents`, the `LatestEvent` for this thread doesn't exist. Let's
323                    // create and insert it.
324                    if room_latest_event.has_thread(thread_id).not() {
325                        room_latest_event.create_and_insert_latest_event_for_thread(thread_id);
326                    }
327                }
328
329                RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
330            }
331
332            // Get the room latest event with the aim of fetching the latest event for a particular
333            // room.
334            None => {
335                match RwLockReadGuard::try_map(self.rooms.read().await, |rooms| rooms.get(room_id))
336                    .ok()
337                {
338                    value @ Some(_) => value,
339                    None => {
340                        let _timer = timer!(
341                            tracing::Level::INFO,
342                            format!("Creating `RoomLatestEvents` for {room_id:?}"),
343                        );
344
345                        let mut rooms = self.rooms.write().await;
346
347                        if rooms.contains_key(room_id).not() {
348                            create_and_insert_room_latest_events(
349                                room_id,
350                                rooms.deref_mut(),
351                                &self.weak_client,
352                                &self.event_cache,
353                                &self.latest_event_queue_sender,
354                            );
355                        }
356
357                        RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
358                    }
359                }
360            }
361        })
362    }
363
364    /// Start listening to updates (if not already) for a particular room.
365    ///
366    /// It returns `None` if the room doesn't exist.
367    pub async fn for_room(
368        &self,
369        room_id: &RoomId,
370    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
371        self.room_latest_event(room_id, None).await
372    }
373
374    /// Start listening to updates (if not already) for a particular room.
375    ///
376    /// It returns `None` if the room or the thread doesn't exist.
377    pub async fn for_thread(
378        &self,
379        room_id: &RoomId,
380        thread_id: &EventId,
381    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
382        self.room_latest_event(room_id, Some(thread_id)).await
383    }
384
385    /// Forget a room.
386    ///
387    /// It means that [`LatestEvents`] will stop listening to updates for the
388    /// `LatestEvent`s of the room and all its threads.
389    ///
390    /// If [`LatestEvents`] is not listening for `room_id`, nothing happens.
391    pub async fn forget_room(&self, room_id: &RoomId) {
392        {
393            let mut rooms = self.rooms.write().await;
394
395            // Remove the whole `RoomLatestEvents`.
396            rooms.remove(room_id);
397        }
398    }
399
400    /// Forget a thread.
401    ///
402    /// It means that [`LatestEvents`] will stop listening to updates for the
403    /// `LatestEvent` of the thread.
404    ///
405    /// If [`LatestEvents`] is not listening for `room_id` or `thread_id`,
406    /// nothing happens.
407    pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
408        let rooms = self.rooms.read().await;
409
410        // If the `RoomLatestEvents`, remove the `LatestEvent` in `per_thread`.
411        if let Some(room_latest_event) = rooms.get(room_id) {
412            let mut room_latest_event = room_latest_event.write().await;
413
414            // Release the lock on `self.rooms`.
415            drop(rooms);
416
417            room_latest_event.forget_thread(thread_id);
418        }
419    }
420}
421
422/// Represents the kind of updates the [`compute_latest_events_task`] will have
423/// to deal with.
424#[derive(Debug)]
425enum LatestEventQueueUpdate {
426    /// An update from the [`EventCache`] happened.
427    EventCache {
428        /// The ID of the room that has triggered the update.
429        room_id: OwnedRoomId,
430    },
431
432    /// An update from the [`SendQueue`] happened.
433    SendQueue {
434        /// The ID of the room that has triggered the update.
435        room_id: OwnedRoomId,
436
437        /// The update itself.
438        update: RoomSendQueueUpdate,
439    },
440}
441
442/// The task responsible to listen to the [`EventCache`] and the [`SendQueue`].
443/// When an update is received and is considered relevant, a message is sent to
444/// the [`compute_latest_events_task`] to compute a new [`LatestEvent`].
445///
446/// When an update is considered relevant, a message is sent over the
447/// `latest_event_queue_sender` channel. See [`compute_latest_events_task`].
448async fn listen_to_event_cache_and_send_queue_updates_task(
449    registered_rooms: Arc<RegisteredRooms>,
450    event_cache: EventCache,
451    send_queue: SendQueue,
452    latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
453) {
454    let mut event_cache_generic_updates_subscriber =
455        event_cache.subscribe_to_room_generic_updates();
456    let mut send_queue_generic_updates_subscriber = send_queue.subscribe();
457
458    loop {
459        if listen_to_event_cache_and_send_queue_updates(
460            &registered_rooms.rooms,
461            &mut event_cache_generic_updates_subscriber,
462            &mut send_queue_generic_updates_subscriber,
463            &latest_event_queue_sender,
464        )
465        .await
466        .is_break()
467        {
468            warn!("`listen_to_event_cache_and_send_queue_updates_task` has stopped");
469
470            break;
471        }
472    }
473}
474
475/// The core of [`listen_to_event_cache_and_send_queue_updates_task`].
476///
477/// Having this function detached from its task is helpful for testing and for
478/// state isolation.
479async fn listen_to_event_cache_and_send_queue_updates(
480    registered_rooms: &RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
481    event_cache_generic_updates_subscriber: &mut broadcast::Receiver<RoomEventCacheGenericUpdate>,
482    send_queue_generic_updates_subscriber: &mut broadcast::Receiver<SendQueueUpdate>,
483    latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
484) -> ControlFlow<()> {
485    select! {
486        room_event_cache_generic_update = event_cache_generic_updates_subscriber.recv() => {
487            if let Ok(RoomEventCacheGenericUpdate { room_id }) = room_event_cache_generic_update {
488                if registered_rooms.read().await.contains_key(&room_id) {
489                    let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache {
490                        room_id
491                    });
492                }
493            } else {
494                warn!("`event_cache_generic_updates` channel has been closed");
495
496                return ControlFlow::Break(());
497            }
498        }
499
500        send_queue_generic_update = send_queue_generic_updates_subscriber.recv() => {
501            if let Ok(SendQueueUpdate { room_id, update }) = send_queue_generic_update {
502                if registered_rooms.read().await.contains_key(&room_id) {
503                    let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue {
504                        room_id,
505                        update
506                    });
507                }
508            } else {
509                warn!("`send_queue_generic_updates` channel has been closed");
510
511                return ControlFlow::Break(());
512            }
513        }
514    }
515
516    ControlFlow::Continue(())
517}
518
519/// The task responsible to compute new [`LatestEvent`] for a particular room or
520/// thread.
521///
522/// The messages are coming from
523/// [`listen_to_event_cache_and_send_queue_updates_task`].
524async fn compute_latest_events_task(
525    registered_rooms: Arc<RegisteredRooms>,
526    mut latest_event_queue_receiver: mpsc::UnboundedReceiver<LatestEventQueueUpdate>,
527) {
528    const BUFFER_SIZE: usize = 16;
529
530    let mut buffer = Vec::with_capacity(BUFFER_SIZE);
531
532    while latest_event_queue_receiver.recv_many(&mut buffer, BUFFER_SIZE).await > 0 {
533        compute_latest_events(&registered_rooms, &buffer).await;
534        buffer.clear();
535    }
536
537    warn!("`compute_latest_events_task` has stopped");
538}
539
540async fn compute_latest_events(
541    registered_rooms: &RegisteredRooms,
542    latest_event_queue_updates: &[LatestEventQueueUpdate],
543) {
544    for latest_event_queue_update in latest_event_queue_updates {
545        match latest_event_queue_update {
546            LatestEventQueueUpdate::EventCache { room_id } => {
547                let rooms = registered_rooms.rooms.read().await;
548
549                if let Some(room_latest_events) = rooms.get(room_id) {
550                    let mut room_latest_events = room_latest_events.write().await;
551
552                    // Release the lock on `registered_rooms`.
553                    // It is possible because `room_latest_events` is an owned lock guard.
554                    drop(rooms);
555
556                    room_latest_events.update_with_event_cache().await;
557                } else {
558                    error!(?room_id, "Failed to find the room");
559
560                    continue;
561                }
562            }
563
564            LatestEventQueueUpdate::SendQueue { room_id, update } => {
565                let rooms = registered_rooms.rooms.read().await;
566
567                if let Some(room_latest_events) = rooms.get(room_id) {
568                    let mut room_latest_events = room_latest_events.write().await;
569
570                    // Release the lock on `registered_rooms`.
571                    // It is possible because `room_latest_events` is an owned lock guard.
572                    drop(rooms);
573
574                    room_latest_events.update_with_send_queue(update).await;
575                } else {
576                    error!(?room_id, "Failed to find the room");
577
578                    continue;
579                }
580            }
581        }
582    }
583}
584
585#[cfg(test)]
586fn local_room_message(body: &str) -> LocalLatestEventValue {
587    use matrix_sdk_base::store::SerializableEventContent;
588    use ruma::{
589        MilliSecondsSinceUnixEpoch,
590        events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
591    };
592
593    LocalLatestEventValue {
594        timestamp: MilliSecondsSinceUnixEpoch::now(),
595        content: SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(
596            RoomMessageEventContent::text_plain(body),
597        ))
598        .unwrap(),
599    }
600}
601
602#[cfg(all(test, not(target_family = "wasm")))]
603mod tests {
604    use std::{collections::HashMap, ops::Not};
605
606    use assert_matches::assert_matches;
607    use matrix_sdk_base::{
608        RoomState,
609        deserialized_responses::TimelineEventKind,
610        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
611    };
612    use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
613    use ruma::{
614        OwnedTransactionId, event_id,
615        events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncMessageLikeEvent},
616        owned_room_id, room_id, user_id,
617    };
618    use stream_assert::assert_pending;
619    use tokio::task::yield_now;
620
621    use super::{
622        LatestEventValue, RegisteredRooms, RemoteLatestEventValue, RoomEventCacheGenericUpdate,
623        RoomLatestEvents, RoomSendQueueUpdate, RwLock, SendQueueUpdate, WeakClient, WeakRoom, With,
624        broadcast, listen_to_event_cache_and_send_queue_updates, mpsc,
625    };
626    use crate::{
627        latest_events::{LatestEventQueueUpdate, local_room_message},
628        test_utils::mocks::MatrixMockServer,
629    };
630
631    #[async_test]
632    async fn test_latest_events_are_lazy() {
633        let room_id_0 = room_id!("!r0");
634        let room_id_1 = room_id!("!r1");
635        let room_id_2 = room_id!("!r2");
636        let thread_id_1_0 = event_id!("$ev1.0");
637        let thread_id_2_0 = event_id!("$ev2.0");
638
639        let server = MatrixMockServer::new().await;
640        let client = server.client_builder().build().await;
641
642        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
643        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
644        client.base_client().get_or_create_room(room_id_2, RoomState::Joined);
645
646        client.event_cache().subscribe().unwrap();
647
648        let latest_events = client.latest_events().await;
649
650        // Despites there are many rooms, zero `RoomLatestEvents` are created.
651        assert!(latest_events.state.registered_rooms.rooms.read().await.is_empty());
652
653        // Now let's listen to two rooms.
654        assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
655        assert!(latest_events.listen_to_room(room_id_1).await.unwrap());
656
657        {
658            let rooms = latest_events.state.registered_rooms.rooms.read().await;
659            // There are two rooms…
660            assert_eq!(rooms.len(), 2);
661            // … which are room 0 and room 1.
662            assert!(rooms.contains_key(room_id_0));
663            assert!(rooms.contains_key(room_id_1));
664
665            // Room 0 contains zero thread latest events.
666            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
667            // Room 1 contains zero thread latest events.
668            assert!(rooms.get(room_id_1).unwrap().read().await.per_thread().is_empty());
669        }
670
671        // Now let's listen to one thread respectively for two rooms.
672        assert!(latest_events.listen_to_thread(room_id_1, thread_id_1_0).await.unwrap());
673        assert!(latest_events.listen_to_thread(room_id_2, thread_id_2_0).await.unwrap());
674
675        {
676            let rooms = latest_events.state.registered_rooms.rooms.read().await;
677            // There are now three rooms…
678            assert_eq!(rooms.len(), 3);
679            // … yup, room 2 is now created.
680            assert!(rooms.contains_key(room_id_0));
681            assert!(rooms.contains_key(room_id_1));
682            assert!(rooms.contains_key(room_id_2));
683
684            // Room 0 contains zero thread latest events.
685            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
686            // Room 1 contains one thread latest event…
687            let room_1 = rooms.get(room_id_1).unwrap().read().await;
688            assert_eq!(room_1.per_thread().len(), 1);
689            // … which is thread 1.0.
690            assert!(room_1.per_thread().contains_key(thread_id_1_0));
691            // Room 2 contains one thread latest event…
692            let room_2 = rooms.get(room_id_2).unwrap().read().await;
693            assert_eq!(room_2.per_thread().len(), 1);
694            // … which is thread 2.0.
695            assert!(room_2.per_thread().contains_key(thread_id_2_0));
696        }
697    }
698
699    #[async_test]
700    async fn test_forget_room() {
701        let room_id_0 = room_id!("!r0");
702        let room_id_1 = room_id!("!r1");
703
704        let server = MatrixMockServer::new().await;
705        let client = server.client_builder().build().await;
706
707        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
708        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
709
710        client.event_cache().subscribe().unwrap();
711
712        let latest_events = client.latest_events().await;
713
714        // Now let's fetch one room.
715        assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
716
717        {
718            let rooms = latest_events.state.registered_rooms.rooms.read().await;
719            // There are one room…
720            assert_eq!(rooms.len(), 1);
721            // … which is room 0.
722            assert!(rooms.contains_key(room_id_0));
723
724            // Room 0 contains zero thread latest events.
725            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
726        }
727
728        // Now let's forget about room 0.
729        latest_events.forget_room(room_id_0).await;
730
731        {
732            let rooms = latest_events.state.registered_rooms.rooms.read().await;
733            // There are now zero rooms.
734            assert!(rooms.is_empty());
735        }
736    }
737
738    #[async_test]
739    async fn test_forget_thread() {
740        let room_id_0 = room_id!("!r0");
741        let room_id_1 = room_id!("!r1");
742        let thread_id_0_0 = event_id!("$ev0.0");
743
744        let server = MatrixMockServer::new().await;
745        let client = server.client_builder().build().await;
746
747        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
748        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
749
750        client.event_cache().subscribe().unwrap();
751
752        let latest_events = client.latest_events().await;
753
754        // Now let's fetch one thread .
755        assert!(latest_events.listen_to_thread(room_id_0, thread_id_0_0).await.unwrap());
756
757        {
758            let rooms = latest_events.state.registered_rooms.rooms.read().await;
759            // There is one room…
760            assert_eq!(rooms.len(), 1);
761            // … which is room 0.
762            assert!(rooms.contains_key(room_id_0));
763
764            // Room 0 contains one thread latest event…
765            let room_0 = rooms.get(room_id_0).unwrap().read().await;
766            assert_eq!(room_0.per_thread().len(), 1);
767            // … which is thread 0.0.
768            assert!(room_0.per_thread().contains_key(thread_id_0_0));
769        }
770
771        // Now let's forget about the thread.
772        latest_events.forget_thread(room_id_0, thread_id_0_0).await;
773
774        {
775            let rooms = latest_events.state.registered_rooms.rooms.read().await;
776            // There is still one room…
777            assert_eq!(rooms.len(), 1);
778            // … which is room 0.
779            assert!(rooms.contains_key(room_id_0));
780
781            // But the thread has been removed.
782            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
783        }
784    }
785
786    #[async_test]
787    async fn test_inputs_task_can_listen_to_room_event_cache() {
788        let room_id = owned_room_id!("!r0");
789
790        let server = MatrixMockServer::new().await;
791        let client = server.client_builder().build().await;
792        let weak_client = WeakClient::from_client(&client);
793        let weak_room = WeakRoom::new(weak_client, room_id.clone());
794
795        let event_cache = client.event_cache();
796
797        let registered_rooms = RwLock::new(HashMap::new());
798        let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
799            broadcast::channel(1);
800        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
801            broadcast::channel(1);
802        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
803
804        // New event cache update, but the `LatestEvents` isn't listening to it.
805        {
806            room_event_cache_generic_update_sender
807                .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
808                .unwrap();
809
810            // Run the task.
811            assert!(
812                listen_to_event_cache_and_send_queue_updates(
813                    &registered_rooms,
814                    &mut room_event_cache_generic_update_receiver,
815                    &mut send_queue_generic_update_receiver,
816                    &latest_event_queue_sender,
817                )
818                .await
819                .is_continue()
820            );
821
822            // No latest event computation has been triggered.
823            assert!(latest_event_queue_receiver.is_empty());
824        }
825
826        // New event cache update, but this time, the `LatestEvents` is listening to it.
827        {
828            registered_rooms.write().await.insert(
829                room_id.clone(),
830                With::inner(RoomLatestEvents::new(weak_room, event_cache)),
831            );
832            room_event_cache_generic_update_sender
833                .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
834                .unwrap();
835
836            assert!(
837                listen_to_event_cache_and_send_queue_updates(
838                    &registered_rooms,
839                    &mut room_event_cache_generic_update_receiver,
840                    &mut send_queue_generic_update_receiver,
841                    &latest_event_queue_sender,
842                )
843                .await
844                .is_continue()
845            );
846
847            // A latest event computation has been triggered!
848            assert!(latest_event_queue_receiver.is_empty().not());
849        }
850    }
851
852    #[async_test]
853    async fn test_inputs_task_can_listen_to_send_queue() {
854        let room_id = owned_room_id!("!r0");
855
856        let server = MatrixMockServer::new().await;
857        let client = server.client_builder().build().await;
858        let weak_client = WeakClient::from_client(&client);
859        let weak_room = WeakRoom::new(weak_client, room_id.clone());
860
861        let event_cache = client.event_cache();
862
863        let registered_rooms = RwLock::new(HashMap::new());
864
865        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
866            broadcast::channel(1);
867        let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
868            broadcast::channel(1);
869        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
870
871        // New send queue update, but the `LatestEvents` isn't listening to it.
872        {
873            send_queue_generic_update_sender
874                .send(SendQueueUpdate {
875                    room_id: room_id.clone(),
876                    update: RoomSendQueueUpdate::SentEvent {
877                        transaction_id: OwnedTransactionId::from("txnid0"),
878                        event_id: event_id!("$ev0").to_owned(),
879                    },
880                })
881                .unwrap();
882
883            // Run the task.
884            assert!(
885                listen_to_event_cache_and_send_queue_updates(
886                    &registered_rooms,
887                    &mut room_event_cache_generic_update_receiver,
888                    &mut send_queue_generic_update_receiver,
889                    &latest_event_queue_sender,
890                )
891                .await
892                .is_continue()
893            );
894
895            // No latest event computation has been triggered.
896            assert!(latest_event_queue_receiver.is_empty());
897        }
898
899        // New send queue update, but this time, the `LatestEvents` is listening to it.
900        {
901            registered_rooms.write().await.insert(
902                room_id.clone(),
903                With::inner(RoomLatestEvents::new(weak_room, event_cache)),
904            );
905            send_queue_generic_update_sender
906                .send(SendQueueUpdate {
907                    room_id: room_id.clone(),
908                    update: RoomSendQueueUpdate::SentEvent {
909                        transaction_id: OwnedTransactionId::from("txnid1"),
910                        event_id: event_id!("$ev1").to_owned(),
911                    },
912                })
913                .unwrap();
914
915            assert!(
916                listen_to_event_cache_and_send_queue_updates(
917                    &registered_rooms,
918                    &mut room_event_cache_generic_update_receiver,
919                    &mut send_queue_generic_update_receiver,
920                    &latest_event_queue_sender,
921                )
922                .await
923                .is_continue()
924            );
925
926            // A latest event computation has been triggered!
927            assert!(latest_event_queue_receiver.is_empty().not());
928        }
929    }
930
931    #[async_test]
932    async fn test_inputs_task_stops_when_event_cache_channel_is_closed() {
933        let registered_rooms = RwLock::new(HashMap::new());
934        let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
935            broadcast::channel(1);
936        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
937            broadcast::channel(1);
938        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
939
940        // Drop the sender to close the channel.
941        drop(room_event_cache_generic_update_sender);
942
943        // Run the task.
944        assert!(
945            listen_to_event_cache_and_send_queue_updates(
946                &registered_rooms,
947                &mut room_event_cache_generic_update_receiver,
948                &mut send_queue_generic_update_receiver,
949                &latest_event_queue_sender,
950            )
951            .await
952            // It breaks!
953            .is_break()
954        );
955
956        assert!(latest_event_queue_receiver.is_empty());
957    }
958
959    #[async_test]
960    async fn test_inputs_task_stops_when_send_queue_channel_is_closed() {
961        let registered_rooms = RwLock::new(HashMap::new());
962        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
963            broadcast::channel(1);
964        let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
965            broadcast::channel(1);
966        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
967
968        // Drop the sender to close the channel.
969        drop(send_queue_generic_update_sender);
970
971        // Run the task.
972        assert!(
973            listen_to_event_cache_and_send_queue_updates(
974                &registered_rooms,
975                &mut room_event_cache_generic_update_receiver,
976                &mut send_queue_generic_update_receiver,
977                &latest_event_queue_sender,
978            )
979            .await
980            // It breaks!
981            .is_break()
982        );
983
984        assert!(latest_event_queue_receiver.is_empty());
985    }
986
987    #[async_test]
988    async fn test_latest_event_value_is_updated_via_event_cache() {
989        let room_id = owned_room_id!("!r0");
990        let user_id = user_id!("@mnt_io:matrix.org");
991        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
992        let event_id_2 = event_id!("$ev2");
993
994        let server = MatrixMockServer::new().await;
995        let client = server.client_builder().build().await;
996
997        // Create the room.
998        client.base_client().get_or_create_room(&room_id, RoomState::Joined);
999
1000        let event_cache = client.event_cache();
1001        event_cache.subscribe().unwrap();
1002
1003        let latest_events = client.latest_events().await;
1004
1005        // Subscribe to the latest event values for this room.
1006        let mut latest_event_stream =
1007            latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
1008
1009        // The stream is pending: no new latest event for the moment.
1010        assert_pending!(latest_event_stream);
1011
1012        // Update the event cache with a sync.
1013        server
1014            .sync_room(
1015                &client,
1016                JoinedRoomBuilder::new(&room_id)
1017                    .add_timeline_event(event_factory.text_msg("raclette !").event_id(event_id_2)),
1018            )
1019            .await;
1020
1021        // The event cache has received its update from the sync. It has emitted a
1022        // generic update, which has been received by `LatestEvents` tasks, up to the
1023        // `compute_latest_events` which has updated the latest event value.
1024        assert_matches!(
1025            latest_event_stream.next().await,
1026            Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => {
1027                assert_matches!(
1028                    event.deserialize().unwrap(),
1029                    AnySyncTimelineEvent::MessageLike(
1030                        AnySyncMessageLikeEvent::RoomMessage(
1031                            SyncMessageLikeEvent::Original(message_content)
1032                        )
1033                    ) => {
1034                        assert_eq!(message_content.content.body(), "raclette !");
1035                    }
1036                );
1037            }
1038        );
1039
1040        assert_pending!(latest_event_stream);
1041    }
1042
1043    #[async_test]
1044    async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily() {
1045        let room_id = owned_room_id!("!r0");
1046        let user_id = user_id!("@mnt_io:matrix.org");
1047        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
1048        let event_id_0 = event_id!("$ev0");
1049
1050        let server = MatrixMockServer::new().await;
1051        let client = server.client_builder().build().await;
1052
1053        // Prelude.
1054        {
1055            // Create the room.
1056            client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1057
1058            // Initialise the event cache store.
1059            client
1060                .event_cache_store()
1061                .lock()
1062                .await
1063                .expect("Could not acquire the event cache lock")
1064                .as_clean()
1065                .expect("Could not acquire a clean event cache lock")
1066                .handle_linked_chunk_updates(
1067                    LinkedChunkId::Room(&room_id),
1068                    vec![
1069                        Update::NewItemsChunk {
1070                            previous: None,
1071                            new: ChunkIdentifier::new(0),
1072                            next: None,
1073                        },
1074                        Update::PushItems {
1075                            at: Position::new(ChunkIdentifier::new(0), 0),
1076                            items: vec![
1077                                event_factory.text_msg("hello").event_id(event_id_0).into(),
1078                            ],
1079                        },
1080                    ],
1081                )
1082                .await
1083                .unwrap();
1084        }
1085
1086        let event_cache = client.event_cache();
1087        event_cache.subscribe().unwrap();
1088
1089        let latest_events = client.latest_events().await;
1090
1091        let mut latest_event_stream =
1092            latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
1093
1094        // We have a race if the system is busy. Initially, the latest event
1095        // value is `LatestEventValue::None`, then an Event Cache generic update
1096        // is broadcasted manually, computing a new `LatestEventValue`. So let's
1097        // wait on the system to finish this, and assert the final
1098        // `LatestEventValue`.
1099        yield_now().await;
1100        assert_matches!(latest_event_stream.next_now().await, LatestEventValue::Remote(_));
1101
1102        assert_pending!(latest_event_stream);
1103    }
1104
1105    /// This tests a part of
1106    /// [`test_latest_event_value_is_initialized_by_the_event_cache_lazily`].
1107    ///
1108    /// When `RegisteredRooms::room_latest_events` restores a
1109    /// `LatestEventValue::None` (via `RoomLatestEvents::new`),
1110    /// a `LatestEventQueueUpdate::EventCache` is broadcasted to compute a
1111    /// `LatestEventValue` from the Event Cache lazily.
1112    #[async_test]
1113    async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily_inner() {
1114        let room_id_0 = owned_room_id!("!r0");
1115        let room_id_1 = owned_room_id!("!r1");
1116
1117        let server = MatrixMockServer::new().await;
1118        let client = server.client_builder().build().await;
1119
1120        // Create the rooms.
1121        let room_0 = client.base_client().get_or_create_room(&room_id_0, RoomState::Joined);
1122        let room_1 = client.base_client().get_or_create_room(&room_id_1, RoomState::Joined);
1123
1124        // Set up the rooms.
1125        // `room_0` always has a `LatestEventValue::None` as its the default value.
1126        let mut room_info_1 = room_0.clone_info();
1127        room_info_1.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo")));
1128        room_1.set_room_info(room_info_1, Default::default());
1129
1130        let weak_client = WeakClient::from_client(&client);
1131
1132        let event_cache = client.event_cache();
1133        event_cache.subscribe().unwrap();
1134
1135        let (latest_event_queue_sender, mut latest_event_queue_receiver) =
1136            mpsc::unbounded_channel();
1137
1138        let registered_rooms =
1139            RegisteredRooms::new(weak_client, event_cache, &latest_event_queue_sender);
1140
1141        // Room 0 has a `LatestEventValue::None`, a
1142        // `LatestEventQueueUpdate::EventCache` will be broadcasted.
1143        {
1144            let room_latest_events = registered_rooms.for_room(&room_id_0).await.unwrap().unwrap();
1145            assert_matches!(
1146                room_latest_events.read().await.for_room().get().await,
1147                LatestEventValue::None
1148            );
1149            assert_matches!(
1150                latest_event_queue_receiver.recv().await,
1151                Some(LatestEventQueueUpdate::EventCache { room_id }) => {
1152                    assert_eq!(room_id, room_id_0);
1153                }
1154            );
1155            assert!(latest_event_queue_receiver.is_empty());
1156        }
1157
1158        // Room 1 has a `LatestEventValue::Local*`, a
1159        // `LatestEventQueueUpdate::EventCache` will NOT be broadcasted.
1160        {
1161            let room_latest_events = registered_rooms.for_room(&room_id_1).await.unwrap().unwrap();
1162            assert_matches!(
1163                room_latest_events.read().await.for_room().get().await,
1164                LatestEventValue::LocalIsSending(_)
1165            );
1166            assert!(latest_event_queue_receiver.is_empty());
1167        }
1168    }
1169}