matrix_sdk/latest_events/
mod.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The Latest Events API provides a lazy, reactive and efficient way to compute
16//! the latest event for a room or a thread.
17//!
18//! The latest event represents the last displayable and relevant event a room
19//! or a thread has been received. It is usually displayed in a _summary_, e.g.
20//! below the room title in a room list.
21//!
22//! The entry point is [`LatestEvents`]. It is preferable to get a reference to
23//! it from [`Client::latest_events`][crate::Client::latest_events], which
24//! already plugs everything to build it. [`LatestEvents`] is using the
25//! [`EventCache`] and the [`SendQueue`] to respectively get known remote events
26//! (i.e. synced from the server), or local events (i.e. ones being sent).
27//!
28//! ## Laziness
29//!
30//! [`LatestEvents`] is lazy, it means that, despite [`LatestEvents`] is
31//! listening to all [`EventCache`] or [`SendQueue`] updates, it will only do
32//! something if one is expected to get the latest event for a particular room
33//! or a particular thread. Concretely, it means that until
34//! [`LatestEvents::listen_to_room`] is called for a particular room, no latest
35//! event will ever be computed for that room (and similarly with
36//! [`LatestEvents::listen_to_thread`]).
37//!
38//! If one is no longer interested to get the latest event for a particular room
39//! or thread, the [`LatestEvents::forget_room`] and
40//! [`LatestEvents::forget_thread`] methods must be used.
41//!
42//! ## Reactive
43//!
44//! [`LatestEvents`] is designed to be reactive. Using
45//! [`LatestEvents::listen_and_subscribe_to_room`] will provide a
46//! [`Subscriber`], which brings all the tooling to get the current value or the
47//! future values with a stream.
48
49mod error;
50mod latest_event;
51mod room_latest_events;
52
53use std::{
54    collections::HashMap,
55    ops::{ControlFlow, DerefMut, Not},
56    sync::Arc,
57};
58
59pub use error::LatestEventsError;
60use eyeball::{AsyncLock, Subscriber};
61use latest_event::{LatestEvent, With};
62pub use latest_event::{LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue};
63use matrix_sdk_base::timer;
64use matrix_sdk_common::executor::{AbortOnDrop, JoinHandleExt as _, spawn};
65use room_latest_events::RoomLatestEvents;
66use ruma::{EventId, OwnedRoomId, RoomId};
67use tokio::{
68    select,
69    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, broadcast, mpsc},
70};
71use tracing::{error, warn};
72
73use crate::{
74    client::WeakClient,
75    event_cache::{EventCache, RoomEventCacheGenericUpdate},
76    room::WeakRoom,
77    send_queue::{RoomSendQueueUpdate, SendQueue, SendQueueUpdate},
78};
79
80/// The entry point to fetch the [`LatestEventValue`] for rooms or threads.
81#[derive(Clone, Debug)]
82pub struct LatestEvents {
83    state: Arc<LatestEventsState>,
84}
85
86/// The state of [`LatestEvents`].
87#[derive(Debug)]
88struct LatestEventsState {
89    /// All the registered rooms, i.e. rooms the latest events are computed for.
90    registered_rooms: Arc<RegisteredRooms>,
91
92    /// The task handle of the
93    /// [`listen_to_event_cache_and_send_queue_updates_task`].
94    _listen_task_handle: AbortOnDrop<()>,
95
96    /// The task handle of the [`compute_latest_events_task`].
97    _computation_task_handle: AbortOnDrop<()>,
98}
99
100impl LatestEvents {
101    /// Create a new [`LatestEvents`].
102    pub(crate) fn new(
103        weak_client: WeakClient,
104        event_cache: EventCache,
105        send_queue: SendQueue,
106    ) -> Self {
107        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
108
109        let registered_rooms =
110            Arc::new(RegisteredRooms::new(weak_client, &event_cache, &latest_event_queue_sender));
111
112        // The task listening to the event cache and the send queue updates.
113        let listen_task_handle = spawn(listen_to_event_cache_and_send_queue_updates_task(
114            registered_rooms.clone(),
115            event_cache,
116            send_queue,
117            latest_event_queue_sender,
118        ))
119        .abort_on_drop();
120
121        // The task computing the new latest events.
122        let computation_task_handle = spawn(compute_latest_events_task(
123            registered_rooms.clone(),
124            latest_event_queue_receiver,
125        ))
126        .abort_on_drop();
127
128        Self {
129            state: Arc::new(LatestEventsState {
130                registered_rooms,
131                _listen_task_handle: listen_task_handle,
132                _computation_task_handle: computation_task_handle,
133            }),
134        }
135    }
136
137    /// Start listening to updates (if not already) for a particular room.
138    ///
139    /// It returns `true` if the room exists, `false` otherwise.
140    pub async fn listen_to_room(&self, room_id: &RoomId) -> Result<bool, LatestEventsError> {
141        Ok(self.state.registered_rooms.for_room(room_id).await?.is_some())
142    }
143
144    /// Check whether the system listens to a particular room.
145    ///
146    /// Note: It's a test only method.
147    #[cfg(test)]
148    pub async fn is_listening_to_room(&self, room_id: &RoomId) -> bool {
149        self.state.registered_rooms.rooms.read().await.contains_key(room_id)
150    }
151
152    /// Start listening to updates (if not already) for a particular room, and
153    /// return a [`Subscriber`] to get the current and future
154    /// [`LatestEventValue`]s.
155    ///
156    /// It returns `Some` if the room exists, `None` otherwise.
157    pub async fn listen_and_subscribe_to_room(
158        &self,
159        room_id: &RoomId,
160    ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
161        let Some(room_latest_events) = self.state.registered_rooms.for_room(room_id).await? else {
162            return Ok(None);
163        };
164
165        let room_latest_events = room_latest_events.read().await;
166        let latest_event = room_latest_events.for_room();
167
168        Ok(Some(latest_event.subscribe().await))
169    }
170
171    /// Start listening to updates (if not already) for a particular room and a
172    /// particular thread in this room.
173    ///
174    /// It returns `true` if the room and the thread exists, `false` otherwise.
175    pub async fn listen_to_thread(
176        &self,
177        room_id: &RoomId,
178        thread_id: &EventId,
179    ) -> Result<bool, LatestEventsError> {
180        Ok(self.state.registered_rooms.for_thread(room_id, thread_id).await?.is_some())
181    }
182
183    /// Start listening to updates (if not already) for a particular room and a
184    /// particular thread in this room, and return a [`Subscriber`] to get the
185    /// current and future [`LatestEventValue`]s.
186    ///
187    /// It returns `Some` if the room and the thread exists, `None` otherwise.
188    pub async fn listen_and_subscribe_to_thread(
189        &self,
190        room_id: &RoomId,
191        thread_id: &EventId,
192    ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
193        let Some(room_latest_events) =
194            self.state.registered_rooms.for_thread(room_id, thread_id).await?
195        else {
196            return Ok(None);
197        };
198
199        let room_latest_events = room_latest_events.read().await;
200        let latest_event = room_latest_events
201            .for_thread(thread_id)
202            .expect("The `LatestEvent` for the thread must have been created");
203
204        Ok(Some(latest_event.subscribe().await))
205    }
206
207    /// Forget a room.
208    ///
209    /// It means that [`LatestEvents`] will stop listening to updates for the
210    /// `LatestEvent`s of the room and all its threads.
211    ///
212    /// If [`LatestEvents`] is not listening for `room_id`, nothing happens.
213    pub async fn forget_room(&self, room_id: &RoomId) {
214        self.state.registered_rooms.forget_room(room_id).await;
215    }
216
217    /// Forget a thread.
218    ///
219    /// It means that [`LatestEvents`] will stop listening to updates for the
220    /// `LatestEvent` of the thread.
221    ///
222    /// If [`LatestEvents`] is not listening for `room_id` or `thread_id`,
223    /// nothing happens.
224    pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
225        self.state.registered_rooms.forget_thread(room_id, thread_id).await;
226    }
227}
228
229#[derive(Debug)]
230struct RegisteredRooms {
231    /// All the registered [`RoomLatestEvents`].
232    rooms: RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
233
234    /// The (weak) client.
235    weak_client: WeakClient,
236
237    /// The event cache.
238    event_cache: EventCache,
239
240    /// The sender part of the channel used by [`compute_latest_events_task`].
241    ///
242    /// This is used to _trigger_ a computation of a `LatestEventValue` if the
243    /// restored value is `None`.
244    latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
245}
246
247impl RegisteredRooms {
248    fn new(
249        weak_client: WeakClient,
250        event_cache: &EventCache,
251        latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
252    ) -> Self {
253        Self {
254            rooms: RwLock::new(HashMap::default()),
255            weak_client,
256            event_cache: event_cache.clone(),
257            latest_event_queue_sender: latest_event_queue_sender.clone(),
258        }
259    }
260
261    /// Get a read lock guard to a [`RoomLatestEvents`] given a room ID and an
262    /// optional thread ID.
263    ///
264    /// The [`RoomLatestEvents`], and the associated [`LatestEvent`], will be
265    /// created if missing. It means that write lock is taken if necessary, but
266    /// it's always downgraded to a read lock at the end.
267    async fn room_latest_event(
268        &self,
269        room_id: &RoomId,
270        thread_id: Option<&EventId>,
271    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
272        fn create_and_insert_room_latest_events(
273            room_id: &RoomId,
274            rooms: &mut HashMap<OwnedRoomId, RoomLatestEvents>,
275            weak_client: &WeakClient,
276            event_cache: &EventCache,
277            latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
278        ) {
279            let (room_latest_events, is_latest_event_value_none) =
280                With::unzip(RoomLatestEvents::new(
281                    WeakRoom::new(weak_client.clone(), room_id.to_owned()),
282                    event_cache,
283                ));
284
285            // Insert the new `RoomLatestEvents`.
286            rooms.insert(room_id.to_owned(), room_latest_events);
287
288            // If the `LatestEventValue` restored by `RoomLatestEvents` is of kind `None`,
289            // let's try to re-compute it without waiting on the Event Cache (so the sync
290            // usually) or the Send Queue. Maybe the system has migrated to a new version
291            // and the `LatestEventValue` has been erased, while it is still possible to
292            // compute a correct value.
293            if is_latest_event_value_none {
294                let _ = latest_event_queue_sender
295                    .send(LatestEventQueueUpdate::EventCache { room_id: room_id.to_owned() });
296            }
297        }
298
299        Ok(match thread_id {
300            // Get the room latest event with the aim of fetching the latest event for a particular
301            // thread.
302            //
303            // We need to take a write lock immediately, in case the thead latest event doesn't
304            // exist.
305            Some(thread_id) => {
306                let mut rooms = self.rooms.write().await;
307
308                // The `RoomLatestEvents` doesn't exist. Let's create and insert it.
309                if rooms.contains_key(room_id).not() {
310                    create_and_insert_room_latest_events(
311                        room_id,
312                        rooms.deref_mut(),
313                        &self.weak_client,
314                        &self.event_cache,
315                        &self.latest_event_queue_sender,
316                    );
317                }
318
319                if let Some(room_latest_event) = rooms.get(room_id) {
320                    let mut room_latest_event = room_latest_event.write().await;
321
322                    // In `RoomLatestEvents`, the `LatestEvent` for this thread doesn't exist. Let's
323                    // create and insert it.
324                    if room_latest_event.has_thread(thread_id).not() {
325                        room_latest_event.create_and_insert_latest_event_for_thread(thread_id);
326                    }
327                }
328
329                RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
330            }
331
332            // Get the room latest event with the aim of fetching the latest event for a particular
333            // room.
334            None => {
335                match RwLockReadGuard::try_map(self.rooms.read().await, |rooms| rooms.get(room_id))
336                    .ok()
337                {
338                    value @ Some(_) => value,
339                    None => {
340                        let _timer = timer!(
341                            tracing::Level::INFO,
342                            format!("Creating `RoomLatestEvents` for {room_id:?}"),
343                        );
344
345                        let mut rooms = self.rooms.write().await;
346
347                        if rooms.contains_key(room_id).not() {
348                            create_and_insert_room_latest_events(
349                                room_id,
350                                rooms.deref_mut(),
351                                &self.weak_client,
352                                &self.event_cache,
353                                &self.latest_event_queue_sender,
354                            );
355                        }
356
357                        RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
358                    }
359                }
360            }
361        })
362    }
363
364    /// Start listening to updates (if not already) for a particular room.
365    ///
366    /// It returns `None` if the room doesn't exist.
367    pub async fn for_room(
368        &self,
369        room_id: &RoomId,
370    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
371        self.room_latest_event(room_id, None).await
372    }
373
374    /// Start listening to updates (if not already) for a particular room.
375    ///
376    /// It returns `None` if the room or the thread doesn't exist.
377    pub async fn for_thread(
378        &self,
379        room_id: &RoomId,
380        thread_id: &EventId,
381    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
382        self.room_latest_event(room_id, Some(thread_id)).await
383    }
384
385    /// Forget a room.
386    ///
387    /// It means that [`LatestEvents`] will stop listening to updates for the
388    /// `LatestEvent`s of the room and all its threads.
389    ///
390    /// If [`LatestEvents`] is not listening for `room_id`, nothing happens.
391    pub async fn forget_room(&self, room_id: &RoomId) {
392        {
393            let mut rooms = self.rooms.write().await;
394
395            // Remove the whole `RoomLatestEvents`.
396            rooms.remove(room_id);
397        }
398    }
399
400    /// Forget a thread.
401    ///
402    /// It means that [`LatestEvents`] will stop listening to updates for the
403    /// `LatestEvent` of the thread.
404    ///
405    /// If [`LatestEvents`] is not listening for `room_id` or `thread_id`,
406    /// nothing happens.
407    pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
408        let rooms = self.rooms.read().await;
409
410        // If the `RoomLatestEvents`, remove the `LatestEvent` in `per_thread`.
411        if let Some(room_latest_event) = rooms.get(room_id) {
412            let mut room_latest_event = room_latest_event.write().await;
413
414            // Release the lock on `self.rooms`.
415            drop(rooms);
416
417            room_latest_event.forget_thread(thread_id);
418        }
419    }
420}
421
422/// Represents the kind of updates the [`compute_latest_events_task`] will have
423/// to deal with.
424#[derive(Debug)]
425enum LatestEventQueueUpdate {
426    /// An update from the [`EventCache`] happened.
427    EventCache {
428        /// The ID of the room that has triggered the update.
429        room_id: OwnedRoomId,
430    },
431
432    /// An update from the [`SendQueue`] happened.
433    SendQueue {
434        /// The ID of the room that has triggered the update.
435        room_id: OwnedRoomId,
436
437        /// The update itself.
438        update: RoomSendQueueUpdate,
439    },
440}
441
442/// The task responsible to listen to the [`EventCache`] and the [`SendQueue`].
443/// When an update is received and is considered relevant, a message is sent to
444/// the [`compute_latest_events_task`] to compute a new [`LatestEvent`].
445///
446/// When an update is considered relevant, a message is sent over the
447/// `latest_event_queue_sender` channel. See [`compute_latest_events_task`].
448async fn listen_to_event_cache_and_send_queue_updates_task(
449    registered_rooms: Arc<RegisteredRooms>,
450    event_cache: EventCache,
451    send_queue: SendQueue,
452    latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
453) {
454    let mut event_cache_generic_updates_subscriber =
455        event_cache.subscribe_to_room_generic_updates();
456    let mut send_queue_generic_updates_subscriber = send_queue.subscribe();
457
458    loop {
459        if listen_to_event_cache_and_send_queue_updates(
460            &registered_rooms.rooms,
461            &mut event_cache_generic_updates_subscriber,
462            &mut send_queue_generic_updates_subscriber,
463            &latest_event_queue_sender,
464        )
465        .await
466        .is_break()
467        {
468            warn!("`listen_to_event_cache_and_send_queue_updates_task` has stopped");
469
470            break;
471        }
472    }
473}
474
475/// The core of [`listen_to_event_cache_and_send_queue_updates_task`].
476///
477/// Having this function detached from its task is helpful for testing and for
478/// state isolation.
479async fn listen_to_event_cache_and_send_queue_updates(
480    registered_rooms: &RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
481    event_cache_generic_updates_subscriber: &mut broadcast::Receiver<RoomEventCacheGenericUpdate>,
482    send_queue_generic_updates_subscriber: &mut broadcast::Receiver<SendQueueUpdate>,
483    latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
484) -> ControlFlow<()> {
485    select! {
486        room_event_cache_generic_update = event_cache_generic_updates_subscriber.recv() => {
487            if let Ok(RoomEventCacheGenericUpdate { room_id }) = room_event_cache_generic_update {
488                if registered_rooms.read().await.contains_key(&room_id) {
489                    let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache {
490                        room_id
491                    });
492                }
493            } else {
494                warn!("`event_cache_generic_updates` channel has been closed");
495
496                return ControlFlow::Break(());
497            }
498        }
499
500        send_queue_generic_update = send_queue_generic_updates_subscriber.recv() => {
501            if let Ok(SendQueueUpdate { room_id, update }) = send_queue_generic_update {
502                if registered_rooms.read().await.contains_key(&room_id) {
503                    let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue {
504                        room_id,
505                        update
506                    });
507                }
508            } else {
509                warn!("`send_queue_generic_updates` channel has been closed");
510
511                return ControlFlow::Break(());
512            }
513        }
514    }
515
516    ControlFlow::Continue(())
517}
518
519/// The task responsible to compute new [`LatestEvent`] for a particular room or
520/// thread.
521///
522/// The messages are coming from
523/// [`listen_to_event_cache_and_send_queue_updates_task`].
524async fn compute_latest_events_task(
525    registered_rooms: Arc<RegisteredRooms>,
526    mut latest_event_queue_receiver: mpsc::UnboundedReceiver<LatestEventQueueUpdate>,
527) {
528    const BUFFER_SIZE: usize = 16;
529
530    let mut buffer = Vec::with_capacity(BUFFER_SIZE);
531
532    while latest_event_queue_receiver.recv_many(&mut buffer, BUFFER_SIZE).await > 0 {
533        compute_latest_events(&registered_rooms, &buffer).await;
534        buffer.clear();
535    }
536
537    warn!("`compute_latest_events_task` has stopped");
538}
539
540async fn compute_latest_events(
541    registered_rooms: &RegisteredRooms,
542    latest_event_queue_updates: &[LatestEventQueueUpdate],
543) {
544    for latest_event_queue_update in latest_event_queue_updates {
545        match latest_event_queue_update {
546            LatestEventQueueUpdate::EventCache { room_id } => {
547                let rooms = registered_rooms.rooms.read().await;
548
549                if let Some(room_latest_events) = rooms.get(room_id) {
550                    let mut room_latest_events = room_latest_events.write().await;
551
552                    // Release the lock on `registered_rooms`.
553                    // It is possible because `room_latest_events` is an owned lock guard.
554                    drop(rooms);
555
556                    room_latest_events.update_with_event_cache().await;
557                } else {
558                    error!(?room_id, "Failed to find the room");
559
560                    continue;
561                }
562            }
563
564            LatestEventQueueUpdate::SendQueue { room_id, update } => {
565                let rooms = registered_rooms.rooms.read().await;
566
567                if let Some(room_latest_events) = rooms.get(room_id) {
568                    let mut room_latest_events = room_latest_events.write().await;
569
570                    // Release the lock on `registered_rooms`.
571                    // It is possible because `room_latest_events` is an owned lock guard.
572                    drop(rooms);
573
574                    room_latest_events.update_with_send_queue(update).await;
575                } else {
576                    error!(?room_id, "Failed to find the room");
577
578                    continue;
579                }
580            }
581        }
582    }
583}
584
585#[cfg(all(test, not(target_family = "wasm")))]
586mod tests {
587    use std::{collections::HashMap, ops::Not};
588
589    use assert_matches::assert_matches;
590    use matrix_sdk_base::{
591        RoomState,
592        deserialized_responses::TimelineEventKind,
593        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
594        store::SerializableEventContent,
595    };
596    use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory};
597    use ruma::{
598        MilliSecondsSinceUnixEpoch, OwnedTransactionId, event_id,
599        events::{
600            AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
601            SyncMessageLikeEvent, room::message::RoomMessageEventContent,
602        },
603        owned_room_id, room_id, user_id,
604    };
605    use stream_assert::assert_pending;
606    use tokio::task::yield_now;
607
608    use super::{
609        LatestEventValue, LocalLatestEventValue, RegisteredRooms, RemoteLatestEventValue,
610        RoomEventCacheGenericUpdate, RoomLatestEvents, RoomSendQueueUpdate, RwLock,
611        SendQueueUpdate, WeakClient, WeakRoom, With, broadcast,
612        listen_to_event_cache_and_send_queue_updates, mpsc,
613    };
614    use crate::{latest_events::LatestEventQueueUpdate, test_utils::mocks::MatrixMockServer};
615
616    fn local_room_message(body: &str) -> LocalLatestEventValue {
617        LocalLatestEventValue {
618            timestamp: MilliSecondsSinceUnixEpoch::now(),
619            content: SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(
620                RoomMessageEventContent::text_plain(body),
621            ))
622            .unwrap(),
623        }
624    }
625
626    #[async_test]
627    async fn test_latest_events_are_lazy() {
628        let room_id_0 = room_id!("!r0");
629        let room_id_1 = room_id!("!r1");
630        let room_id_2 = room_id!("!r2");
631        let thread_id_1_0 = event_id!("$ev1.0");
632        let thread_id_2_0 = event_id!("$ev2.0");
633
634        let server = MatrixMockServer::new().await;
635        let client = server.client_builder().build().await;
636
637        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
638        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
639        client.base_client().get_or_create_room(room_id_2, RoomState::Joined);
640
641        client.event_cache().subscribe().unwrap();
642
643        let latest_events = client.latest_events().await;
644
645        // Despites there are many rooms, zero `RoomLatestEvents` are created.
646        assert!(latest_events.state.registered_rooms.rooms.read().await.is_empty());
647
648        // Now let's listen to two rooms.
649        assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
650        assert!(latest_events.listen_to_room(room_id_1).await.unwrap());
651
652        {
653            let rooms = latest_events.state.registered_rooms.rooms.read().await;
654            // There are two rooms…
655            assert_eq!(rooms.len(), 2);
656            // … which are room 0 and room 1.
657            assert!(rooms.contains_key(room_id_0));
658            assert!(rooms.contains_key(room_id_1));
659
660            // Room 0 contains zero thread latest events.
661            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
662            // Room 1 contains zero thread latest events.
663            assert!(rooms.get(room_id_1).unwrap().read().await.per_thread().is_empty());
664        }
665
666        // Now let's listen to one thread respectively for two rooms.
667        assert!(latest_events.listen_to_thread(room_id_1, thread_id_1_0).await.unwrap());
668        assert!(latest_events.listen_to_thread(room_id_2, thread_id_2_0).await.unwrap());
669
670        {
671            let rooms = latest_events.state.registered_rooms.rooms.read().await;
672            // There are now three rooms…
673            assert_eq!(rooms.len(), 3);
674            // … yup, room 2 is now created.
675            assert!(rooms.contains_key(room_id_0));
676            assert!(rooms.contains_key(room_id_1));
677            assert!(rooms.contains_key(room_id_2));
678
679            // Room 0 contains zero thread latest events.
680            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
681            // Room 1 contains one thread latest event…
682            let room_1 = rooms.get(room_id_1).unwrap().read().await;
683            assert_eq!(room_1.per_thread().len(), 1);
684            // … which is thread 1.0.
685            assert!(room_1.per_thread().contains_key(thread_id_1_0));
686            // Room 2 contains one thread latest event…
687            let room_2 = rooms.get(room_id_2).unwrap().read().await;
688            assert_eq!(room_2.per_thread().len(), 1);
689            // … which is thread 2.0.
690            assert!(room_2.per_thread().contains_key(thread_id_2_0));
691        }
692    }
693
694    #[async_test]
695    async fn test_forget_room() {
696        let room_id_0 = room_id!("!r0");
697        let room_id_1 = room_id!("!r1");
698
699        let server = MatrixMockServer::new().await;
700        let client = server.client_builder().build().await;
701
702        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
703        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
704
705        client.event_cache().subscribe().unwrap();
706
707        let latest_events = client.latest_events().await;
708
709        // Now let's fetch one room.
710        assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
711
712        {
713            let rooms = latest_events.state.registered_rooms.rooms.read().await;
714            // There are one room…
715            assert_eq!(rooms.len(), 1);
716            // … which is room 0.
717            assert!(rooms.contains_key(room_id_0));
718
719            // Room 0 contains zero thread latest events.
720            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
721        }
722
723        // Now let's forget about room 0.
724        latest_events.forget_room(room_id_0).await;
725
726        {
727            let rooms = latest_events.state.registered_rooms.rooms.read().await;
728            // There are now zero rooms.
729            assert!(rooms.is_empty());
730        }
731    }
732
733    #[async_test]
734    async fn test_forget_thread() {
735        let room_id_0 = room_id!("!r0");
736        let room_id_1 = room_id!("!r1");
737        let thread_id_0_0 = event_id!("$ev0.0");
738
739        let server = MatrixMockServer::new().await;
740        let client = server.client_builder().build().await;
741
742        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
743        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
744
745        client.event_cache().subscribe().unwrap();
746
747        let latest_events = client.latest_events().await;
748
749        // Now let's fetch one thread .
750        assert!(latest_events.listen_to_thread(room_id_0, thread_id_0_0).await.unwrap());
751
752        {
753            let rooms = latest_events.state.registered_rooms.rooms.read().await;
754            // There is one room…
755            assert_eq!(rooms.len(), 1);
756            // … which is room 0.
757            assert!(rooms.contains_key(room_id_0));
758
759            // Room 0 contains one thread latest event…
760            let room_0 = rooms.get(room_id_0).unwrap().read().await;
761            assert_eq!(room_0.per_thread().len(), 1);
762            // … which is thread 0.0.
763            assert!(room_0.per_thread().contains_key(thread_id_0_0));
764        }
765
766        // Now let's forget about the thread.
767        latest_events.forget_thread(room_id_0, thread_id_0_0).await;
768
769        {
770            let rooms = latest_events.state.registered_rooms.rooms.read().await;
771            // There is still one room…
772            assert_eq!(rooms.len(), 1);
773            // … which is room 0.
774            assert!(rooms.contains_key(room_id_0));
775
776            // But the thread has been removed.
777            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
778        }
779    }
780
781    #[async_test]
782    async fn test_inputs_task_can_listen_to_room_event_cache() {
783        let room_id = owned_room_id!("!r0");
784
785        let server = MatrixMockServer::new().await;
786        let client = server.client_builder().build().await;
787        let weak_client = WeakClient::from_client(&client);
788        let weak_room = WeakRoom::new(weak_client, room_id.clone());
789
790        let event_cache = client.event_cache();
791
792        let registered_rooms = RwLock::new(HashMap::new());
793        let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
794            broadcast::channel(1);
795        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
796            broadcast::channel(1);
797        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
798
799        // New event cache update, but the `LatestEvents` isn't listening to it.
800        {
801            room_event_cache_generic_update_sender
802                .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
803                .unwrap();
804
805            // Run the task.
806            assert!(
807                listen_to_event_cache_and_send_queue_updates(
808                    &registered_rooms,
809                    &mut room_event_cache_generic_update_receiver,
810                    &mut send_queue_generic_update_receiver,
811                    &latest_event_queue_sender,
812                )
813                .await
814                .is_continue()
815            );
816
817            // No latest event computation has been triggered.
818            assert!(latest_event_queue_receiver.is_empty());
819        }
820
821        // New event cache update, but this time, the `LatestEvents` is listening to it.
822        {
823            registered_rooms.write().await.insert(
824                room_id.clone(),
825                With::inner(RoomLatestEvents::new(weak_room, event_cache)),
826            );
827            room_event_cache_generic_update_sender
828                .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
829                .unwrap();
830
831            assert!(
832                listen_to_event_cache_and_send_queue_updates(
833                    &registered_rooms,
834                    &mut room_event_cache_generic_update_receiver,
835                    &mut send_queue_generic_update_receiver,
836                    &latest_event_queue_sender,
837                )
838                .await
839                .is_continue()
840            );
841
842            // A latest event computation has been triggered!
843            assert!(latest_event_queue_receiver.is_empty().not());
844        }
845    }
846
847    #[async_test]
848    async fn test_inputs_task_can_listen_to_send_queue() {
849        let room_id = owned_room_id!("!r0");
850
851        let server = MatrixMockServer::new().await;
852        let client = server.client_builder().build().await;
853        let weak_client = WeakClient::from_client(&client);
854        let weak_room = WeakRoom::new(weak_client, room_id.clone());
855
856        let event_cache = client.event_cache();
857
858        let registered_rooms = RwLock::new(HashMap::new());
859
860        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
861            broadcast::channel(1);
862        let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
863            broadcast::channel(1);
864        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
865
866        // New send queue update, but the `LatestEvents` isn't listening to it.
867        {
868            send_queue_generic_update_sender
869                .send(SendQueueUpdate {
870                    room_id: room_id.clone(),
871                    update: RoomSendQueueUpdate::SentEvent {
872                        transaction_id: OwnedTransactionId::from("txnid0"),
873                        event_id: event_id!("$ev0").to_owned(),
874                    },
875                })
876                .unwrap();
877
878            // Run the task.
879            assert!(
880                listen_to_event_cache_and_send_queue_updates(
881                    &registered_rooms,
882                    &mut room_event_cache_generic_update_receiver,
883                    &mut send_queue_generic_update_receiver,
884                    &latest_event_queue_sender,
885                )
886                .await
887                .is_continue()
888            );
889
890            // No latest event computation has been triggered.
891            assert!(latest_event_queue_receiver.is_empty());
892        }
893
894        // New send queue update, but this time, the `LatestEvents` is listening to it.
895        {
896            registered_rooms.write().await.insert(
897                room_id.clone(),
898                With::inner(RoomLatestEvents::new(weak_room, event_cache)),
899            );
900            send_queue_generic_update_sender
901                .send(SendQueueUpdate {
902                    room_id: room_id.clone(),
903                    update: RoomSendQueueUpdate::SentEvent {
904                        transaction_id: OwnedTransactionId::from("txnid1"),
905                        event_id: event_id!("$ev1").to_owned(),
906                    },
907                })
908                .unwrap();
909
910            assert!(
911                listen_to_event_cache_and_send_queue_updates(
912                    &registered_rooms,
913                    &mut room_event_cache_generic_update_receiver,
914                    &mut send_queue_generic_update_receiver,
915                    &latest_event_queue_sender,
916                )
917                .await
918                .is_continue()
919            );
920
921            // A latest event computation has been triggered!
922            assert!(latest_event_queue_receiver.is_empty().not());
923        }
924    }
925
926    #[async_test]
927    async fn test_inputs_task_stops_when_event_cache_channel_is_closed() {
928        let registered_rooms = RwLock::new(HashMap::new());
929        let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
930            broadcast::channel(1);
931        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
932            broadcast::channel(1);
933        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
934
935        // Drop the sender to close the channel.
936        drop(room_event_cache_generic_update_sender);
937
938        // Run the task.
939        assert!(
940            listen_to_event_cache_and_send_queue_updates(
941                &registered_rooms,
942                &mut room_event_cache_generic_update_receiver,
943                &mut send_queue_generic_update_receiver,
944                &latest_event_queue_sender,
945            )
946            .await
947            // It breaks!
948            .is_break()
949        );
950
951        assert!(latest_event_queue_receiver.is_empty());
952    }
953
954    #[async_test]
955    async fn test_inputs_task_stops_when_send_queue_channel_is_closed() {
956        let registered_rooms = RwLock::new(HashMap::new());
957        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
958            broadcast::channel(1);
959        let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
960            broadcast::channel(1);
961        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
962
963        // Drop the sender to close the channel.
964        drop(send_queue_generic_update_sender);
965
966        // Run the task.
967        assert!(
968            listen_to_event_cache_and_send_queue_updates(
969                &registered_rooms,
970                &mut room_event_cache_generic_update_receiver,
971                &mut send_queue_generic_update_receiver,
972                &latest_event_queue_sender,
973            )
974            .await
975            // It breaks!
976            .is_break()
977        );
978
979        assert!(latest_event_queue_receiver.is_empty());
980    }
981
982    #[async_test]
983    async fn test_latest_event_value_is_updated_via_event_cache() {
984        let room_id = owned_room_id!("!r0");
985        let user_id = user_id!("@mnt_io:matrix.org");
986        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
987        let event_id_2 = event_id!("$ev2");
988
989        let server = MatrixMockServer::new().await;
990        let client = server.client_builder().build().await;
991
992        // Create the room.
993        client.base_client().get_or_create_room(&room_id, RoomState::Joined);
994
995        let event_cache = client.event_cache();
996        event_cache.subscribe().unwrap();
997
998        let latest_events = client.latest_events().await;
999
1000        // Subscribe to the latest event values for this room.
1001        let mut latest_event_stream =
1002            latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
1003
1004        // The stream is pending: no new latest event for the moment.
1005        assert_pending!(latest_event_stream);
1006
1007        // Update the event cache with a sync.
1008        server
1009            .sync_room(
1010                &client,
1011                JoinedRoomBuilder::new(&room_id)
1012                    .add_timeline_event(event_factory.text_msg("raclette !").event_id(event_id_2)),
1013            )
1014            .await;
1015
1016        // The event cache has received its update from the sync. It has emitted a
1017        // generic update, which has been received by `LatestEvents` tasks, up to the
1018        // `compute_latest_events` which has updated the latest event value.
1019        assert_matches!(
1020            latest_event_stream.next().await,
1021            Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => {
1022                assert_matches!(
1023                    event.deserialize().unwrap(),
1024                    AnySyncTimelineEvent::MessageLike(
1025                        AnySyncMessageLikeEvent::RoomMessage(
1026                            SyncMessageLikeEvent::Original(message_content)
1027                        )
1028                    ) => {
1029                        assert_eq!(message_content.content.body(), "raclette !");
1030                    }
1031                );
1032            }
1033        );
1034
1035        assert_pending!(latest_event_stream);
1036    }
1037
1038    #[async_test]
1039    async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily() {
1040        let room_id = owned_room_id!("!r0");
1041        let user_id = user_id!("@mnt_io:matrix.org");
1042        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
1043        let event_id_0 = event_id!("$ev0");
1044
1045        let server = MatrixMockServer::new().await;
1046        let client = server.client_builder().build().await;
1047
1048        // Prelude.
1049        {
1050            // Create the room.
1051            client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1052
1053            // Initialise the event cache store.
1054            client
1055                .event_cache_store()
1056                .lock()
1057                .await
1058                .expect("Could not acquire the event cache lock")
1059                .as_clean()
1060                .expect("Could not acquire a clean event cache lock")
1061                .handle_linked_chunk_updates(
1062                    LinkedChunkId::Room(&room_id),
1063                    vec![
1064                        Update::NewItemsChunk {
1065                            previous: None,
1066                            new: ChunkIdentifier::new(0),
1067                            next: None,
1068                        },
1069                        Update::PushItems {
1070                            at: Position::new(ChunkIdentifier::new(0), 0),
1071                            items: vec![
1072                                event_factory.text_msg("hello").event_id(event_id_0).into(),
1073                            ],
1074                        },
1075                    ],
1076                )
1077                .await
1078                .unwrap();
1079        }
1080
1081        let event_cache = client.event_cache();
1082        event_cache.subscribe().unwrap();
1083
1084        let latest_events = client.latest_events().await;
1085
1086        let mut latest_event_stream =
1087            latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
1088
1089        // We have a race if the system is busy. Initially, the latest event
1090        // value is `LatestEventValue::None`, then an Event Cache generic update
1091        // is broadcasted manually, computing a new `LatestEventValue`. So let's
1092        // wait on the system to finish this, and assert the final
1093        // `LatestEventValue`.
1094        yield_now().await;
1095        assert_matches!(latest_event_stream.next_now().await, LatestEventValue::Remote(_));
1096
1097        assert_pending!(latest_event_stream);
1098    }
1099
1100    /// This tests a part of
1101    /// [`test_latest_event_value_is_initialized_by_the_event_cache_lazily`].
1102    ///
1103    /// When `RegisteredRooms::room_latest_events` restores a
1104    /// `LatestEventValue::None` (via `RoomLatestEvents::new`),
1105    /// a `LatestEventQueueUpdate::EventCache` is broadcasted to compute a
1106    /// `LatestEventValue` from the Event Cache lazily.
1107    #[async_test]
1108    async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily_inner() {
1109        let room_id_0 = owned_room_id!("!r0");
1110        let room_id_1 = owned_room_id!("!r1");
1111
1112        let server = MatrixMockServer::new().await;
1113        let client = server.client_builder().build().await;
1114
1115        // Create the rooms.
1116        let room_0 = client.base_client().get_or_create_room(&room_id_0, RoomState::Joined);
1117        let room_1 = client.base_client().get_or_create_room(&room_id_1, RoomState::Joined);
1118
1119        // Set up the rooms.
1120        // `room_0` always has a `LatestEventValue::None` as its the default value.
1121        let mut room_info_1 = room_0.clone_info();
1122        room_info_1.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo")));
1123        room_1.set_room_info(room_info_1, Default::default());
1124
1125        let weak_client = WeakClient::from_client(&client);
1126
1127        let event_cache = client.event_cache();
1128        event_cache.subscribe().unwrap();
1129
1130        let (latest_event_queue_sender, mut latest_event_queue_receiver) =
1131            mpsc::unbounded_channel();
1132
1133        let registered_rooms =
1134            RegisteredRooms::new(weak_client, event_cache, &latest_event_queue_sender);
1135
1136        // Room 0 has a `LatestEventValue::None`, a
1137        // `LatestEventQueueUpdate::EventCache` will be broadcasted.
1138        {
1139            let room_latest_events = registered_rooms.for_room(&room_id_0).await.unwrap().unwrap();
1140            assert_matches!(
1141                room_latest_events.read().await.for_room().get().await,
1142                LatestEventValue::None
1143            );
1144            assert_matches!(
1145                latest_event_queue_receiver.recv().await,
1146                Some(LatestEventQueueUpdate::EventCache { room_id }) => {
1147                    assert_eq!(room_id, room_id_0);
1148                }
1149            );
1150            assert!(latest_event_queue_receiver.is_empty());
1151        }
1152
1153        // Room 1 has a `LatestEventValue::Local*`, a
1154        // `LatestEventQueueUpdate::EventCache` will NOT be broadcasted.
1155        {
1156            let room_latest_events = registered_rooms.for_room(&room_id_1).await.unwrap().unwrap();
1157            assert_matches!(
1158                room_latest_events.read().await.for_room().get().await,
1159                LatestEventValue::LocalIsSending(_)
1160            );
1161            assert!(latest_event_queue_receiver.is_empty());
1162        }
1163    }
1164}