1use std::sync::Arc;
16
17use eyeball::{ObservableWriteGuard, SharedObservable, Subscriber};
18use eyeball_im::{ObservableVector, VectorDiff, VectorSubscriberBatchedStream};
19use futures_util::future::join_all;
20use imbl::Vector;
21use matrix_sdk::{
22 Result, Room,
23 deserialized_responses::TimelineEvent,
24 event_cache::{RoomEventCacheSubscriber, RoomEventCacheUpdate},
25 locks::Mutex,
26 paginators::PaginationToken,
27 room::ListThreadsOptions,
28 task_monitor::BackgroundTaskHandle,
29};
30use matrix_sdk_common::serde_helpers::extract_thread_root;
31use ruma::{
32 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId,
33 events::{
34 AnyMessageLikeEventContent, relation::Replacement, room::message::RoomMessageEventContent,
35 },
36};
37use tokio::sync::Mutex as AsyncMutex;
38use tracing::{error, trace, warn};
39
40use crate::timeline::{
41 Profile, TimelineDetails, TimelineItemContent,
42 event_handler::{HandleAggregationKind, TimelineAction},
43 traits::RoomDataProvider,
44};
45
46#[derive(Clone, Debug)]
56pub struct ThreadListItem {
57 pub root_event: ThreadListItemEvent,
59
60 pub latest_event: Option<ThreadListItemEvent>,
66
67 pub num_replies: u32,
72}
73
74#[derive(Clone, Debug)]
77pub struct ThreadListItemEvent {
78 pub event_id: OwnedEventId,
80
81 pub timestamp: MilliSecondsSinceUnixEpoch,
83
84 pub sender: OwnedUserId,
86
87 pub is_own: bool,
89
90 pub sender_profile: TimelineDetails<Profile>,
92
93 pub content: Option<TimelineItemContent>,
99}
100
101#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
103#[derive(Clone, Debug, Eq, PartialEq)]
104pub enum ThreadListPaginationState {
105 Idle {
107 end_reached: bool,
110 },
111 Loading,
113}
114
115#[derive(Debug, thiserror::Error)]
117pub enum ThreadListServiceError {
118 #[error(transparent)]
120 Sdk(#[from] matrix_sdk::Error),
121}
122
123pub struct ThreadListService {
158 room: Room,
160
161 token: AsyncMutex<PaginationToken>,
163
164 pagination_state: SharedObservable<ThreadListPaginationState>,
166
167 items: Arc<Mutex<ObservableVector<ThreadListItem>>>,
169
170 _event_cache_task: BackgroundTaskHandle,
173}
174
175impl ThreadListService {
176 pub fn new(room: Room) -> Self {
182 let items: Arc<Mutex<ObservableVector<ThreadListItem>>> =
183 Arc::new(Mutex::new(ObservableVector::new()));
184
185 if let Err(e) = room.client().event_cache().subscribe() {
188 warn!("ThreadListService: failed to subscribe event cache to sync: {e}");
189 }
190
191 let event_cache_task = room
192 .client()
193 .task_monitor()
194 .spawn_background_task("thread_list_service::event_cache_listener", {
195 let room = room.clone();
196 let items = items.clone();
197 async move {
198 let (_event_cache_drop, mut subscriber) = match async {
200 let (room_event_cache, drop_handles) = room.event_cache().await?;
201 let (_, subscriber) = room_event_cache.subscribe().await?;
202 matrix_sdk::event_cache::Result::Ok((drop_handles, subscriber))
203 }
204 .await
205 {
206 Ok(pair) => pair,
207 Err(e) => {
208 error!(
209 "ThreadListService: failed to subscribe to room event cache, \
210 live updates will not work: {e}"
211 );
212 return;
213 }
214 };
215
216 trace!("ThreadListService: event cache listener started");
217
218 Self::event_cache_listener_loop(&room, &mut subscriber, items).await;
219 }
220 })
221 .abort_on_drop();
222
223 Self {
224 room,
225 token: AsyncMutex::new(PaginationToken::None),
226 pagination_state: SharedObservable::new(ThreadListPaginationState::Idle {
227 end_reached: false,
228 }),
229 items,
230 _event_cache_task: event_cache_task,
231 }
232 }
233
234 pub fn pagination_state(&self) -> ThreadListPaginationState {
236 self.pagination_state.get()
237 }
238
239 pub fn subscribe_to_pagination_state_updates(&self) -> Subscriber<ThreadListPaginationState> {
244 self.pagination_state.subscribe()
245 }
246
247 pub fn items(&self) -> Vec<ThreadListItem> {
249 self.items.lock().iter().cloned().collect()
250 }
251
252 pub fn subscribe_to_items_updates(
257 &self,
258 ) -> (Vector<ThreadListItem>, VectorSubscriberBatchedStream<ThreadListItem>) {
259 self.items.lock().subscribe().into_values_and_batched_stream()
260 }
261
262 pub async fn paginate(&self) -> Result<(), ThreadListServiceError> {
270 {
272 let mut pagination_state = self.pagination_state.write();
273
274 match *pagination_state {
275 ThreadListPaginationState::Idle { end_reached: true }
276 | ThreadListPaginationState::Loading => return Ok(()),
277 _ => {}
278 }
279
280 ObservableWriteGuard::set(&mut pagination_state, ThreadListPaginationState::Loading);
281 }
282
283 let mut pagination_token = self.token.lock().await;
284
285 let from = match &*pagination_token {
287 PaginationToken::HasMore(token) => Some(token.clone()),
288 _ => None,
289 };
290
291 let opts = ListThreadsOptions { from, ..Default::default() };
292
293 match self.load_thread_list(opts).await {
294 Ok(thread_list) => {
295 *pagination_token = match &thread_list.prev_batch_token {
297 Some(token) => PaginationToken::HasMore(token.clone()),
298 None => PaginationToken::HitEnd,
299 };
300
301 let end_reached = thread_list.prev_batch_token.is_none();
302
303 self.items.lock().append(thread_list.items.into());
305
306 self.pagination_state.set(ThreadListPaginationState::Idle { end_reached });
307
308 Ok(())
309 }
310 Err(err) => {
311 self.pagination_state.set(ThreadListPaginationState::Idle { end_reached: false });
312 Err(ThreadListServiceError::Sdk(err))
313 }
314 }
315 }
316
317 pub async fn reset(&self) {
324 let mut pagination_token = self.token.lock().await;
325 *pagination_token = PaginationToken::None;
326
327 self.items.lock().clear();
328
329 self.pagination_state.set(ThreadListPaginationState::Idle { end_reached: false });
330 }
331
332 async fn load_thread_list(&self, opts: ListThreadsOptions) -> Result<ThreadList> {
333 let thread_roots = self.room.list_threads(opts).await?;
334
335 let list_items = join_all(
336 thread_roots
337 .chunk
338 .into_iter()
339 .map(|timeline_event| Self::build_thread_list_item(&self.room, timeline_event))
340 .collect::<Vec<_>>(),
341 )
342 .await
343 .into_iter()
344 .flatten()
345 .collect();
346
347 Ok(ThreadList { items: list_items, prev_batch_token: thread_roots.prev_batch_token })
348 }
349
350 async fn build_thread_list_item(
351 room: &Room,
352 timeline_event: TimelineEvent,
353 ) -> Option<ThreadListItem> {
354 let thread_summary = timeline_event.thread_summary.summary().cloned();
356 let bundled_latest_thread_event = timeline_event.bundled_latest_thread_event.clone();
357
358 let root_event = Self::build_event(room, timeline_event).await?;
360
361 let num_replies = thread_summary.as_ref().map(|s| s.num_replies).unwrap_or(0);
363
364 let latest_event = if let Some(ev) = bundled_latest_thread_event.map(|b| *b) {
365 Self::build_event(room, ev).await
366 } else {
367 None
368 };
369
370 Some(ThreadListItem { root_event, latest_event, num_replies })
371 }
372
373 async fn build_event(
375 room: &Room,
376 timeline_event: TimelineEvent,
377 ) -> Option<ThreadListItemEvent> {
378 let event_id = timeline_event.event_id()?;
379 let timestamp = timeline_event.timestamp()?;
380 let sender = timeline_event.sender()?;
381 let is_own = room.own_user_id() == sender;
382
383 let raw = timeline_event.into_raw();
384 let deserialized = match raw.deserialize() {
385 Ok(ev) => ev,
386 Err(e) => {
387 error!("Failed deserializing thread event for latest_event: {e}");
388 return None;
389 }
390 };
391
392 let sender_profile = room
393 .profile_from_user_id(&sender)
394 .await
395 .map(TimelineDetails::Ready)
396 .unwrap_or(TimelineDetails::Unavailable);
397
398 let content: Option<TimelineItemContent> = match TimelineAction::from_event(
399 deserialized,
400 &raw,
401 room,
402 None,
403 None,
404 None,
405 None,
406 )
407 .await
408 {
409 Some(TimelineAction::AddItem { content }) => Some(content),
410 Some(TimelineAction::HandleAggregation {
411 kind: HandleAggregationKind::Edit { replacement: Replacement { new_content, .. } },
412 ..
413 }) => {
414 match TimelineAction::from_content(
415 AnyMessageLikeEventContent::RoomMessage(RoomMessageEventContent::new(
416 new_content.msgtype,
417 )),
418 None,
419 None,
420 None,
421 ) {
422 TimelineAction::AddItem { content } => Some(content),
423 _ => None,
424 }
425 }
426 _ => None,
427 };
428
429 Some(ThreadListItemEvent { event_id, timestamp, sender, is_own, sender_profile, content })
430 }
431
432 async fn event_cache_listener_loop(
438 room: &Room,
439 subscriber: &mut RoomEventCacheSubscriber,
440 items: Arc<Mutex<ObservableVector<ThreadListItem>>>,
441 ) {
442 use tokio::sync::broadcast::error::RecvError;
443
444 loop {
445 let update = match subscriber.recv().await {
446 Ok(update) => update,
447 Err(RecvError::Closed) => {
448 error!("ThreadListService: event cache channel closed, stopping listener");
449 break;
450 }
451 Err(RecvError::Lagged(n)) => {
452 warn!("ThreadListService: lagged behind {n} event cache updates");
453 continue;
454 }
455 };
456
457 if let RoomEventCacheUpdate::UpdateTimelineEvents(timeline_diffs) = update {
458 let new_events = Self::collect_events_from_diffs(timeline_diffs.diffs);
459
460 for event in new_events {
461 let Some(thread_root) = extract_thread_root(event.raw()) else { continue };
463
464 let position = {
466 let guard = items.lock();
467 guard.iter().position(|item| item.root_event.event_id == thread_root)
468 };
469
470 if let Some(index) = position {
471 if let Some(latest_event) = Self::build_event(room, event).await {
473 let mut guard = items.lock();
474
475 if index < guard.len()
478 && guard[index].root_event.event_id == thread_root
479 {
480 let mut updated = guard[index].clone();
481 updated.latest_event = Some(latest_event);
482 updated.num_replies = updated.num_replies.saturating_add(1);
483 guard.set(index, updated);
484 }
485 }
486 }
487 }
488 }
489 }
490 }
491
492 fn collect_events_from_diffs(
494 diffs: Vec<VectorDiff<matrix_sdk_base::event_cache::Event>>,
495 ) -> Vec<matrix_sdk_base::event_cache::Event> {
496 let mut events = Vec::new();
497
498 for diff in diffs {
499 match diff {
500 VectorDiff::Append { values } => events.extend(values),
501 VectorDiff::PushBack { value }
502 | VectorDiff::PushFront { value }
503 | VectorDiff::Insert { value, .. }
504 | VectorDiff::Set { value, .. } => events.push(value),
505 VectorDiff::Reset { values } => events.extend(values),
506 VectorDiff::Clear
508 | VectorDiff::PopBack
509 | VectorDiff::PopFront
510 | VectorDiff::Remove { .. }
511 | VectorDiff::Truncate { .. } => {}
512 }
513 }
514
515 events
516 }
517}
518
519#[derive(Clone, Debug)]
522struct ThreadList {
523 pub items: Vec<ThreadListItem>,
525
526 pub prev_batch_token: Option<String>,
528}
529
530#[cfg(test)]
531mod tests {
532 use std::time::Duration;
533
534 use futures_util::pin_mut;
535 use matrix_sdk::test_utils::mocks::MatrixMockServer;
536 use matrix_sdk_test::{async_test, event_factory::EventFactory};
537 use ruma::{event_id, events::AnyTimelineEvent, room_id, serde::Raw, user_id};
538 use serde_json::json;
539 use stream_assert::{assert_next_matches, assert_pending};
540 use wiremock::ResponseTemplate;
541
542 use super::{ThreadListPaginationState, ThreadListService};
543
544 #[async_test]
545 async fn test_initial_state() {
546 let server = MatrixMockServer::new().await;
547 let service = make_service(&server).await;
548
549 assert_eq!(
550 service.pagination_state(),
551 ThreadListPaginationState::Idle { end_reached: false }
552 );
553 assert!(service.items().is_empty());
554 }
555
556 #[async_test]
557 async fn test_pagination() {
558 let server = MatrixMockServer::new().await;
559 let client = server.client_builder().build().await;
560 let room_id = room_id!("!a:b.c");
561 let sender_id = user_id!("@alice:b.c");
562
563 let f = EventFactory::new().room(room_id).sender(sender_id);
564
565 let eid1 = event_id!("$1");
566 let eid2 = event_id!("$2");
567
568 server
569 .mock_room_threads()
570 .ok(
571 vec![f.text_msg("Thread root 1").event_id(eid1).into_raw()],
572 Some("next_page_token".to_owned()),
573 )
574 .mock_once()
575 .mount()
576 .await;
577
578 server
579 .mock_room_threads()
580 .match_from("next_page_token")
581 .ok(vec![f.text_msg("Thread root 2").event_id(eid2).into_raw()], None)
582 .mock_once()
583 .mount()
584 .await;
585
586 let room = server.sync_joined_room(&client, room_id).await;
587 let service = ThreadListService::new(room);
588
589 service.paginate().await.expect("first paginate failed");
590
591 assert_eq!(
592 service.pagination_state(),
593 ThreadListPaginationState::Idle { end_reached: false }
594 );
595 assert_eq!(service.items().len(), 1);
596 assert_eq!(service.items()[0].root_event.event_id, eid1);
597
598 service.paginate().await.expect("second paginate failed");
599
600 assert_eq!(
601 service.pagination_state(),
602 ThreadListPaginationState::Idle { end_reached: true }
603 );
604 assert_eq!(service.items().len(), 2);
605 assert_eq!(service.items()[1].root_event.event_id, eid2);
606 }
607
608 #[async_test]
609 async fn test_pagination_end_reached() {
610 let server = MatrixMockServer::new().await;
611 let client = server.client_builder().build().await;
612 let room_id = room_id!("!a:b.c");
613 let sender_id = user_id!("@alice:b.c");
614 let f = EventFactory::new().room(room_id).sender(sender_id);
615 let eid1 = event_id!("$1");
616
617 server
618 .mock_room_threads()
619 .ok(vec![f.text_msg("Thread root").event_id(eid1).into_raw()], None)
620 .mock_once()
621 .mount()
622 .await;
623
624 let room = server.sync_joined_room(&client, room_id).await;
625 let service = ThreadListService::new(room);
626
627 service.paginate().await.expect("paginate failed");
628 assert_eq!(
629 service.pagination_state(),
630 ThreadListPaginationState::Idle { end_reached: true }
631 );
632 assert_eq!(service.items().len(), 1);
633
634 service.paginate().await.expect("second paginate should be a no-op");
635 assert_eq!(service.items().len(), 1);
636 assert_eq!(
637 service.pagination_state(),
638 ThreadListPaginationState::Idle { end_reached: true }
639 );
640 }
641
642 #[async_test]
647 async fn test_concurrent_pagination_is_not_possible() {
648 let server = MatrixMockServer::new().await;
649 let client = server.client_builder().build().await;
650 let room_id = room_id!("!a:b.c");
651 let sender_id = user_id!("@alice:b.c");
652 let f = EventFactory::new().room(room_id).sender(sender_id);
653 let eid1 = event_id!("$1");
654
655 let chunk: Vec<Raw<AnyTimelineEvent>> =
659 vec![f.text_msg("Thread root").event_id(eid1).into_raw()];
660 server
661 .mock_room_threads()
662 .respond_with(
663 ResponseTemplate::new(200)
664 .set_body_json(json!({ "chunk": chunk, "next_batch": null }))
665 .set_delay(Duration::from_millis(100)),
666 )
667 .expect(1)
668 .mount()
669 .await;
670
671 let room = server.sync_joined_room(&client, room_id).await;
672 let service = ThreadListService::new(room);
673
674 let (first, second) = tokio::join!(service.paginate(), service.paginate());
676
677 first.expect("first paginate should succeed");
678 second.expect("second (concurrent) paginate should succeed as a no-op");
679
680 assert_eq!(service.items().len(), 1);
682 assert_eq!(service.items()[0].root_event.event_id, eid1);
683 assert_eq!(
684 service.pagination_state(),
685 ThreadListPaginationState::Idle { end_reached: true }
686 );
687 }
688
689 #[async_test]
693 async fn test_pagination_error() {
694 let server = MatrixMockServer::new().await;
695 let client = server.client_builder().build().await;
696 let room_id = room_id!("!a:b.c");
697
698 server.mock_room_threads().error500().mock_once().mount().await;
699
700 let room = server.sync_joined_room(&client, room_id).await;
701 let service = ThreadListService::new(room);
702
703 service.paginate().await.expect_err("paginate should fail on a 500 response");
705
706 assert_eq!(
709 service.pagination_state(),
710 ThreadListPaginationState::Idle { end_reached: false }
711 );
712
713 assert!(service.items().is_empty());
715 }
716
717 #[async_test]
718 async fn test_reset() {
719 let server = MatrixMockServer::new().await;
720 let client = server.client_builder().build().await;
721 let room_id = room_id!("!a:b.c");
722 let sender_id = user_id!("@alice:b.c");
723 let f = EventFactory::new().room(room_id).sender(sender_id);
724 let eid1 = event_id!("$1");
725
726 server
727 .mock_room_threads()
728 .ok(vec![f.text_msg("Thread root").event_id(eid1).into_raw()], None)
729 .expect(2)
730 .mount()
731 .await;
732
733 let room = server.sync_joined_room(&client, room_id).await;
734 let service = ThreadListService::new(room);
735
736 service.paginate().await.expect("first paginate failed");
737 assert_eq!(service.items().len(), 1);
738 assert_eq!(
739 service.pagination_state(),
740 ThreadListPaginationState::Idle { end_reached: true }
741 );
742
743 service.reset().await;
744 assert!(service.items().is_empty());
745 assert_eq!(
746 service.pagination_state(),
747 ThreadListPaginationState::Idle { end_reached: false }
748 );
749
750 service.paginate().await.expect("paginate after reset failed");
751 assert_eq!(service.items().len(), 1);
752 }
753
754 #[async_test]
755 async fn test_pagination_state_subscriber() {
756 let server = MatrixMockServer::new().await;
757 let client = server.client_builder().build().await;
758 let room_id = room_id!("!a:b.c");
759 let sender_id = user_id!("@alice:b.c");
760 let f = EventFactory::new().room(room_id).sender(sender_id);
761 let eid1 = event_id!("$1");
762
763 server
764 .mock_room_threads()
765 .ok(
766 vec![f.text_msg("Thread root").event_id(eid1).into_raw()],
767 Some("next_token".to_owned()),
768 )
769 .mock_once()
770 .mount()
771 .await;
772
773 let room = server.sync_joined_room(&client, room_id).await;
774 let service = ThreadListService::new(room);
775
776 let subscriber = service.subscribe_to_pagination_state_updates();
777 pin_mut!(subscriber);
778
779 assert_pending!(subscriber);
780
781 service.paginate().await.expect("paginate failed");
782
783 assert_next_matches!(subscriber, ThreadListPaginationState::Idle { end_reached: false });
784 }
785
786 #[async_test]
787 async fn test_paginated_items_have_num_replies_zero_without_summary() {
788 let server = MatrixMockServer::new().await;
789 let client = server.client_builder().build().await;
790 let room_id = room_id!("!a:b.c");
791 let sender_id = user_id!("@alice:b.c");
792 let f = EventFactory::new().room(room_id).sender(sender_id);
793 let eid1 = event_id!("$1");
794
795 server
797 .mock_room_threads()
798 .ok(vec![f.text_msg("Thread root").event_id(eid1).into_raw()], None)
799 .mock_once()
800 .mount()
801 .await;
802
803 let room = server.sync_joined_room(&client, room_id).await;
804 let service = ThreadListService::new(room);
805
806 service.paginate().await.expect("paginate failed");
807
808 let items = service.items();
809 assert_eq!(items.len(), 1);
810 assert_eq!(items[0].num_replies, 0);
811 assert!(items[0].latest_event.is_none());
812 }
813
814 #[async_test]
815 async fn test_paginated_items_have_num_replies_from_bundled_summary() {
816 let server = MatrixMockServer::new().await;
817 let client = server.client_builder().build().await;
818 let room_id = room_id!("!a:b.c");
819 let sender_id = user_id!("@alice:b.c");
820 let f = EventFactory::new().room(room_id).sender(sender_id);
821 let root_id = event_id!("$root");
822 let reply_id = event_id!("$reply");
823
824 let reply_event =
828 f.text_msg("Reply in thread").event_id(reply_id).into_raw_sync().cast_unchecked();
829
830 let thread_root = f
832 .text_msg("Thread root")
833 .event_id(root_id)
834 .with_bundled_thread_summary(reply_event, 3, false)
835 .into_raw();
836
837 server.mock_room_threads().ok(vec![thread_root], None).mock_once().mount().await;
838
839 let room = server.sync_joined_room(&client, room_id).await;
840 let service = ThreadListService::new(room);
841
842 service.paginate().await.expect("paginate failed");
843
844 let items = service.items();
845 assert_eq!(items.len(), 1);
846 assert_eq!(items[0].root_event.event_id, root_id);
847 assert_eq!(items[0].num_replies, 3);
848
849 let latest = items[0].latest_event.as_ref().expect("should have latest_event");
851 assert_eq!(latest.event_id, reply_id);
852 assert_eq!(latest.sender.as_str(), sender_id.as_str());
853 }
854
855 async fn make_service(server: &MatrixMockServer) -> ThreadListService {
858 let client = server.client_builder().build().await;
859 let room_id = room_id!("!a:b.c");
860 let room = server.sync_joined_room(&client, room_id).await;
861 ThreadListService::new(room)
862 }
863}