Skip to main content

matrix_sdk/latest_events/
mod.rs

1// Copyright 2025 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The Latest Events API provides a lazy, reactive and efficient way to compute
16//! the latest event for a room or a thread.
17//!
18//! The latest event represents the last displayable and relevant event a room
19//! or a thread has received. It is usually displayed in a _summary_, e.g.
20//! below the room title in a room list.
21//!
22//! The entry point is [`LatestEvents`]. It is preferable to get a reference to
23//! it from [`Client::latest_events`][crate::Client::latest_events], which
24//! already plugs everything to build it. [`LatestEvents`] is using the
25//! [`EventCache`] and the [`SendQueue`] to respectively get known remote events
26//! (i.e. synced from the server), or local events (i.e. ones being sent).
27//!
28//! ## Laziness
29//!
//! [`LatestEvents`] is lazy: although it listens to all [`EventCache`] and
//! [`SendQueue`] updates, it only does work for the rooms and threads someone
//! has asked the latest event for. Concretely, until
//! [`LatestEvents::listen_to_room`] is called for a particular room, no latest
//! event will ever be computed for that room (and similarly with
//! [`LatestEvents::listen_to_thread`]).
37//!
//! If one is no longer interested in getting the latest event for a particular
//! room or thread, the [`LatestEvents::forget_room`] and
//! [`LatestEvents::forget_thread`] methods must be used.
41//!
42//! ## Reactive
43//!
44//! [`LatestEvents`] is designed to be reactive. Using
45//! [`LatestEvents::listen_and_subscribe_to_room`] will provide a
46//! [`Subscriber`], which brings all the tooling to get the current value or the
47//! future values with a stream.
48
49mod error;
50mod latest_event;
51mod room_latest_events;
52
53use std::{
54    collections::HashMap,
55    ops::{ControlFlow, DerefMut, Not},
56    sync::Arc,
57};
58
59pub use error::LatestEventsError;
60use eyeball::{AsyncLock, Subscriber};
61use latest_event::{LatestEvent, With};
62pub use latest_event::{LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue};
63use matrix_sdk_base::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, timer};
64use matrix_sdk_common::executor::{AbortOnDrop, JoinHandleExt as _, spawn};
65use room_latest_events::{RoomLatestEvents, RoomLatestEventsWriteGuard};
66use ruma::{EventId, OwnedRoomId, RoomId};
67use tokio::{
68    select,
69    sync::{RwLock, RwLockReadGuard, RwLockWriteGuard, broadcast, mpsc},
70};
71use tracing::{info, warn};
72
73use crate::{
74    client::WeakClient,
75    event_cache::{EventCache, RoomEventCacheGenericUpdate},
76    room::WeakRoom,
77    send_queue::{RoomSendQueueUpdate, SendQueue, SendQueueUpdate},
78};
79
80/// The entry point to fetch the [`LatestEventValue`] for rooms or threads.
81#[derive(Clone, Debug)]
82pub struct LatestEvents {
83    state: Arc<LatestEventsState>,
84}
85
86/// The state of [`LatestEvents`].
87#[derive(Debug)]
88struct LatestEventsState {
89    /// All the registered rooms, i.e. rooms the latest events are computed for.
90    registered_rooms: Arc<RegisteredRooms>,
91
92    /// The task handle of the [`listen_to_updates_task`].
93    _listen_task_handle: AbortOnDrop<()>,
94
95    /// The task handle of the [`compute_latest_events_task`].
96    _computation_task_handle: AbortOnDrop<()>,
97}
98
99impl LatestEvents {
100    /// Create a new [`LatestEvents`].
101    pub(crate) fn new(
102        weak_client: WeakClient,
103        event_cache: EventCache,
104        send_queue: SendQueue,
105        room_info_updates: broadcast::Receiver<RoomInfoNotableUpdate>,
106    ) -> Self {
107        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
108
109        let registered_rooms =
110            Arc::new(RegisteredRooms::new(weak_client, &event_cache, &latest_event_queue_sender));
111
112        // The task listening to the event cache, the send queue, and the room infos
113        // updates.
114        let listen_task_handle = spawn(listen_to_updates_task(
115            registered_rooms.clone(),
116            event_cache,
117            send_queue,
118            room_info_updates,
119            latest_event_queue_sender,
120        ))
121        .abort_on_drop();
122
123        // The task computing the new latest events.
124        let computation_task_handle = spawn(compute_latest_events_task(
125            registered_rooms.clone(),
126            latest_event_queue_receiver,
127        ))
128        .abort_on_drop();
129
130        Self {
131            state: Arc::new(LatestEventsState {
132                registered_rooms,
133                _listen_task_handle: listen_task_handle,
134                _computation_task_handle: computation_task_handle,
135            }),
136        }
137    }
138
139    /// Start listening to updates (if not already) for a particular room.
140    ///
141    /// It returns `true` if the room exists, `false` otherwise.
142    pub async fn listen_to_room(&self, room_id: &RoomId) -> Result<bool, LatestEventsError> {
143        Ok(self.state.registered_rooms.for_room(room_id).await?.is_some())
144    }
145
146    /// Check whether the system listens to a particular room.
147    ///
148    /// Note: It's a test only method.
149    #[cfg(test)]
150    pub async fn is_listening_to_room(&self, room_id: &RoomId) -> bool {
151        self.state.registered_rooms.rooms.read().await.contains_key(room_id)
152    }
153
154    /// Start listening to updates (if not already) for a particular room, and
155    /// return a [`Subscriber`] to get the current and future
156    /// [`LatestEventValue`]s.
157    ///
158    /// It returns `Some` if the room exists, `None` otherwise.
159    pub async fn listen_and_subscribe_to_room(
160        &self,
161        room_id: &RoomId,
162    ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
163        let Some(room_latest_events) = self.state.registered_rooms.for_room(room_id).await? else {
164            return Ok(None);
165        };
166
167        let room_latest_events = room_latest_events.read().await;
168        let latest_event = room_latest_events.for_room();
169
170        Ok(Some(latest_event.subscribe().await))
171    }
172
173    /// Start listening to updates (if not already) for a particular room and a
174    /// particular thread in this room.
175    ///
176    /// It returns `true` if the room and the thread exists, `false` otherwise.
177    pub async fn listen_to_thread(
178        &self,
179        room_id: &RoomId,
180        thread_id: &EventId,
181    ) -> Result<bool, LatestEventsError> {
182        Ok(self.state.registered_rooms.for_thread(room_id, thread_id).await?.is_some())
183    }
184
185    /// Start listening to updates (if not already) for a particular room and a
186    /// particular thread in this room, and return a [`Subscriber`] to get the
187    /// current and future [`LatestEventValue`]s.
188    ///
189    /// It returns `Some` if the room and the thread exists, `None` otherwise.
190    pub async fn listen_and_subscribe_to_thread(
191        &self,
192        room_id: &RoomId,
193        thread_id: &EventId,
194    ) -> Result<Option<Subscriber<LatestEventValue, AsyncLock>>, LatestEventsError> {
195        let Some(room_latest_events) =
196            self.state.registered_rooms.for_thread(room_id, thread_id).await?
197        else {
198            return Ok(None);
199        };
200
201        let room_latest_events = room_latest_events.read().await;
202        let latest_event = room_latest_events
203            .for_thread(thread_id)
204            .expect("The `LatestEvent` for the thread must have been created");
205
206        Ok(Some(latest_event.subscribe().await))
207    }
208
209    /// Forget a room.
210    ///
211    /// It means that [`LatestEvents`] will stop listening to updates for the
212    /// `LatestEvent`s of the room and all its threads.
213    ///
214    /// If [`LatestEvents`] is not listening for `room_id`, nothing happens.
215    pub async fn forget_room(&self, room_id: &RoomId) {
216        self.state.registered_rooms.forget_room(room_id).await;
217    }
218
219    /// Forget a thread.
220    ///
221    /// It means that [`LatestEvents`] will stop listening to updates for the
222    /// `LatestEvent` of the thread.
223    ///
224    /// If [`LatestEvents`] is not listening for `room_id` or `thread_id`,
225    /// nothing happens.
226    pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
227        self.state.registered_rooms.forget_thread(room_id, thread_id).await;
228    }
229}
230
231#[derive(Debug)]
232struct RegisteredRooms {
233    /// All the registered [`RoomLatestEvents`].
234    rooms: RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
235
236    /// The (weak) client.
237    weak_client: WeakClient,
238
239    /// The event cache.
240    event_cache: EventCache,
241
242    /// The sender part of the channel used by [`compute_latest_events_task`].
243    ///
244    /// This is used to _trigger_ a computation of a `LatestEventValue` if the
245    /// restored value is `None`.
246    latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
247}
248
249impl RegisteredRooms {
250    fn new(
251        weak_client: WeakClient,
252        event_cache: &EventCache,
253        latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
254    ) -> Self {
255        Self {
256            rooms: RwLock::new(HashMap::default()),
257            weak_client,
258            event_cache: event_cache.clone(),
259            latest_event_queue_sender: latest_event_queue_sender.clone(),
260        }
261    }
262
263    /// Get a read lock guard to a [`RoomLatestEvents`] given a room ID and an
264    /// optional thread ID.
265    ///
266    /// The [`RoomLatestEvents`], and the associated [`LatestEvent`], will be
267    /// created if missing. It means that write lock is taken if necessary, but
268    /// it's always downgraded to a read lock at the end.
269    async fn room_latest_event(
270        &self,
271        room_id: &RoomId,
272        thread_id: Option<&EventId>,
273    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
274        fn create_and_insert_room_latest_events(
275            room_id: &RoomId,
276            rooms: &mut HashMap<OwnedRoomId, RoomLatestEvents>,
277            weak_client: &WeakClient,
278            event_cache: &EventCache,
279            latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
280        ) {
281            let (room_latest_events, is_latest_event_value_none) =
282                With::unzip(RoomLatestEvents::new(
283                    WeakRoom::new(weak_client.clone(), room_id.to_owned()),
284                    event_cache,
285                ));
286
287            // Insert the new `RoomLatestEvents`.
288            rooms.insert(room_id.to_owned(), room_latest_events);
289
290            // If the `LatestEventValue` restored by `RoomLatestEvents` is of kind `None`,
291            // let's try to re-compute it without waiting on the Event Cache (so the sync
292            // usually) or the Send Queue. Maybe the system has migrated to a new version
293            // and the `LatestEventValue` has been erased, while it is still possible to
294            // compute a correct value.
295            if is_latest_event_value_none {
296                let _ = latest_event_queue_sender
297                    .send(LatestEventQueueUpdate::EventCache { room_id: room_id.to_owned() });
298            }
299        }
300
301        Ok(match thread_id {
302            // Get the room latest event with the aim of fetching the latest event for a particular
303            // thread.
304            //
305            // We need to take a write lock immediately, in case the thead latest event doesn't
306            // exist.
307            Some(thread_id) => {
308                let mut rooms = self.rooms.write().await;
309
310                // The `RoomLatestEvents` doesn't exist. Let's create and insert it.
311                if rooms.contains_key(room_id).not() {
312                    create_and_insert_room_latest_events(
313                        room_id,
314                        rooms.deref_mut(),
315                        &self.weak_client,
316                        &self.event_cache,
317                        &self.latest_event_queue_sender,
318                    );
319                }
320
321                if let Some(room_latest_event) = rooms.get(room_id) {
322                    let mut room_latest_event = room_latest_event.write().await;
323
324                    // In `RoomLatestEvents`, the `LatestEvent` for this thread doesn't exist. Let's
325                    // create and insert it.
326                    if room_latest_event.has_thread(thread_id).not() {
327                        room_latest_event.create_and_insert_latest_event_for_thread(thread_id);
328                    }
329                }
330
331                RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
332            }
333
334            // Get the room latest event with the aim of fetching the latest event for a particular
335            // room.
336            None => {
337                match RwLockReadGuard::try_map(self.rooms.read().await, |rooms| rooms.get(room_id))
338                    .ok()
339                {
340                    value @ Some(_) => value,
341                    None => {
342                        let _timer = timer!(
343                            tracing::Level::INFO,
344                            format!("Creating `RoomLatestEvents` for {room_id:?}"),
345                        );
346
347                        let mut rooms = self.rooms.write().await;
348
349                        if rooms.contains_key(room_id).not() {
350                            create_and_insert_room_latest_events(
351                                room_id,
352                                rooms.deref_mut(),
353                                &self.weak_client,
354                                &self.event_cache,
355                                &self.latest_event_queue_sender,
356                            );
357                        }
358
359                        RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok()
360                    }
361                }
362            }
363        })
364    }
365
366    /// Start listening to updates (if not already) for a particular room.
367    ///
368    /// It returns `None` if the room doesn't exist.
369    pub async fn for_room(
370        &self,
371        room_id: &RoomId,
372    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
373        self.room_latest_event(room_id, None).await
374    }
375
376    /// Start listening to updates (if not already) for a particular room.
377    ///
378    /// It returns `None` if the room or the thread doesn't exist.
379    pub async fn for_thread(
380        &self,
381        room_id: &RoomId,
382        thread_id: &EventId,
383    ) -> Result<Option<RwLockReadGuard<'_, RoomLatestEvents>>, LatestEventsError> {
384        self.room_latest_event(room_id, Some(thread_id)).await
385    }
386
387    /// Forget a room.
388    ///
389    /// It means that [`LatestEvents`] will stop listening to updates for the
390    /// `LatestEvent`s of the room and all its threads.
391    ///
392    /// If [`LatestEvents`] is not listening for `room_id`, nothing happens.
393    pub async fn forget_room(&self, room_id: &RoomId) {
394        {
395            let mut rooms = self.rooms.write().await;
396
397            // Remove the whole `RoomLatestEvents`.
398            rooms.remove(room_id);
399        }
400    }
401
402    /// Forget a thread.
403    ///
404    /// It means that [`LatestEvents`] will stop listening to updates for the
405    /// `LatestEvent` of the thread.
406    ///
407    /// If [`LatestEvents`] is not listening for `room_id` or `thread_id`,
408    /// nothing happens.
409    pub async fn forget_thread(&self, room_id: &RoomId, thread_id: &EventId) {
410        let rooms = self.rooms.read().await;
411
412        // If the `RoomLatestEvents`, remove the `LatestEvent` in `per_thread`.
413        if let Some(room_latest_event) = rooms.get(room_id) {
414            let mut room_latest_event = room_latest_event.write().await;
415
416            // Release the lock on `self.rooms`.
417            drop(rooms);
418
419            room_latest_event.forget_thread(thread_id);
420        }
421    }
422}
423
424/// Represents the kind of updates the [`compute_latest_events_task`] will have
425/// to deal with.
426#[derive(Debug)]
427enum LatestEventQueueUpdate {
428    /// An update from the [`EventCache`] happened.
429    EventCache {
430        /// The ID of the room that has triggered the update.
431        room_id: OwnedRoomId,
432    },
433
434    /// An update from the [`SendQueue`] happened.
435    SendQueue {
436        /// The ID of the room that has triggered the update.
437        room_id: OwnedRoomId,
438
439        /// The update itself.
440        update: RoomSendQueueUpdate,
441    },
442
443    /// An update from the [`RoomInfo`] happened.
444    ///
445    /// [`RoomInfo`]: crate::RoomInfo
446    RoomInfo {
447        /// The ID of the room that has triggered the update.
448        room_id: OwnedRoomId,
449
450        /// The notable update reasons.
451        reasons: RoomInfoNotableUpdateReasons,
452    },
453}
454
455/// The task responsible to listen to the [`EventCache`], the [`SendQueue`] and
456/// the [`RoomInfoNotableUpdate`].
457///
458/// When an update is received and is considered relevant, a message is sent to
459/// the [`compute_latest_events_task`] to compute a new [`LatestEvent`].
460///
461/// When an update is considered relevant, a message is sent over the
462/// `latest_event_queue_sender` channel. See [`compute_latest_events_task`].
463async fn listen_to_updates_task(
464    registered_rooms: Arc<RegisteredRooms>,
465    event_cache: EventCache,
466    send_queue: SendQueue,
467    room_info_updates: broadcast::Receiver<RoomInfoNotableUpdate>,
468    latest_event_queue_sender: mpsc::UnboundedSender<LatestEventQueueUpdate>,
469) {
470    let mut event_cache_generic_updates_subscriber =
471        event_cache.subscribe_to_room_generic_updates();
472    let mut send_queue_generic_updates_subscriber = send_queue.subscribe();
473    let mut room_info_updates_subscriber = room_info_updates.resubscribe();
474
475    loop {
476        if listen_to_updates(
477            &registered_rooms.rooms,
478            &mut event_cache_generic_updates_subscriber,
479            &mut send_queue_generic_updates_subscriber,
480            &mut room_info_updates_subscriber,
481            &latest_event_queue_sender,
482        )
483        .await
484        .is_break()
485        {
486            warn!("`listen_to_updates_task` has stopped");
487
488            break;
489        }
490    }
491}
492
493/// The core of [`listen_to_updates_task`].
494///
495/// Having this function detached from its task is helpful for testing and for
496/// state isolation.
497async fn listen_to_updates(
498    registered_rooms: &RwLock<HashMap<OwnedRoomId, RoomLatestEvents>>,
499    event_cache_generic_updates_subscriber: &mut broadcast::Receiver<RoomEventCacheGenericUpdate>,
500    send_queue_generic_updates_subscriber: &mut broadcast::Receiver<SendQueueUpdate>,
501    room_info_updates_subscriber: &mut broadcast::Receiver<RoomInfoNotableUpdate>,
502    latest_event_queue_sender: &mpsc::UnboundedSender<LatestEventQueueUpdate>,
503) -> ControlFlow<()> {
504    select! {
505        room_event_cache_generic_update = event_cache_generic_updates_subscriber.recv() => {
506            if let Ok(RoomEventCacheGenericUpdate { room_id }) = room_event_cache_generic_update {
507                if registered_rooms.read().await.contains_key(&room_id) {
508                    let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache {
509                        room_id
510                    });
511                }
512            } else {
513                warn!("`event_cache_generic_updates` channel has been closed");
514
515                return ControlFlow::Break(());
516            }
517        }
518
519        send_queue_generic_update = send_queue_generic_updates_subscriber.recv() => {
520            if let Ok(SendQueueUpdate { room_id, update }) = send_queue_generic_update {
521                if registered_rooms.read().await.contains_key(&room_id) {
522                    let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue {
523                        room_id,
524                        update
525                    });
526                }
527            } else {
528                warn!("`send_queue_generic_updates` channel has been closed");
529
530                return ControlFlow::Break(());
531            }
532        }
533
534        room_info_update = room_info_updates_subscriber.recv() => {
535            if let Ok(RoomInfoNotableUpdate { room_id, reasons }) = room_info_update {
536                // Filter the update reasons we are interested by.
537                if
538                    // Be careful: the `LATEST_EVENT` reason alone must always
539                    // be ignored! Otherwise it can create a loop.
540                    reasons == RoomInfoNotableUpdateReasons::LATEST_EVENT ||
541
542                    // We are interested by `MEMBERSHIP` so that it captures
543                    // when the user is invited, is joining, is knocking, or is
544                    // leaving a room.
545                    !reasons.contains(RoomInfoNotableUpdateReasons::MEMBERSHIP)
546                {
547                    return ControlFlow::Continue(())
548                }
549
550                if registered_rooms.read().await.contains_key(&room_id) {
551                    let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::RoomInfo {
552                        room_id,
553                        reasons,
554                    });
555                }
556            } else {
557                warn!("`room_info_updates` channel has been closed");
558
559                return ControlFlow::Break(());
560            }
561        }
562    }
563
564    ControlFlow::Continue(())
565}
566
567/// The task responsible to compute new [`LatestEvent`] for a particular room or
568/// thread.
569///
570/// The messages are coming from [`listen_to_updates_task`].
571async fn compute_latest_events_task(
572    registered_rooms: Arc<RegisteredRooms>,
573    mut latest_event_queue_receiver: mpsc::UnboundedReceiver<LatestEventQueueUpdate>,
574) {
575    const BUFFER_SIZE: usize = 16;
576
577    let mut buffer = Vec::with_capacity(BUFFER_SIZE);
578
579    while latest_event_queue_receiver.recv_many(&mut buffer, BUFFER_SIZE).await > 0 {
580        compute_latest_events(&registered_rooms, &buffer).await;
581        buffer.clear();
582    }
583
584    warn!("`compute_latest_events_task` has stopped");
585}
586
587async fn compute_latest_events(
588    registered_rooms: &RegisteredRooms,
589    latest_event_queue_updates: &[LatestEventQueueUpdate],
590) {
591    async fn room_latest_events_write_guard(
592        registered_rooms: &RegisteredRooms,
593        room_id: &OwnedRoomId,
594    ) -> ControlFlow<RoomLatestEventsWriteGuard, ()> {
595        let rooms = registered_rooms.rooms.read().await;
596
597        if let Some(room_latest_events) = rooms.get(room_id) {
598            let room_latest_events = room_latest_events.write().await;
599
600            // Release the lock on `registered_rooms`.
601            // It is possible because `room_latest_events` is an owned lock guard.
602            drop(rooms);
603
604            ControlFlow::Break(room_latest_events)
605        } else {
606            info!(?room_id, "Failed to find the room");
607
608            ControlFlow::Continue(())
609        }
610    }
611
612    for latest_event_queue_update in latest_event_queue_updates {
613        match latest_event_queue_update {
614            LatestEventQueueUpdate::EventCache { room_id } => {
615                let ControlFlow::Break(mut room_latest_events) =
616                    room_latest_events_write_guard(registered_rooms, room_id).await
617                else {
618                    continue;
619                };
620
621                room_latest_events.update_with_event_cache().await;
622            }
623
624            LatestEventQueueUpdate::SendQueue { room_id, update } => {
625                let ControlFlow::Break(mut room_latest_events) =
626                    room_latest_events_write_guard(registered_rooms, room_id).await
627                else {
628                    continue;
629                };
630
631                room_latest_events.update_with_send_queue(update).await;
632            }
633
634            LatestEventQueueUpdate::RoomInfo { room_id, reasons } => {
635                let ControlFlow::Break(mut room_latest_events) =
636                    room_latest_events_write_guard(registered_rooms, room_id).await
637                else {
638                    continue;
639                };
640
641                room_latest_events.update_with_room_info(*reasons).await;
642            }
643        }
644    }
645}
646
/// Test helper building a [`LocalLatestEventValue`] holding a plain text
/// room message with the given `body`, timestamped with the current time.
///
/// It panics if the content cannot be serialized.
#[cfg(test)]
fn local_room_message(body: &str) -> LocalLatestEventValue {
    use matrix_sdk_base::store::SerializableEventContent;
    use ruma::{
        MilliSecondsSinceUnixEpoch,
        events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
    };

    let timestamp = MilliSecondsSinceUnixEpoch::now();

    let message = RoomMessageEventContent::text_plain(body);
    let content =
        SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage(message)).unwrap();

    LocalLatestEventValue { timestamp, content }
}
663
664#[cfg(all(test, not(target_family = "wasm")))]
665mod tests {
666    use std::{collections::HashMap, ops::Not, time::Duration};
667
668    use assert_matches::assert_matches;
669    use matrix_sdk_base::{
670        RoomState,
671        deserialized_responses::TimelineEventKind,
672        linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update},
673    };
674    use matrix_sdk_test::{
675        InvitedRoomBuilder, JoinedRoomBuilder, async_test, event_factory::EventFactory,
676    };
677    use ruma::{
678        MilliSecondsSinceUnixEpoch, OwnedTransactionId, event_id,
679        events::{
680            AnySyncMessageLikeEvent, AnySyncStateEvent, AnySyncTimelineEvent, SyncMessageLikeEvent,
681            room::member::{MembershipState, SyncRoomMemberEvent},
682        },
683        owned_event_id, owned_room_id, room_id, user_id,
684    };
685    use stream_assert::assert_pending;
686    use tokio::{task::yield_now, time::timeout};
687
688    use super::{
689        LatestEventValue, RegisteredRooms, RemoteLatestEventValue, RoomEventCacheGenericUpdate,
690        RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomLatestEvents, RoomSendQueueUpdate,
691        RwLock, SendQueueUpdate, WeakClient, WeakRoom, With, broadcast, listen_to_updates, mpsc,
692    };
693    use crate::{
694        latest_events::{LatestEventQueueUpdate, local_room_message},
695        test_utils::mocks::MatrixMockServer,
696    };
697
698    #[async_test]
699    async fn test_latest_events_are_lazy() {
700        let room_id_0 = room_id!("!r0");
701        let room_id_1 = room_id!("!r1");
702        let room_id_2 = room_id!("!r2");
703        let thread_id_1_0 = event_id!("$ev1.0");
704        let thread_id_2_0 = event_id!("$ev2.0");
705
706        let server = MatrixMockServer::new().await;
707        let client = server.client_builder().build().await;
708
709        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
710        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
711        client.base_client().get_or_create_room(room_id_2, RoomState::Joined);
712
713        client.event_cache().subscribe().unwrap();
714
715        let latest_events = client.latest_events().await;
716
717        // Despites there are many rooms, zero `RoomLatestEvents` are created.
718        assert!(latest_events.state.registered_rooms.rooms.read().await.is_empty());
719
720        // Now let's listen to two rooms.
721        assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
722        assert!(latest_events.listen_to_room(room_id_1).await.unwrap());
723
724        {
725            let rooms = latest_events.state.registered_rooms.rooms.read().await;
726            // There are two rooms…
727            assert_eq!(rooms.len(), 2);
728            // … which are room 0 and room 1.
729            assert!(rooms.contains_key(room_id_0));
730            assert!(rooms.contains_key(room_id_1));
731
732            // Room 0 contains zero thread latest events.
733            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
734            // Room 1 contains zero thread latest events.
735            assert!(rooms.get(room_id_1).unwrap().read().await.per_thread().is_empty());
736        }
737
738        // Now let's listen to one thread respectively for two rooms.
739        assert!(latest_events.listen_to_thread(room_id_1, thread_id_1_0).await.unwrap());
740        assert!(latest_events.listen_to_thread(room_id_2, thread_id_2_0).await.unwrap());
741
742        {
743            let rooms = latest_events.state.registered_rooms.rooms.read().await;
744            // There are now three rooms…
745            assert_eq!(rooms.len(), 3);
746            // … yup, room 2 is now created.
747            assert!(rooms.contains_key(room_id_0));
748            assert!(rooms.contains_key(room_id_1));
749            assert!(rooms.contains_key(room_id_2));
750
751            // Room 0 contains zero thread latest events.
752            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
753            // Room 1 contains one thread latest event…
754            let room_1 = rooms.get(room_id_1).unwrap().read().await;
755            assert_eq!(room_1.per_thread().len(), 1);
756            // … which is thread 1.0.
757            assert!(room_1.per_thread().contains_key(thread_id_1_0));
758            // Room 2 contains one thread latest event…
759            let room_2 = rooms.get(room_id_2).unwrap().read().await;
760            assert_eq!(room_2.per_thread().len(), 1);
761            // … which is thread 2.0.
762            assert!(room_2.per_thread().contains_key(thread_id_2_0));
763        }
764    }
765
766    #[async_test]
767    async fn test_forget_room() {
768        let room_id_0 = room_id!("!r0");
769        let room_id_1 = room_id!("!r1");
770
771        let server = MatrixMockServer::new().await;
772        let client = server.client_builder().build().await;
773
774        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
775        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
776
777        client.event_cache().subscribe().unwrap();
778
779        let latest_events = client.latest_events().await;
780
781        // Now let's fetch one room.
782        assert!(latest_events.listen_to_room(room_id_0).await.unwrap());
783
784        {
785            let rooms = latest_events.state.registered_rooms.rooms.read().await;
786            // There are one room…
787            assert_eq!(rooms.len(), 1);
788            // … which is room 0.
789            assert!(rooms.contains_key(room_id_0));
790
791            // Room 0 contains zero thread latest events.
792            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
793        }
794
795        // Now let's forget about room 0.
796        latest_events.forget_room(room_id_0).await;
797
798        {
799            let rooms = latest_events.state.registered_rooms.rooms.read().await;
800            // There are now zero rooms.
801            assert!(rooms.is_empty());
802        }
803    }
804
805    #[async_test]
806    async fn test_forget_thread() {
807        let room_id_0 = room_id!("!r0");
808        let room_id_1 = room_id!("!r1");
809        let thread_id_0_0 = event_id!("$ev0.0");
810
811        let server = MatrixMockServer::new().await;
812        let client = server.client_builder().build().await;
813
814        client.base_client().get_or_create_room(room_id_0, RoomState::Joined);
815        client.base_client().get_or_create_room(room_id_1, RoomState::Joined);
816
817        client.event_cache().subscribe().unwrap();
818
819        let latest_events = client.latest_events().await;
820
821        // Now let's fetch one thread .
822        assert!(latest_events.listen_to_thread(room_id_0, thread_id_0_0).await.unwrap());
823
824        {
825            let rooms = latest_events.state.registered_rooms.rooms.read().await;
826            // There is one room…
827            assert_eq!(rooms.len(), 1);
828            // … which is room 0.
829            assert!(rooms.contains_key(room_id_0));
830
831            // Room 0 contains one thread latest event…
832            let room_0 = rooms.get(room_id_0).unwrap().read().await;
833            assert_eq!(room_0.per_thread().len(), 1);
834            // … which is thread 0.0.
835            assert!(room_0.per_thread().contains_key(thread_id_0_0));
836        }
837
838        // Now let's forget about the thread.
839        latest_events.forget_thread(room_id_0, thread_id_0_0).await;
840
841        {
842            let rooms = latest_events.state.registered_rooms.rooms.read().await;
843            // There is still one room…
844            assert_eq!(rooms.len(), 1);
845            // … which is room 0.
846            assert!(rooms.contains_key(room_id_0));
847
848            // But the thread has been removed.
849            assert!(rooms.get(room_id_0).unwrap().read().await.per_thread().is_empty());
850        }
851    }
852
853    #[async_test]
854    async fn test_inputs_task_can_listen_to_room_event_cache() {
855        let room_id = owned_room_id!("!r0");
856
857        let server = MatrixMockServer::new().await;
858        let client = server.client_builder().build().await;
859        let weak_client = WeakClient::from_client(&client);
860        let weak_room = WeakRoom::new(weak_client, room_id.clone());
861
862        let event_cache = client.event_cache();
863
864        let registered_rooms = RwLock::new(HashMap::new());
865        let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
866            broadcast::channel(1);
867        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
868            broadcast::channel(1);
869        let (_room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
870        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
871
872        // New event cache update, but the `LatestEvents` isn't listening to it.
873        {
874            room_event_cache_generic_update_sender
875                .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
876                .unwrap();
877
878            // Run the task.
879            assert!(
880                listen_to_updates(
881                    &registered_rooms,
882                    &mut room_event_cache_generic_update_receiver,
883                    &mut send_queue_generic_update_receiver,
884                    &mut room_info_update_receiver,
885                    &latest_event_queue_sender,
886                )
887                .await
888                .is_continue()
889            );
890
891            // No latest event computation has been triggered.
892            assert!(latest_event_queue_receiver.is_empty());
893        }
894
895        // New event cache update, but this time, the `LatestEvents` is listening to it.
896        {
897            registered_rooms.write().await.insert(
898                room_id.clone(),
899                With::inner(RoomLatestEvents::new(weak_room, event_cache)),
900            );
901            room_event_cache_generic_update_sender
902                .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() })
903                .unwrap();
904
905            assert!(
906                listen_to_updates(
907                    &registered_rooms,
908                    &mut room_event_cache_generic_update_receiver,
909                    &mut send_queue_generic_update_receiver,
910                    &mut room_info_update_receiver,
911                    &latest_event_queue_sender,
912                )
913                .await
914                .is_continue()
915            );
916
917            // A latest event computation has been triggered!
918            assert!(latest_event_queue_receiver.is_empty().not());
919        }
920    }
921
922    #[async_test]
923    async fn test_inputs_task_can_listen_to_send_queue() {
924        let room_id = owned_room_id!("!r0");
925
926        let server = MatrixMockServer::new().await;
927        let client = server.client_builder().build().await;
928        let weak_client = WeakClient::from_client(&client);
929        let weak_room = WeakRoom::new(weak_client, room_id.clone());
930
931        let event_cache = client.event_cache();
932
933        let registered_rooms = RwLock::new(HashMap::new());
934
935        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
936            broadcast::channel(1);
937        let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
938            broadcast::channel(1);
939        let (_room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
940        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
941
942        // New send queue update, but the `LatestEvents` isn't listening to it.
943        {
944            send_queue_generic_update_sender
945                .send(SendQueueUpdate {
946                    room_id: room_id.clone(),
947                    update: RoomSendQueueUpdate::SentEvent {
948                        transaction_id: OwnedTransactionId::from("txnid0"),
949                        event_id: owned_event_id!("$ev0"),
950                    },
951                })
952                .unwrap();
953
954            // Run the task.
955            assert!(
956                listen_to_updates(
957                    &registered_rooms,
958                    &mut room_event_cache_generic_update_receiver,
959                    &mut send_queue_generic_update_receiver,
960                    &mut room_info_update_receiver,
961                    &latest_event_queue_sender,
962                )
963                .await
964                .is_continue()
965            );
966
967            // No latest event computation has been triggered.
968            assert!(latest_event_queue_receiver.is_empty());
969        }
970
971        // New send queue update, but this time, the `LatestEvents` is listening to it.
972        {
973            registered_rooms.write().await.insert(
974                room_id.clone(),
975                With::inner(RoomLatestEvents::new(weak_room, event_cache)),
976            );
977            send_queue_generic_update_sender
978                .send(SendQueueUpdate {
979                    room_id: room_id.clone(),
980                    update: RoomSendQueueUpdate::SentEvent {
981                        transaction_id: OwnedTransactionId::from("txnid1"),
982                        event_id: owned_event_id!("$ev1"),
983                    },
984                })
985                .unwrap();
986
987            assert!(
988                listen_to_updates(
989                    &registered_rooms,
990                    &mut room_event_cache_generic_update_receiver,
991                    &mut send_queue_generic_update_receiver,
992                    &mut room_info_update_receiver,
993                    &latest_event_queue_sender,
994                )
995                .await
996                .is_continue()
997            );
998
999            // A latest event computation has been triggered!
1000            assert!(latest_event_queue_receiver.is_empty().not());
1001        }
1002    }
1003
1004    #[async_test]
1005    async fn test_inputs_task_can_listen_to_room_info() {
1006        let room_id = owned_room_id!("!r0");
1007
1008        let server = MatrixMockServer::new().await;
1009        let client = server.client_builder().build().await;
1010        let weak_client = WeakClient::from_client(&client);
1011        let weak_room = WeakRoom::new(weak_client, room_id.clone());
1012
1013        let event_cache = client.event_cache();
1014
1015        let registered_rooms = RwLock::new(HashMap::new());
1016
1017        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1018            broadcast::channel(1);
1019        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1020            broadcast::channel(1);
1021        let (room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
1022        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1023
1024        // New room info update, but the `LatestEvents` isn't listening to it.
1025        {
1026            room_info_update_sender
1027                .send(RoomInfoNotableUpdate {
1028                    room_id: room_id.clone(),
1029                    reasons: RoomInfoNotableUpdateReasons::MEMBERSHIP,
1030                })
1031                .unwrap();
1032
1033            // Run the task.
1034            assert!(
1035                listen_to_updates(
1036                    &registered_rooms,
1037                    &mut room_event_cache_generic_update_receiver,
1038                    &mut send_queue_generic_update_receiver,
1039                    &mut room_info_update_receiver,
1040                    &latest_event_queue_sender,
1041                )
1042                .await
1043                .is_continue()
1044            );
1045
1046            // No latest event computation has been triggered.
1047            assert!(latest_event_queue_receiver.is_empty());
1048        }
1049
1050        // New room info update, but this time, the `LatestEvents` is listening to it.
1051        {
1052            registered_rooms.write().await.insert(
1053                room_id.clone(),
1054                With::inner(RoomLatestEvents::new(weak_room, event_cache)),
1055            );
1056            room_info_update_sender
1057                .send(RoomInfoNotableUpdate {
1058                    room_id: room_id.clone(),
1059                    reasons: RoomInfoNotableUpdateReasons::MEMBERSHIP,
1060                })
1061                .unwrap();
1062
1063            assert!(
1064                listen_to_updates(
1065                    &registered_rooms,
1066                    &mut room_event_cache_generic_update_receiver,
1067                    &mut send_queue_generic_update_receiver,
1068                    &mut room_info_update_receiver,
1069                    &latest_event_queue_sender,
1070                )
1071                .await
1072                .is_continue()
1073            );
1074
1075            // A latest event computation has been triggered!
1076            assert!(latest_event_queue_receiver.is_empty().not());
1077        }
1078    }
1079
1080    #[async_test]
1081    async fn test_inputs_task_can_listen_to_specific_room_info_update_reasons() {
1082        let room_id = owned_room_id!("!r0");
1083
1084        let server = MatrixMockServer::new().await;
1085        let client = server.client_builder().build().await;
1086        let weak_client = WeakClient::from_client(&client);
1087        let weak_room = WeakRoom::new(weak_client, room_id.clone());
1088
1089        let event_cache = client.event_cache();
1090
1091        let registered_rooms = RwLock::new(HashMap::new());
1092
1093        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1094            broadcast::channel(1);
1095        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1096            broadcast::channel(1);
1097        let (room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
1098        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1099
1100        registered_rooms
1101            .write()
1102            .await
1103            .insert(room_id.clone(), With::inner(RoomLatestEvents::new(weak_room, event_cache)));
1104
1105        // - `RoomInfoNotableUpdateReasons::LATEST_EVENT` is forbidden, otherwise it
1106        //   could create loops.
1107        // - Other reasons are ignored, except
1108        //   `RoomInfoNotableUpdateReasons::MEMBERSHIP`.
1109        for reason in {
1110            let mut all = RoomInfoNotableUpdateReasons::all();
1111            all.remove(RoomInfoNotableUpdateReasons::MEMBERSHIP);
1112
1113            all.iter()
1114        } {
1115            room_info_update_sender
1116                .send(RoomInfoNotableUpdate { room_id: room_id.clone(), reasons: reason })
1117                .unwrap();
1118
1119            assert!(
1120                listen_to_updates(
1121                    &registered_rooms,
1122                    &mut room_event_cache_generic_update_receiver,
1123                    &mut send_queue_generic_update_receiver,
1124                    &mut room_info_update_receiver,
1125                    &latest_event_queue_sender,
1126                )
1127                .await
1128                .is_continue()
1129            );
1130
1131            // No latest event computation has been triggered.
1132            assert!(latest_event_queue_receiver.is_empty());
1133        }
1134
1135        // `RoomInfoNotableUpdateReason::MEMBERSHIP` is accepted.
1136        {
1137            room_info_update_sender
1138                .send(RoomInfoNotableUpdate {
1139                    room_id: room_id.clone(),
1140                    reasons: RoomInfoNotableUpdateReasons::MEMBERSHIP,
1141                })
1142                .unwrap();
1143
1144            assert!(
1145                listen_to_updates(
1146                    &registered_rooms,
1147                    &mut room_event_cache_generic_update_receiver,
1148                    &mut send_queue_generic_update_receiver,
1149                    &mut room_info_update_receiver,
1150                    &latest_event_queue_sender,
1151                )
1152                .await
1153                .is_continue()
1154            );
1155
1156            // A latest event computation has been triggered!
1157            assert!(latest_event_queue_receiver.is_empty().not());
1158        }
1159    }
1160
1161    #[async_test]
1162    async fn test_inputs_task_stops_when_event_cache_channel_is_closed() {
1163        let registered_rooms = RwLock::new(HashMap::new());
1164        let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1165            broadcast::channel(1);
1166        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1167            broadcast::channel(1);
1168        let (_room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
1169        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1170
1171        // Drop the sender to close the channel.
1172        drop(room_event_cache_generic_update_sender);
1173
1174        // Run the task.
1175        assert!(
1176            listen_to_updates(
1177                &registered_rooms,
1178                &mut room_event_cache_generic_update_receiver,
1179                &mut send_queue_generic_update_receiver,
1180                &mut room_info_update_receiver,
1181                &latest_event_queue_sender,
1182            )
1183            .await
1184            // It breaks!
1185            .is_break()
1186        );
1187
1188        assert!(latest_event_queue_receiver.is_empty());
1189    }
1190
1191    #[async_test]
1192    async fn test_inputs_task_stops_when_send_queue_channel_is_closed() {
1193        let registered_rooms = RwLock::new(HashMap::new());
1194        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1195            broadcast::channel(1);
1196        let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1197            broadcast::channel(1);
1198        let (_room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
1199        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1200
1201        // Drop the sender to close the channel.
1202        drop(send_queue_generic_update_sender);
1203
1204        // Run the task.
1205        assert!(
1206            listen_to_updates(
1207                &registered_rooms,
1208                &mut room_event_cache_generic_update_receiver,
1209                &mut send_queue_generic_update_receiver,
1210                &mut room_info_update_receiver,
1211                &latest_event_queue_sender,
1212            )
1213            .await
1214            // It breaks!
1215            .is_break()
1216        );
1217
1218        assert!(latest_event_queue_receiver.is_empty());
1219    }
1220
1221    #[async_test]
1222    async fn test_inputs_task_stops_when_room_info_updates_are_closed() {
1223        let registered_rooms = RwLock::new(HashMap::new());
1224        let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) =
1225            broadcast::channel(1);
1226        let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) =
1227            broadcast::channel(1);
1228        let (room_info_update_sender, mut room_info_update_receiver) = broadcast::channel(1);
1229        let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel();
1230
1231        // Drop the sender to close the channel.
1232        drop(room_info_update_sender);
1233
1234        // Run the task.
1235        assert!(
1236            listen_to_updates(
1237                &registered_rooms,
1238                &mut room_event_cache_generic_update_receiver,
1239                &mut send_queue_generic_update_receiver,
1240                &mut room_info_update_receiver,
1241                &latest_event_queue_sender,
1242            )
1243            .await
1244            // It breaks!
1245            .is_break()
1246        );
1247
1248        assert!(latest_event_queue_receiver.is_empty());
1249    }
1250
1251    #[async_test]
1252    async fn test_latest_event_value_is_updated_via_event_cache() {
1253        let room_id = owned_room_id!("!r0");
1254        let user_id = user_id!("@mnt_io:matrix.org");
1255        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
1256        let event_id_0 = event_id!("$ev0");
1257
1258        let server = MatrixMockServer::new().await;
1259        let client = server.client_builder().build().await;
1260
1261        // Create the room.
1262        client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1263
1264        let event_cache = client.event_cache();
1265        event_cache.subscribe().unwrap();
1266
1267        let latest_events = client.latest_events().await;
1268
1269        // Subscribe to the latest event values for this room.
1270        let mut latest_event_stream =
1271            latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
1272
1273        // The stream is pending: no new latest event for the moment.
1274        assert_pending!(latest_event_stream);
1275
1276        // Update the event cache with a sync.
1277        server
1278            .sync_room(
1279                &client,
1280                JoinedRoomBuilder::new(&room_id)
1281                    .add_timeline_event(event_factory.text_msg("raclette !").event_id(event_id_0)),
1282            )
1283            .await;
1284
1285        // The event cache has received its update from the sync. It has emitted a
1286        // generic update, which has been received by `LatestEvents` tasks, up to the
1287        // `compute_latest_events` which has updated the latest event value.
1288        assert_matches!(
1289            latest_event_stream.next().await,
1290            Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => {
1291                assert_matches!(
1292                    event.deserialize().unwrap(),
1293                    AnySyncTimelineEvent::MessageLike(
1294                        AnySyncMessageLikeEvent::RoomMessage(
1295                            SyncMessageLikeEvent::Original(message_content)
1296                        )
1297                    ) => {
1298                        assert_eq!(message_content.content.body(), "raclette !");
1299                    }
1300                );
1301            }
1302        );
1303
1304        assert_pending!(latest_event_stream);
1305    }
1306
1307    #[async_test]
1308    async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily() {
1309        let room_id = owned_room_id!("!r0");
1310        let user_id = user_id!("@mnt_io:matrix.org");
1311        let event_factory = EventFactory::new().sender(user_id).room(&room_id);
1312        let event_id_0 = event_id!("$ev0");
1313
1314        let server = MatrixMockServer::new().await;
1315        let client = server.client_builder().build().await;
1316
1317        // Prelude.
1318        {
1319            // Create the room.
1320            client.base_client().get_or_create_room(&room_id, RoomState::Joined);
1321
1322            // Initialise the event cache store.
1323            client
1324                .event_cache_store()
1325                .lock()
1326                .await
1327                .expect("Could not acquire the event cache lock")
1328                .as_clean()
1329                .expect("Could not acquire a clean event cache lock")
1330                .handle_linked_chunk_updates(
1331                    LinkedChunkId::Room(&room_id),
1332                    vec![
1333                        Update::NewItemsChunk {
1334                            previous: None,
1335                            new: ChunkIdentifier::new(0),
1336                            next: None,
1337                        },
1338                        Update::PushItems {
1339                            at: Position::new(ChunkIdentifier::new(0), 0),
1340                            items: vec![
1341                                event_factory.text_msg("hello").event_id(event_id_0).into(),
1342                            ],
1343                        },
1344                    ],
1345                )
1346                .await
1347                .unwrap();
1348        }
1349
1350        let event_cache = client.event_cache();
1351        event_cache.subscribe().unwrap();
1352
1353        let latest_events = client.latest_events().await;
1354
1355        let mut latest_event_stream =
1356            latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
1357
1358        // We have a race if the system is busy. Initially, the latest event
1359        // value is `LatestEventValue::None`, then an Event Cache generic update
1360        // is broadcasted manually, computing a new `LatestEventValue`. So let's
1361        // wait on the system to finish this, and assert the final
1362        // `LatestEventValue`.
1363        yield_now().await;
1364        assert_matches!(latest_event_stream.next_now().await, LatestEventValue::Remote(_));
1365
1366        assert_pending!(latest_event_stream);
1367    }
1368
1369    /// This tests a part of
1370    /// [`test_latest_event_value_is_initialized_by_the_event_cache_lazily`].
1371    ///
1372    /// When `RegisteredRooms::room_latest_events` restores a
1373    /// `LatestEventValue::None` (via `RoomLatestEvents::new`),
1374    /// a `LatestEventQueueUpdate::EventCache` is broadcasted to compute a
1375    /// `LatestEventValue` from the Event Cache lazily.
1376    #[async_test]
1377    async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily_inner() {
1378        let room_id_0 = owned_room_id!("!r0");
1379        let room_id_1 = owned_room_id!("!r1");
1380
1381        let server = MatrixMockServer::new().await;
1382        let client = server.client_builder().build().await;
1383
1384        // Create the rooms.
1385        let room_0 = client.base_client().get_or_create_room(&room_id_0, RoomState::Joined);
1386        let room_1 = client.base_client().get_or_create_room(&room_id_1, RoomState::Joined);
1387
1388        // Set up the rooms.
1389        // `room_0` always has a `LatestEventValue::None` as its the default value.
1390        let mut room_info_1 = room_0.clone_info();
1391        room_info_1.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo")));
1392        room_1.set_room_info(room_info_1, Default::default());
1393
1394        let weak_client = WeakClient::from_client(&client);
1395
1396        let event_cache = client.event_cache();
1397        event_cache.subscribe().unwrap();
1398
1399        let (latest_event_queue_sender, mut latest_event_queue_receiver) =
1400            mpsc::unbounded_channel();
1401
1402        let registered_rooms =
1403            RegisteredRooms::new(weak_client, event_cache, &latest_event_queue_sender);
1404
1405        // Room 0 has a `LatestEventValue::None`, a
1406        // `LatestEventQueueUpdate::EventCache` will be broadcasted.
1407        {
1408            let room_latest_events = registered_rooms.for_room(&room_id_0).await.unwrap().unwrap();
1409            assert_matches!(
1410                room_latest_events.read().await.for_room().get().await,
1411                LatestEventValue::None
1412            );
1413            assert_matches!(
1414                latest_event_queue_receiver.recv().await,
1415                Some(LatestEventQueueUpdate::EventCache { room_id }) => {
1416                    assert_eq!(room_id, room_id_0);
1417                }
1418            );
1419            assert!(latest_event_queue_receiver.is_empty());
1420        }
1421
1422        // Room 1 has a `LatestEventValue::Local*`, a
1423        // `LatestEventQueueUpdate::EventCache` will NOT be broadcasted.
1424        {
1425            let room_latest_events = registered_rooms.for_room(&room_id_1).await.unwrap().unwrap();
1426            assert_matches!(
1427                room_latest_events.read().await.for_room().get().await,
1428                LatestEventValue::LocalIsSending(_)
1429            );
1430            assert!(latest_event_queue_receiver.is_empty());
1431        }
1432    }
1433
1434    #[async_test]
1435    async fn test_latest_event_value_is_updated_via_room_infos_for_invites() {
1436        let room_id = owned_room_id!("!r0");
1437        let event_factory = EventFactory::new().room(&room_id);
1438        let event_id_0 = event_id!("$ev0");
1439        let event_id_1 = event_id!("$ev1");
1440
1441        let server = MatrixMockServer::new().await;
1442        let client = server.client_builder().build().await;
1443        let own_user_id = client.user_id().unwrap();
1444        let other_user_id = user_id!("@other:servername");
1445
1446        let event_cache = client.event_cache();
1447        event_cache.subscribe().unwrap();
1448
1449        let latest_events = client.latest_events().await;
1450
1451        // Subscribe to the latest event values for this room.
1452        let mut latest_event_stream =
1453            latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap();
1454
1455        // The stream is pending: no new latest event for the moment.
1456        assert_pending!(latest_event_stream);
1457
1458        let now = MilliSecondsSinceUnixEpoch::now().get();
1459
1460        // Update the room with a sync: the user is invited to a room.
1461        {
1462            server
1463                .sync_room(
1464                    &client,
1465                    InvitedRoomBuilder::new(&room_id).add_state_event(
1466                        event_factory
1467                            .member(other_user_id)
1468                            .invited(own_user_id)
1469                            .event_id(event_id_0),
1470                    ),
1471                )
1472                .await;
1473
1474            // The room has received its update from the sync. It has emitted a room info
1475            // update, which has been received by `LatestEvents` tasks, up to the
1476            // `compute_latest_events` which has updated the latest event value.
1477            assert_matches!(
1478                latest_event_stream.next().await,
1479                Some(LatestEventValue::RemoteInvite { event_id, timestamp, inviter }) => {
1480                    // It's a stripped state event: they don't have an event ID.
1481                    assert!(event_id.is_none());
1482                    // It's a stripped state event: they don't have a timestamp (`origin_server_ts`), but `now` is normally used as a fallback.
1483                    assert!(timestamp.get() >= now);
1484                    assert_eq!(inviter.as_deref(), Some(other_user_id));
1485                }
1486            );
1487
1488            assert_pending!(latest_event_stream);
1489        };
1490
1491        // Update the room with a sync: the user is invited to the same room.
1492        {
1493            server
1494                .sync_room(
1495                    &client,
1496                    InvitedRoomBuilder::new(&room_id).add_state_event(
1497                        event_factory
1498                            .member(other_user_id)
1499                            .invited(own_user_id)
1500                            .event_id(event_id_0),
1501                    ),
1502                )
1503                .await;
1504
1505            // The room has received its update from the sync. It has emitted a room info
1506            // update, which has been received by `LatestEvents` tasks, up to the
1507            // `compute_latest_events` which has NOT updated the latest event value because
1508            // a previous `RemoteInvite` was already computed.
1509            assert!(timeout(Duration::from_secs(1), latest_event_stream.next()).await.is_err());
1510
1511            assert_pending!(latest_event_stream);
1512        }
1513
1514        // Update the room with a sync: the user is joining the room.
1515        {
1516            let now = u64::from(now) + 10; // time flies
1517            server
1518                .sync_room(
1519                    &client,
1520                    JoinedRoomBuilder::new(&room_id).add_timeline_event(
1521                        event_factory
1522                            .member(own_user_id)
1523                            .membership(MembershipState::Join)
1524                            .event_id(event_id_1)
1525                            .server_ts(now),
1526                    ),
1527                )
1528                .await;
1529
1530            // The event cache has received its update from the sync. It has emitted a
1531            // generic update, which has been received by `LatestEvents` tasks, up to the
1532            // `compute_latest_events` which has updated the latest event value.
1533            assert_matches!(
1534                latest_event_stream.next().await,
1535                Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => {
1536                    assert_matches!(
1537                        event.deserialize().unwrap(),
1538                        AnySyncTimelineEvent::State(
1539                            AnySyncStateEvent::RoomMember(
1540                                SyncRoomMemberEvent::Original(event)
1541                            )
1542                        ) => {
1543                            assert_eq!(event.event_id, event_id_1);
1544                            assert_eq!(event.content.membership, MembershipState::Join);
1545                            assert_eq!(event.sender, own_user_id);
1546                            assert_eq!(event.state_key, own_user_id);
1547                            assert_eq!(u64::from(event.origin_server_ts.get()), now);
1548                        }
1549                    );
1550                }
1551            );
1552
1553            assert_pending!(latest_event_stream);
1554        }
1555    }
1556}