Skip to main content

matrix_sdk_ui/timeline/
thread_list_service.rs

1// Copyright 2026 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use eyeball::{ObservableWriteGuard, SharedObservable, Subscriber};
18use eyeball_im::{ObservableVector, VectorDiff, VectorSubscriberBatchedStream};
19use futures_util::future::join_all;
20use imbl::Vector;
21use matrix_sdk::{
22    Result, Room,
23    deserialized_responses::TimelineEvent,
24    event_cache::{RoomEventCacheSubscriber, RoomEventCacheUpdate},
25    locks::Mutex,
26    paginators::PaginationToken,
27    room::ListThreadsOptions,
28    task_monitor::BackgroundTaskHandle,
29};
30use matrix_sdk_common::serde_helpers::extract_thread_root;
31use ruma::{
32    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId,
33    events::{
34        AnyMessageLikeEventContent, relation::Replacement, room::message::RoomMessageEventContent,
35    },
36};
37use tokio::sync::Mutex as AsyncMutex;
38use tracing::{error, trace, warn};
39
40use crate::timeline::{
41    Profile, TimelineDetails, TimelineItemContent,
42    event_handler::{HandleAggregationKind, TimelineAction},
43    traits::RoomDataProvider,
44};
45
46/// Each `ThreadListItem` represents one thread root event in the room. The
47/// fields are pre-resolved from the raw homeserver response: the sender's
48/// profile is fetched eagerly and the event content is parsed into a
49/// [`TimelineItemContent`] so that consumers can render the item without any
50/// additional work.
51///
52/// `ThreadListItem`s are accumulated inside
53/// [`super::thread_list_service::ThreadListService`] as pages are fetched via
54/// [`super::thread_list_service::ThreadListService::paginate`].
55#[derive(Clone, Debug)]
56pub struct ThreadListItem {
57    /// The thread root event.
58    pub root_event: ThreadListItemEvent,
59
60    /// The latest event in the thread (i.e. the most recent reply), if
61    /// available.
62    ///
63    /// This is initially populated from the server's bundled thread summary
64    /// and is updated in real time as new events arrive via sync.
65    pub latest_event: Option<ThreadListItemEvent>,
66
67    /// The number of replies in this thread (excluding the root event).
68    ///
69    /// This is initially populated from the server's bundled thread summary
70    /// and is updated in real time as new events arrive via sync.
71    pub num_replies: u32,
72}
73
74/// Information about an event in a thread (either the root or the latest
75/// reply).
76#[derive(Clone, Debug)]
77pub struct ThreadListItemEvent {
78    /// The event ID.
79    pub event_id: OwnedEventId,
80
81    /// The timestamp of the event.
82    pub timestamp: MilliSecondsSinceUnixEpoch,
83
84    /// The sender of the event.
85    pub sender: OwnedUserId,
86
87    /// Whether the event was sent by the current user.
88    pub is_own: bool,
89
90    /// The sender's profile (display name and avatar URL).
91    pub sender_profile: TimelineDetails<Profile>,
92
93    /// The parsed content of the event, if available.
94    ///
95    /// `None` when the event could not be deserialized into a known
96    /// [`TimelineItemContent`] variant (e.g. an unsupported or redacted event
97    /// type).
98    pub content: Option<TimelineItemContent>,
99}
100
/// Describes whether a [`ThreadListService`] is currently fetching a page.
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ThreadListPaginationState {
    /// No page is being fetched right now.
    Idle {
        /// `true` once the last page has been fetched, i.e. there is nothing
        /// left to paginate.
        end_reached: bool,
    },
    /// A request for the next page is in flight.
    Loading,
}
114
115/// An error that occurred while using a [`ThreadListService`].
116#[derive(Debug, thiserror::Error)]
117pub enum ThreadListServiceError {
118    /// An error from the underlying Matrix SDK.
119    #[error(transparent)]
120    Sdk(#[from] matrix_sdk::Error),
121}
122
123/// A paginated list of threads for a given room.
124///
125/// `ThreadListService` provides an observable, paginated list of
126/// [`ThreadListItem`]s. It exposes methods to paginate forward through the
127/// thread list as well as subscribe to state changes.
128///
129/// When created, the service automatically starts a background task that
130/// listens to room event cache updates (from `/sync` and other sources).
131/// Whenever a new event belonging to a known thread arrives, the service
132/// updates that thread's `latest_event` and `num_replies` fields in real time,
133/// emitting observable diffs to all subscribers.
134///
135/// # Example
136///
137/// ```no_run
138/// use matrix_sdk::Room;
139/// use matrix_sdk_ui::timeline::thread_list_service::{
140///     ThreadListPaginationState, ThreadListService,
141/// };
142///
143/// # async {
144/// # let room: Room = todo!();
145/// let service = ThreadListService::new(room);
146///
147/// assert_eq!(
148///     service.pagination_state(),
149///     ThreadListPaginationState::Idle { end_reached: false }
150/// );
151///
152/// service.paginate().await.unwrap();
153///
154/// let items = service.items();
155/// # anyhow::Ok(()) };
156/// ```
157pub struct ThreadListService {
158    /// The room whose threads are being listed.
159    room: Room,
160
161    /// The pagination token used to fetch subsequent pages.
162    token: AsyncMutex<PaginationToken>,
163
164    /// The current pagination state.
165    pagination_state: SharedObservable<ThreadListPaginationState>,
166
167    /// The current list of thread items.
168    items: Arc<Mutex<ObservableVector<ThreadListItem>>>,
169
170    /// Handle to the background task listening for event cache updates.
171    /// Dropping this aborts the task.
172    _event_cache_task: BackgroundTaskHandle,
173}
174
175impl ThreadListService {
176    /// Creates a new [`ThreadListService`] for the given room.
177    ///
178    /// This immediately spawns a background task that listens to the room's
179    /// event cache for live updates. The task self-bootstraps by performing
180    /// the async event cache subscription internally.
181    pub fn new(room: Room) -> Self {
182        let items: Arc<Mutex<ObservableVector<ThreadListItem>>> =
183            Arc::new(Mutex::new(ObservableVector::new()));
184
185        // Eagerly subscribe the event cache to sync responses (this is a cheap,
186        // synchronous, idempotent call).
187        if let Err(e) = room.client().event_cache().subscribe() {
188            warn!("ThreadListService: failed to subscribe event cache to sync: {e}");
189        }
190
191        let event_cache_task = room
192            .client()
193            .task_monitor()
194            .spawn_background_task("thread_list_service::event_cache_listener", {
195                let room = room.clone();
196                let items = items.clone();
197                async move {
198                    // Obtain the room event cache and a subscriber.
199                    let (_event_cache_drop, mut subscriber) = match async {
200                        let (room_event_cache, drop_handles) = room.event_cache().await?;
201                        let (_, subscriber) = room_event_cache.subscribe().await?;
202                        matrix_sdk::event_cache::Result::Ok((drop_handles, subscriber))
203                    }
204                    .await
205                    {
206                        Ok(pair) => pair,
207                        Err(e) => {
208                            error!(
209                                "ThreadListService: failed to subscribe to room event cache, \
210                                 live updates will not work: {e}"
211                            );
212                            return;
213                        }
214                    };
215
216                    trace!("ThreadListService: event cache listener started");
217
218                    Self::event_cache_listener_loop(&room, &mut subscriber, items).await;
219                }
220            })
221            .abort_on_drop();
222
223        Self {
224            room,
225            token: AsyncMutex::new(PaginationToken::None),
226            pagination_state: SharedObservable::new(ThreadListPaginationState::Idle {
227                end_reached: false,
228            }),
229            items,
230            _event_cache_task: event_cache_task,
231        }
232    }
233
234    /// Returns the current pagination state.
235    pub fn pagination_state(&self) -> ThreadListPaginationState {
236        self.pagination_state.get()
237    }
238
239    /// Subscribes to pagination state updates.
240    ///
241    /// The returned [`Subscriber`] will emit a new value every time the
242    /// pagination state changes.
243    pub fn subscribe_to_pagination_state_updates(&self) -> Subscriber<ThreadListPaginationState> {
244        self.pagination_state.subscribe()
245    }
246
247    /// Returns the current list of thread items as a snapshot.
248    pub fn items(&self) -> Vec<ThreadListItem> {
249        self.items.lock().iter().cloned().collect()
250    }
251
252    /// Subscribes to updates of the thread item list.
253    ///
254    /// Returns a snapshot of the current items alongside a batched stream of
255    /// [`eyeball_im::VectorDiff`]s that describe subsequent changes.
256    pub fn subscribe_to_items_updates(
257        &self,
258    ) -> (Vector<ThreadListItem>, VectorSubscriberBatchedStream<ThreadListItem>) {
259        self.items.lock().subscribe().into_values_and_batched_stream()
260    }
261
262    /// Fetches the next page of threads, appending the results to the item
263    /// list.
264    ///
265    /// - If the list is already loading or the end has been reached, this
266    ///   method returns immediately with `Ok(())`.
267    /// - On a network/SDK error the pagination state is reset to `Idle {
268    ///   end_reached: false }` and the error is propagated.
269    pub async fn paginate(&self) -> Result<(), ThreadListServiceError> {
270        // Guard: do nothing if we are already loading or have reached the end.
271        {
272            let mut pagination_state = self.pagination_state.write();
273
274            match *pagination_state {
275                ThreadListPaginationState::Idle { end_reached: true }
276                | ThreadListPaginationState::Loading => return Ok(()),
277                _ => {}
278            }
279
280            ObservableWriteGuard::set(&mut pagination_state, ThreadListPaginationState::Loading);
281        }
282
283        let mut pagination_token = self.token.lock().await;
284
285        // Build the options for this page, using the current token if we have one.
286        let from = match &*pagination_token {
287            PaginationToken::HasMore(token) => Some(token.clone()),
288            _ => None,
289        };
290
291        let opts = ListThreadsOptions { from, ..Default::default() };
292
293        match self.load_thread_list(opts).await {
294            Ok(thread_list) => {
295                // Update the pagination token based on whether there are more pages.
296                *pagination_token = match &thread_list.prev_batch_token {
297                    Some(token) => PaginationToken::HasMore(token.clone()),
298                    None => PaginationToken::HitEnd,
299                };
300
301                let end_reached = thread_list.prev_batch_token.is_none();
302
303                // Append new items to the observable vector.
304                self.items.lock().append(thread_list.items.into());
305
306                self.pagination_state.set(ThreadListPaginationState::Idle { end_reached });
307
308                Ok(())
309            }
310            Err(err) => {
311                self.pagination_state.set(ThreadListPaginationState::Idle { end_reached: false });
312                Err(ThreadListServiceError::Sdk(err))
313            }
314        }
315    }
316
317    /// Resets the service back to its initial state.
318    ///
319    /// Clears all loaded items, discards the current pagination token, and
320    /// sets the pagination state to `Idle { end_reached: false }`.  The next
321    /// call to [`Self::paginate`] will therefore start from the beginning of
322    /// the thread list.
323    pub async fn reset(&self) {
324        let mut pagination_token = self.token.lock().await;
325        *pagination_token = PaginationToken::None;
326
327        self.items.lock().clear();
328
329        self.pagination_state.set(ThreadListPaginationState::Idle { end_reached: false });
330    }
331
332    async fn load_thread_list(&self, opts: ListThreadsOptions) -> Result<ThreadList> {
333        let thread_roots = self.room.list_threads(opts).await?;
334
335        let list_items = join_all(
336            thread_roots
337                .chunk
338                .into_iter()
339                .map(|timeline_event| Self::build_thread_list_item(&self.room, timeline_event))
340                .collect::<Vec<_>>(),
341        )
342        .await
343        .into_iter()
344        .flatten()
345        .collect();
346
347        Ok(ThreadList { items: list_items, prev_batch_token: thread_roots.prev_batch_token })
348    }
349
350    async fn build_thread_list_item(
351        room: &Room,
352        timeline_event: TimelineEvent,
353    ) -> Option<ThreadListItem> {
354        // Extract thread summary info before consuming the event.
355        let thread_summary = timeline_event.thread_summary.summary().cloned();
356        let bundled_latest_thread_event = timeline_event.bundled_latest_thread_event.clone();
357
358        // Build the root event using the same logic as latest events.
359        let root_event = Self::build_event(room, timeline_event).await?;
360
361        // Build the latest event from the bundled thread summary, if available.
362        let num_replies = thread_summary.as_ref().map(|s| s.num_replies).unwrap_or(0);
363
364        let latest_event = if let Some(ev) = bundled_latest_thread_event.map(|b| *b) {
365            Self::build_event(room, ev).await
366        } else {
367            None
368        };
369
370        Some(ThreadListItem { root_event, latest_event, num_replies })
371    }
372
373    /// Build a [`ThreadListItemEvent`] from a [`TimelineEvent`].
374    async fn build_event(
375        room: &Room,
376        timeline_event: TimelineEvent,
377    ) -> Option<ThreadListItemEvent> {
378        let event_id = timeline_event.event_id()?;
379        let timestamp = timeline_event.timestamp()?;
380        let sender = timeline_event.sender()?;
381        let is_own = room.own_user_id() == sender;
382
383        let raw = timeline_event.into_raw();
384        let deserialized = match raw.deserialize() {
385            Ok(ev) => ev,
386            Err(e) => {
387                error!("Failed deserializing thread event for latest_event: {e}");
388                return None;
389            }
390        };
391
392        let sender_profile = room
393            .profile_from_user_id(&sender)
394            .await
395            .map(TimelineDetails::Ready)
396            .unwrap_or(TimelineDetails::Unavailable);
397
398        let content: Option<TimelineItemContent> = match TimelineAction::from_event(
399            deserialized,
400            &raw,
401            room,
402            None,
403            None,
404            None,
405            None,
406        )
407        .await
408        {
409            Some(TimelineAction::AddItem { content }) => Some(content),
410            Some(TimelineAction::HandleAggregation {
411                kind: HandleAggregationKind::Edit { replacement: Replacement { new_content, .. } },
412                ..
413            }) => {
414                match TimelineAction::from_content(
415                    AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::new(
416                        new_content.msgtype,
417                    )),
418                    None,
419                    None,
420                    None,
421                ) {
422                    TimelineAction::AddItem { content } => Some(content),
423                    _ => None,
424                }
425            }
426            _ => None,
427        };
428
429        Some(ThreadListItemEvent { event_id, timestamp, sender, is_own, sender_profile, content })
430    }
431
432    /// The main loop of the event-cache listener task.
433    ///
434    /// Listens for [`RoomEventCacheUpdate`]s and, for each new timeline event
435    /// that belongs to a thread we are tracking, updates the corresponding
436    /// [`ThreadListItem`]'s `latest_event` and `num_replies`.
437    async fn event_cache_listener_loop(
438        room: &Room,
439        subscriber: &mut RoomEventCacheSubscriber,
440        items: Arc<Mutex<ObservableVector<ThreadListItem>>>,
441    ) {
442        use tokio::sync::broadcast::error::RecvError;
443
444        loop {
445            let update = match subscriber.recv().await {
446                Ok(update) => update,
447                Err(RecvError::Closed) => {
448                    error!("ThreadListService: event cache channel closed, stopping listener");
449                    break;
450                }
451                Err(RecvError::Lagged(n)) => {
452                    warn!("ThreadListService: lagged behind {n} event cache updates");
453                    continue;
454                }
455            };
456
457            if let RoomEventCacheUpdate::UpdateTimelineEvents(timeline_diffs) = update {
458                let new_events = Self::collect_events_from_diffs(timeline_diffs.diffs);
459
460                for event in new_events {
461                    // Check if this event has a thread relation pointing to a known root.
462                    let Some(thread_root) = extract_thread_root(event.raw()) else { continue };
463
464                    // Find the position of this thread root in our list.
465                    let position = {
466                        let guard = items.lock();
467                        guard.iter().position(|item| item.root_event.event_id == thread_root)
468                    };
469
470                    if let Some(index) = position {
471                        // Build the latest event representation from the raw event.
472                        if let Some(latest_event) = Self::build_event(room, event).await {
473                            let mut guard = items.lock();
474
475                            // Re-check the position — the vector may have changed while
476                            // we were awaiting the profile lookup above.
477                            if index < guard.len()
478                                && guard[index].root_event.event_id == thread_root
479                            {
480                                let mut updated = guard[index].clone();
481                                updated.latest_event = Some(latest_event);
482                                updated.num_replies = updated.num_replies.saturating_add(1);
483                                guard.set(index, updated);
484                            }
485                        }
486                    }
487                }
488            }
489        }
490    }
491
492    /// Extracts all events from a list of [`VectorDiff`]s.
493    fn collect_events_from_diffs(
494        diffs: Vec<VectorDiff<matrix_sdk_base::event_cache::Event>>,
495    ) -> Vec<matrix_sdk_base::event_cache::Event> {
496        let mut events = Vec::new();
497
498        for diff in diffs {
499            match diff {
500                VectorDiff::Append { values } => events.extend(values),
501                VectorDiff::PushBack { value }
502                | VectorDiff::PushFront { value }
503                | VectorDiff::Insert { value, .. }
504                | VectorDiff::Set { value, .. } => events.push(value),
505                VectorDiff::Reset { values } => events.extend(values),
506                // These diffs don't carry new events.
507                VectorDiff::Clear
508                | VectorDiff::PopBack
509                | VectorDiff::PopFront
510                | VectorDiff::Remove { .. }
511                | VectorDiff::Truncate { .. } => {}
512            }
513        }
514
515        events
516    }
517}
518
519/// A structure wrapping a Thread List endpoint response i.e.
520/// [`ThreadListItem`]s and the current pagination token.
521#[derive(Clone, Debug)]
522struct ThreadList {
523    /// The thread-root events that belong to this page of results.
524    pub items: Vec<ThreadListItem>,
525
526    /// Opaque pagination token returned by the homeserver.
527    pub prev_batch_token: Option<String>,
528}
529
530#[cfg(test)]
531mod tests {
532    use std::time::Duration;
533
534    use futures_util::pin_mut;
535    use matrix_sdk::test_utils::mocks::MatrixMockServer;
536    use matrix_sdk_test::{async_test, event_factory::EventFactory};
537    use ruma::{event_id, events::AnyTimelineEvent, room_id, serde::Raw, user_id};
538    use serde_json::json;
539    use stream_assert::{assert_next_matches, assert_pending};
540    use wiremock::ResponseTemplate;
541
542    use super::{ThreadListPaginationState, ThreadListService};
543
544    #[async_test]
545    async fn test_initial_state() {
546        let server = MatrixMockServer::new().await;
547        let service = make_service(&server).await;
548
549        assert_eq!(
550            service.pagination_state(),
551            ThreadListPaginationState::Idle { end_reached: false }
552        );
553        assert!(service.items().is_empty());
554    }
555
556    #[async_test]
557    async fn test_pagination() {
558        let server = MatrixMockServer::new().await;
559        let client = server.client_builder().build().await;
560        let room_id = room_id!("!a:b.c");
561        let sender_id = user_id!("@alice:b.c");
562
563        let f = EventFactory::new().room(room_id).sender(sender_id);
564
565        let eid1 = event_id!("$1");
566        let eid2 = event_id!("$2");
567
568        server
569            .mock_room_threads()
570            .ok(
571                vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()],
572                Some("next_page_token".to_owned()),
573            )
574            .mock_once()
575            .mount()
576            .await;
577
578        server
579            .mock_room_threads()
580            .match_from("next_page_token")
581            .ok(vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()], None)
582            .mock_once()
583            .mount()
584            .await;
585
586        let room = server.sync_joined_room(&client, room_id).await;
587        let service = ThreadListService::new(room);
588
589        service.paginate().await.expect("first paginate failed");
590
591        assert_eq!(
592            service.pagination_state(),
593            ThreadListPaginationState::Idle { end_reached: false }
594        );
595        assert_eq!(service.items().len(), 1);
596        assert_eq!(service.items()[0].root_event.event_id, eid1);
597
598        service.paginate().await.expect("second paginate failed");
599
600        assert_eq!(
601            service.pagination_state(),
602            ThreadListPaginationState::Idle { end_reached: true }
603        );
604        assert_eq!(service.items().len(), 2);
605        assert_eq!(service.items()[1].root_event.event_id, eid2);
606    }
607
608    #[async_test]
609    async fn test_pagination_end_reached() {
610        let server = MatrixMockServer::new().await;
611        let client = server.client_builder().build().await;
612        let room_id = room_id!("!a:b.c");
613        let sender_id = user_id!("@alice:b.c");
614        let f = EventFactory::new().room(room_id).sender(sender_id);
615        let eid1 = event_id!("$1");
616
617        server
618            .mock_room_threads()
619            .ok(vec![f.text_msg("Thread root").event_id(eid1).into_raw()], None)
620            .mock_once()
621            .mount()
622            .await;
623
624        let room = server.sync_joined_room(&client, room_id).await;
625        let service = ThreadListService::new(room);
626
627        service.paginate().await.expect("paginate failed");
628        assert_eq!(
629            service.pagination_state(),
630            ThreadListPaginationState::Idle { end_reached: true }
631        );
632        assert_eq!(service.items().len(), 1);
633
634        service.paginate().await.expect("second paginate should be a no-op");
635        assert_eq!(service.items().len(), 1);
636        assert_eq!(
637            service.pagination_state(),
638            ThreadListPaginationState::Idle { end_reached: true }
639        );
640    }
641
642    /// Two concurrent calls to [`ThreadListService::paginate`] must not result
643    /// in two concurrent HTTP requests. The second call should detect that a
644    /// pagination is already in progress (state is `Loading`) and return
645    /// immediately without making another network request.
646    #[async_test]
647    async fn test_concurrent_pagination_is_not_possible() {
648        let server = MatrixMockServer::new().await;
649        let client = server.client_builder().build().await;
650        let room_id = room_id!("!a:b.c");
651        let sender_id = user_id!("@alice:b.c");
652        let f = EventFactory::new().room(room_id).sender(sender_id);
653        let eid1 = event_id!("$1");
654
655        // Set up a slow mock response so both `paginate()` calls overlap in
656        // flight. Using `expect(1)` means the test will panic during server
657        // teardown if the endpoint is hit more than once.
658        let chunk: Vec<Raw<AnyTimelineEvent>> =
659            vec![f.text_msg("Thread root").event_id(eid1).into_raw()];
660        server
661            .mock_room_threads()
662            .respond_with(
663                ResponseTemplate::new(200)
664                    .set_body_json(json!({ "chunk": chunk, "next_batch": null }))
665                    .set_delay(Duration::from_millis(100)),
666            )
667            .expect(1)
668            .mount()
669            .await;
670
671        let room = server.sync_joined_room(&client, room_id).await;
672        let service = ThreadListService::new(room);
673
674        // Run two paginations concurrently.
675        let (first, second) = tokio::join!(service.paginate(), service.paginate());
676
677        first.expect("first paginate should succeed");
678        second.expect("second (concurrent) paginate should succeed as a no-op");
679
680        // Only one HTTP request was made, so we have exactly one item.
681        assert_eq!(service.items().len(), 1);
682        assert_eq!(service.items()[0].root_event.event_id, eid1);
683        assert_eq!(
684            service.pagination_state(),
685            ThreadListPaginationState::Idle { end_reached: true }
686        );
687    }
688
689    /// When the server returns an error, [`ThreadListService::paginate`] must
690    /// propagate the error *and* reset the pagination state back to
691    /// `Idle { end_reached: false }` so that the caller can retry.
692    #[async_test]
693    async fn test_pagination_error() {
694        let server = MatrixMockServer::new().await;
695        let client = server.client_builder().build().await;
696        let room_id = room_id!("!a:b.c");
697
698        server.mock_room_threads().error500().mock_once().mount().await;
699
700        let room = server.sync_joined_room(&client, room_id).await;
701        let service = ThreadListService::new(room);
702
703        // Pagination must surface the server error.
704        service.paginate().await.expect_err("paginate should fail on a 500 response");
705
706        // The state must be reset so the caller can retry; it must *not* be
707        // stuck in `Loading`.
708        assert_eq!(
709            service.pagination_state(),
710            ThreadListPaginationState::Idle { end_reached: false }
711        );
712
713        // No items should have been added.
714        assert!(service.items().is_empty());
715    }
716
717    #[async_test]
718    async fn test_reset() {
719        let server = MatrixMockServer::new().await;
720        let client = server.client_builder().build().await;
721        let room_id = room_id!("!a:b.c");
722        let sender_id = user_id!("@alice:b.c");
723        let f = EventFactory::new().room(room_id).sender(sender_id);
724        let eid1 = event_id!("$1");
725
726        server
727            .mock_room_threads()
728            .ok(vec![f.text_msg("Thread root").event_id(eid1).into_raw()], None)
729            .expect(2)
730            .mount()
731            .await;
732
733        let room = server.sync_joined_room(&client, room_id).await;
734        let service = ThreadListService::new(room);
735
736        service.paginate().await.expect("first paginate failed");
737        assert_eq!(service.items().len(), 1);
738        assert_eq!(
739            service.pagination_state(),
740            ThreadListPaginationState::Idle { end_reached: true }
741        );
742
743        service.reset().await;
744        assert!(service.items().is_empty());
745        assert_eq!(
746            service.pagination_state(),
747            ThreadListPaginationState::Idle { end_reached: false }
748        );
749
750        service.paginate().await.expect("paginate after reset failed");
751        assert_eq!(service.items().len(), 1);
752    }
753
754    #[async_test]
755    async fn test_pagination_state_subscriber() {
756        let server = MatrixMockServer::new().await;
757        let client = server.client_builder().build().await;
758        let room_id = room_id!("!a:b.c");
759        let sender_id = user_id!("@alice:b.c");
760        let f = EventFactory::new().room(room_id).sender(sender_id);
761        let eid1 = event_id!("$1");
762
763        server
764            .mock_room_threads()
765            .ok(
766                vec![f.text_msg("Thread root").event_id(eid1).into_raw()],
767                Some("next_token".to_owned()),
768            )
769            .mock_once()
770            .mount()
771            .await;
772
773        let room = server.sync_joined_room(&client, room_id).await;
774        let service = ThreadListService::new(room);
775
776        let subscriber = service.subscribe_to_pagination_state_updates();
777        pin_mut!(subscriber);
778
779        assert_pending!(subscriber);
780
781        service.paginate().await.expect("paginate failed");
782
783        assert_next_matches!(subscriber, ThreadListPaginationState::Idle { end_reached: false });
784    }
785
786    #[async_test]
787    async fn test_paginated_items_have_num_replies_zero_without_summary() {
788        let server = MatrixMockServer::new().await;
789        let client = server.client_builder().build().await;
790        let room_id = room_id!("!a:b.c");
791        let sender_id = user_id!("@alice:b.c");
792        let f = EventFactory::new().room(room_id).sender(sender_id);
793        let eid1 = event_id!("$1");
794
795        // A thread root without bundled thread summary.
796        server
797            .mock_room_threads()
798            .ok(vec![f.text_msg("Thread root").event_id(eid1).into_raw()], None)
799            .mock_once()
800            .mount()
801            .await;
802
803        let room = server.sync_joined_room(&client, room_id).await;
804        let service = ThreadListService::new(room);
805
806        service.paginate().await.expect("paginate failed");
807
808        let items = service.items();
809        assert_eq!(items.len(), 1);
810        assert_eq!(items[0].num_replies, 0);
811        assert!(items[0].latest_event.is_none());
812    }
813
814    #[async_test]
815    async fn test_paginated_items_have_num_replies_from_bundled_summary() {
816        let server = MatrixMockServer::new().await;
817        let client = server.client_builder().build().await;
818        let room_id = room_id!("!a:b.c");
819        let sender_id = user_id!("@alice:b.c");
820        let f = EventFactory::new().room(room_id).sender(sender_id);
821        let root_id = event_id!("$root");
822        let reply_id = event_id!("$reply");
823
824        // Build a reply event to use as the bundled latest event.
825        // `with_bundled_thread_summary` expects `Raw<AnySyncMessageLikeEvent>`,
826        // so we cast from the more general `Raw<AnySyncTimelineEvent>`.
827        let reply_event =
828            f.text_msg("Reply in thread").event_id(reply_id).into_raw_sync().cast_unchecked();
829
830        // Build a thread root with a bundled thread summary (3 replies).
831        let thread_root = f
832            .text_msg("Thread root")
833            .event_id(root_id)
834            .with_bundled_thread_summary(reply_event, 3, false)
835            .into_raw();
836
837        server.mock_room_threads().ok(vec![thread_root], None).mock_once().mount().await;
838
839        let room = server.sync_joined_room(&client, room_id).await;
840        let service = ThreadListService::new(room);
841
842        service.paginate().await.expect("paginate failed");
843
844        let items = service.items();
845        assert_eq!(items.len(), 1);
846        assert_eq!(items[0].root_event.event_id, root_id);
847        assert_eq!(items[0].num_replies, 3);
848
849        // The latest event should be populated from the bundled summary.
850        let latest = items[0].latest_event.as_ref().expect("should have latest_event");
851        assert_eq!(latest.event_id, reply_id);
852        assert_eq!(latest.sender.as_str(), sender_id.as_str());
853    }
854
855    /// Builds a [`ThreadListService`] and makes the room known to the client
856    /// by performing a sync.
857    async fn make_service(server: &MatrixMockServer) -> ThreadListService {
858        let client = server.client_builder().build().await;
859        let room_id = room_id!("!a:b.c");
860        let room = server.sync_joined_room(&client, room_id).await;
861        ThreadListService::new(room)
862    }
863}