1use std::{fs, path::PathBuf, sync::Arc};
20
21use algorithms::rfind_event_by_item_id;
22use event_item::{extract_room_msg_edit_content, TimelineItemHandle};
23use eyeball_im::VectorDiff;
24use futures_core::Stream;
25use imbl::Vector;
26use matrix_sdk::{
27 attachment::AttachmentConfig,
28 event_cache::{EventCacheDropHandles, RoomEventCache},
29 event_handler::EventHandlerHandle,
30 executor::JoinHandle,
31 room::{edit::EditedContent, Receipts, Room},
32 send_queue::{RoomSendQueueError, SendHandle},
33 Client, Result,
34};
35use mime::Mime;
36use pinned_events_loader::PinnedEventsRoom;
37use ruma::{
38 api::client::receipt::create_receipt::v3::ReceiptType,
39 events::{
40 poll::unstable_start::{NewUnstablePollStartEventContent, UnstablePollStartEventContent},
41 receipt::{Receipt, ReceiptThread},
42 room::{
43 message::{
44 AddMentions, ForwardThread, OriginalRoomMessageEvent,
45 RoomMessageEventContentWithoutRelation,
46 },
47 pinned_events::RoomPinnedEventsEventContent,
48 },
49 AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
50 SyncMessageLikeEvent,
51 },
52 serde::Raw,
53 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomVersionId, UserId,
54};
55use subscriber::TimelineWithDropHandle;
56use thiserror::Error;
57use tracing::{error, instrument, trace, warn};
58
59use self::{
60 algorithms::rfind_event_by_id, controller::TimelineController, futures::SendAttachment,
61};
62
63mod algorithms;
64mod builder;
65mod controller;
66mod date_dividers;
67mod error;
68mod event_handler;
69mod event_item;
70pub mod event_type_filter;
71pub mod futures;
72mod item;
73mod pagination;
74mod pinned_events_loader;
75mod subscriber;
76#[cfg(test)]
77mod tests;
78mod to_device;
79mod traits;
80mod virtual_item;
81
82pub use self::{
83 builder::TimelineBuilder,
84 controller::default_event_filter,
85 error::*,
86 event_item::{
87 AnyOtherFullStateEventContent, EncryptedMessage, EventItemOrigin, EventSendState,
88 EventTimelineItem, InReplyToDetails, MemberProfileChange, MembershipChange, Message,
89 OtherState, PollResult, PollState, Profile, ReactionInfo, ReactionStatus,
90 ReactionsByKeyBySender, RepliedToEvent, RoomMembershipChange, RoomPinnedEventsChange,
91 Sticker, TimelineDetails, TimelineEventItemId, TimelineItemContent,
92 },
93 event_type_filter::TimelineEventTypeFilter,
94 item::{TimelineItem, TimelineItemKind, TimelineUniqueId},
95 pagination::LiveBackPaginationStatus,
96 traits::RoomExt,
97 virtual_item::VirtualTimelineItem,
98};
99
/// Information about an event that is being replied to, as consumed by
/// `Timeline::send_reply`.
#[derive(Debug, Clone)]
pub struct RepliedToInfo {
    /// The ID of the event being replied to.
    event_id: OwnedEventId,
    /// The sender of the event being replied to.
    sender: OwnedUserId,
    /// The origin server timestamp of the event being replied to.
    timestamp: MilliSecondsSinceUnixEpoch,
    /// The content of the event being replied to.
    content: ReplyContent,
}
112
113impl RepliedToInfo {
114 pub fn event_id(&self) -> &EventId {
116 &self.event_id
117 }
118
119 pub fn sender(&self) -> &UserId {
121 &self.sender
122 }
123
124 pub fn content(&self) -> &ReplyContent {
126 &self.content
127 }
128}
129
/// The content of an event that is being replied to.
#[derive(Debug, Clone)]
pub enum ReplyContent {
    /// The replied-to event is an original room message, kept in its
    /// timeline [`Message`] form.
    Message(Message),
    /// Any other message-like event, kept as raw JSON.
    Raw(Raw<AnySyncTimelineEvent>),
}
138
/// A high-level view into a room's events, exposing the items to display and
/// the operations (send, edit, redact, receipts, pinning, ...) acting on them.
#[derive(Debug)]
pub struct Timeline {
    /// The controller that owns the timeline items; nearly every method of
    /// this type delegates to it.
    controller: TimelineController,

    /// The event cache of the room this timeline is attached to.
    // NOTE(review): not read by any method visible in this file; presumably
    // used by sibling modules (pagination) — confirm.
    event_cache: RoomEventCache,

    /// Shared handle whose `Drop` removes the event handlers and aborts the
    /// background tasks. A clone is handed to every subscriber stream.
    drop_handle: Arc<TimelineDropHandle>,
}
156
/// What a timeline is focused on.
#[derive(Clone, Debug, PartialEq)]
pub enum TimelineFocus {
    /// Focus on the live end of the room.
    Live,

    /// Focus on a specific event (e.g. a permalink).
    // NOTE(review): `num_context_events` is presumably the number of events
    // loaded around `target` — confirm against the builder/controller.
    Event { target: OwnedEventId, num_context_events: u16 },

    /// Focus on the room's pinned events only.
    // NOTE(review): both fields look like loading limits — confirm against
    // the pinned events loader.
    PinnedEvents { max_events_to_load: u16, max_concurrent_requests: u16 },
}
170
171impl TimelineFocus {
172 pub(super) fn debug_string(&self) -> String {
173 match self {
174 TimelineFocus::Live => "live".to_owned(),
175 TimelineFocus::Event { target, .. } => format!("permalink:{target}"),
176 TimelineFocus::PinnedEvents { .. } => "pinned-events".to_owned(),
177 }
178 }
179}
180
/// Granularity options for date dividers.
// NOTE(review): the variant meanings (one divider per day / per month) are
// inferred from the names; the consumer lives in another module — confirm.
#[derive(Debug, Clone)]
pub enum DateDividerMode {
    /// Daily granularity.
    Daily,
    /// Monthly granularity.
    Monthly,
}
188
189impl Timeline {
    /// Creates a new [`TimelineBuilder`] for the given room.
    pub fn builder(room: &Room) -> TimelineBuilder {
        TimelineBuilder::new(room)
    }
194
    /// Returns the room this timeline is attached to, as held by the
    /// controller.
    pub fn room(&self) -> &Room {
        self.controller.room()
    }
199
    /// Clears the timeline by delegating to the controller's `clear`.
    pub async fn clear(&self) {
        self.controller.clear().await;
    }
204
205 pub async fn retry_decryption<S: Into<String>>(
230 &self,
231 session_ids: impl IntoIterator<Item = S>,
232 ) {
233 self.controller
234 .retry_event_decryption(
235 self.room(),
236 Some(session_ids.into_iter().map(Into::into).collect()),
237 )
238 .await;
239 }
240
241 #[tracing::instrument(skip(self))]
242 async fn retry_decryption_for_all_events(&self) {
243 self.controller.retry_event_decryption(self.room(), None).await;
244 }
245
246 pub async fn item_by_event_id(&self, event_id: &EventId) -> Option<EventTimelineItem> {
255 let items = self.controller.items().await;
256 let (_, item) = rfind_event_by_id(&items, event_id)?;
257 Some(item.to_owned())
258 }
259
260 pub async fn latest_event(&self) -> Option<EventTimelineItem> {
262 if self.controller.is_live().await {
263 self.controller.items().await.last()?.as_event().cloned()
264 } else {
265 None
266 }
267 }
268
269 pub async fn subscribe(
276 &self,
277 ) -> (Vector<Arc<TimelineItem>>, impl Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>>) {
278 let (items, stream) = self.controller.subscribe().await;
279 let stream = TimelineWithDropHandle::new(stream, self.drop_handle.clone());
280 (items, stream)
281 }
282
283 #[instrument(skip(self, content), fields(room_id = ?self.room().room_id()))]
301 pub async fn send(
302 &self,
303 content: AnyMessageLikeEventContent,
304 ) -> Result<SendHandle, RoomSendQueueError> {
305 self.room().send_queue().send(content).await
306 }
307
308 #[instrument(skip(self, content, replied_to_info))]
329 pub async fn send_reply(
330 &self,
331 content: RoomMessageEventContentWithoutRelation,
332 replied_to_info: RepliedToInfo,
333 forward_thread: ForwardThread,
334 ) -> Result<(), RoomSendQueueError> {
335 let event_id = replied_to_info.event_id;
336
337 let mention_the_sender = if self.room().own_user_id() == replied_to_info.sender {
345 AddMentions::No
346 } else {
347 AddMentions::Yes
348 };
349
350 let content = match replied_to_info.content {
351 ReplyContent::Message(msg) => {
352 let event = OriginalRoomMessageEvent {
353 event_id: event_id.to_owned(),
354 sender: replied_to_info.sender,
355 origin_server_ts: replied_to_info.timestamp,
356 room_id: self.room().room_id().to_owned(),
357 content: msg.to_content(),
358 unsigned: Default::default(),
359 };
360 content.make_reply_to(&event, forward_thread, mention_the_sender)
361 }
362 ReplyContent::Raw(raw_event) => content.make_reply_to_raw(
363 &raw_event,
364 event_id.to_owned(),
365 self.room().room_id(),
366 forward_thread,
367 mention_the_sender,
368 ),
369 };
370
371 self.send(content.into()).await?;
372
373 Ok(())
374 }
375
376 pub async fn replied_to_info_from_event_id(
378 &self,
379 event_id: &EventId,
380 ) -> Result<RepliedToInfo, UnsupportedReplyItem> {
381 if let Some(timeline_item) = self.item_by_event_id(event_id).await {
382 return timeline_item.replied_to_info();
383 }
384
385 let event = self.room().event(event_id, None).await.map_err(|error| {
386 error!("Failed to fetch event with ID {event_id} with error: {error}");
387 UnsupportedReplyItem::MissingEvent
388 })?;
389
390 let raw_sync_event = event.into_raw();
391 let sync_event = raw_sync_event.deserialize().map_err(|error| {
392 error!("Failed to deserialize event with ID {event_id} with error: {error}");
393 UnsupportedReplyItem::FailedToDeserializeEvent
394 })?;
395
396 let reply_content = match &sync_event {
397 AnySyncTimelineEvent::MessageLike(message_like_event) => {
398 if let AnySyncMessageLikeEvent::RoomMessage(SyncMessageLikeEvent::Original(
399 original_message,
400 )) = message_like_event
401 {
402 let reactions = Default::default();
404 ReplyContent::Message(Message::from_event(
405 original_message.content.clone(),
406 extract_room_msg_edit_content(message_like_event.relations()),
407 &self.items().await,
408 reactions,
409 ))
410 } else {
411 ReplyContent::Raw(raw_sync_event)
412 }
413 }
414 AnySyncTimelineEvent::State(_) => return Err(UnsupportedReplyItem::StateEvent),
415 };
416
417 Ok(RepliedToInfo {
418 event_id: event_id.to_owned(),
419 sender: sync_event.sender().to_owned(),
420 timestamp: sync_event.origin_server_ts(),
421 content: reply_content,
422 })
423 }
424
425 #[instrument(skip(self, new_content))]
430 pub async fn edit(
431 &self,
432 item_id: &TimelineEventItemId,
433 new_content: EditedContent,
434 ) -> Result<(), Error> {
435 let items = self.items().await;
436 let Some((_pos, item)) = rfind_event_by_item_id(&items, item_id) else {
437 return Err(Error::EventNotInTimeline(item_id.clone()));
438 };
439
440 match item.handle() {
441 TimelineItemHandle::Remote(event_id) => {
442 let content = self
443 .room()
444 .make_edit_event(event_id, new_content)
445 .await
446 .map_err(EditError::RoomError)?;
447 self.send(content).await?;
448 Ok(())
449 }
450
451 TimelineItemHandle::Local(handle) => {
452 let new_content: AnyMessageLikeEventContent = match new_content {
454 EditedContent::RoomMessage(message) => {
455 if matches!(item.content, TimelineItemContent::Message(_)) {
456 AnyMessageLikeEventContent::RoomMessage(message.into())
457 } else {
458 return Err(EditError::ContentMismatch {
459 original: item.content.debug_string().to_owned(),
460 new: "a message".to_owned(),
461 }
462 .into());
463 }
464 }
465
466 EditedContent::PollStart { new_content, .. } => {
467 if matches!(item.content, TimelineItemContent::Poll(_)) {
468 AnyMessageLikeEventContent::UnstablePollStart(
469 UnstablePollStartEventContent::New(
470 NewUnstablePollStartEventContent::new(new_content),
471 ),
472 )
473 } else {
474 return Err(EditError::ContentMismatch {
475 original: item.content.debug_string().to_owned(),
476 new: "a poll".to_owned(),
477 }
478 .into());
479 }
480 }
481
482 EditedContent::MediaCaption { caption, formatted_caption, mentions } => {
483 if handle
484 .edit_media_caption(caption, formatted_caption, mentions)
485 .await
486 .map_err(RoomSendQueueError::StorageError)?
487 {
488 return Ok(());
489 }
490 return Err(EditError::InvalidLocalEchoState.into());
491 }
492 };
493
494 if !handle.edit(new_content).await.map_err(RoomSendQueueError::StorageError)? {
495 return Err(EditError::InvalidLocalEchoState.into());
496 }
497
498 Ok(())
499 }
500 }
501 }
502
    /// Toggles a reaction on the timeline item identified by `item_id`.
    ///
    /// # Arguments
    ///
    /// * `item_id` - The item to react to.
    /// * `reaction_key` - The key of the reaction to toggle.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the controller.
    pub async fn toggle_reaction(
        &self,
        item_id: &TimelineEventItemId,
        reaction_key: &str,
    ) -> Result<(), Error> {
        // The value returned by the controller is deliberately discarded.
        self.controller.toggle_reaction_local(item_id, reaction_key).await?;
        Ok(())
    }
520
521 #[instrument(skip_all)]
545 pub fn send_attachment(
546 &self,
547 source: impl Into<AttachmentSource>,
548 mime_type: Mime,
549 config: AttachmentConfig,
550 ) -> SendAttachment<'_> {
551 SendAttachment::new(self, source.into(), mime_type, config)
552 }
553
554 pub async fn redact(
557 &self,
558 item_id: &TimelineEventItemId,
559 reason: Option<&str>,
560 ) -> Result<(), Error> {
561 let items = self.items().await;
562 let Some((_pos, event)) = rfind_event_by_item_id(&items, item_id) else {
563 return Err(RedactError::ItemNotFound(item_id.clone()).into());
564 };
565
566 match event.handle() {
567 TimelineItemHandle::Remote(event_id) => {
568 self.room().redact(event_id, reason, None).await.map_err(RedactError::HttpError)?;
569 }
570 TimelineItemHandle::Local(handle) => {
571 if !handle.abort().await.map_err(RoomSendQueueError::StorageError)? {
572 return Err(RedactError::InvalidLocalEchoState.into());
573 }
574 }
575 }
576
577 Ok(())
578 }
579
    /// Fetches the in-reply-to details for the event with the given ID, by
    /// delegating to the controller.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the controller.
    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
    pub async fn fetch_details_for_event(&self, event_id: &EventId) -> Result<(), Error> {
        self.controller.fetch_in_reply_to_details(event_id).await
    }
603
604 #[instrument(skip_all)]
612 pub async fn fetch_members(&self) {
613 self.controller.set_sender_profiles_pending().await;
614 match self.room().sync_members().await {
615 Ok(_) => {
616 self.controller.update_missing_sender_profiles().await;
617 }
618 Err(e) => {
619 self.controller.set_sender_profiles_error(Arc::new(e)).await;
620 }
621 }
622 }
623
    /// Returns the latest read receipt of the given user, as reported by the
    /// controller.
    ///
    /// The result pairs the ID of the event the receipt is attached to with
    /// the receipt itself; `None` if the controller has none for this user.
    #[instrument(skip(self))]
    pub async fn latest_user_read_receipt(
        &self,
        user_id: &UserId,
    ) -> Option<(OwnedEventId, Receipt)> {
        self.controller.latest_user_read_receipt(user_id).await
    }
636
    /// Returns the ID of the timeline event associated with the latest read
    /// receipt of the given user, as reported by the controller.
    // NOTE(review): presumably this differs from `latest_user_read_receipt`
    // by only considering events visible in the timeline — confirm in the
    // controller.
    #[instrument(skip(self))]
    pub async fn latest_user_read_receipt_timeline_event_id(
        &self,
        user_id: &UserId,
    ) -> Option<OwnedEventId> {
        self.controller.latest_user_read_receipt_timeline_event_id(user_id).await
    }
651
    /// Returns a stream of `()` notifications obtained from the controller's
    /// own-user read receipt subscription.
    pub async fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
        self.controller.subscribe_own_user_read_receipts_changed().await
    }
656
657 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
665 pub async fn send_single_receipt(
666 &self,
667 receipt_type: ReceiptType,
668 thread: ReceiptThread,
669 event_id: OwnedEventId,
670 ) -> Result<bool> {
671 if !self.controller.should_send_receipt(&receipt_type, &thread, &event_id).await {
672 trace!(
673 "not sending receipt, because we already cover the event with a previous receipt"
674 );
675 return Ok(false);
676 }
677
678 trace!("sending receipt");
679 self.room().send_single_receipt(receipt_type, thread, event_id).await?;
680 Ok(true)
681 }
682
683 #[instrument(skip(self))]
690 pub async fn send_multiple_receipts(&self, mut receipts: Receipts) -> Result<()> {
691 if let Some(fully_read) = &receipts.fully_read {
692 if !self
693 .controller
694 .should_send_receipt(
695 &ReceiptType::FullyRead,
696 &ReceiptThread::Unthreaded,
697 fully_read,
698 )
699 .await
700 {
701 receipts.fully_read = None;
702 }
703 }
704
705 if let Some(read_receipt) = &receipts.public_read_receipt {
706 if !self
707 .controller
708 .should_send_receipt(&ReceiptType::Read, &ReceiptThread::Unthreaded, read_receipt)
709 .await
710 {
711 receipts.public_read_receipt = None;
712 }
713 }
714
715 if let Some(private_read_receipt) = &receipts.private_read_receipt {
716 if !self
717 .controller
718 .should_send_receipt(
719 &ReceiptType::ReadPrivate,
720 &ReceiptThread::Unthreaded,
721 private_read_receipt,
722 )
723 .await
724 {
725 receipts.private_read_receipt = None;
726 }
727 }
728
729 self.room().send_multiple_receipts(receipts).await
730 }
731
732 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
741 pub async fn mark_as_read(&self, receipt_type: ReceiptType) -> Result<bool> {
742 if let Some(event_id) = self.controller.latest_event_id().await {
743 self.send_single_receipt(receipt_type, ReceiptThread::Unthreaded, event_id).await
744 } else {
745 trace!("can't mark room as read because there's no latest event id");
746 Ok(false)
747 }
748 }
749
750 pub async fn pin_event(&self, event_id: &EventId) -> Result<bool> {
760 let mut pinned_event_ids = if let Some(event_ids) = self.room().pinned_event_ids() {
761 event_ids
762 } else {
763 self.room().load_pinned_events().await?.unwrap_or_default()
764 };
765 let event_id = event_id.to_owned();
766 if pinned_event_ids.contains(&event_id) {
767 Ok(false)
768 } else {
769 pinned_event_ids.push(event_id);
770 let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
771 self.room().send_state_event(content).await?;
772 Ok(true)
773 }
774 }
775
776 pub async fn unpin_event(&self, event_id: &EventId) -> Result<bool> {
786 let mut pinned_event_ids = if let Some(event_ids) = self.room().pinned_event_ids() {
787 event_ids
788 } else {
789 self.room().load_pinned_events().await?.unwrap_or_default()
790 };
791 let event_id = event_id.to_owned();
792 if let Some(idx) = pinned_event_ids.iter().position(|e| *e == *event_id) {
793 pinned_event_ids.remove(idx);
794 let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
795 self.room().send_state_event(content).await?;
796 Ok(true)
797 } else {
798 Ok(false)
799 }
800 }
801}
802
803#[doc(hidden)]
805impl Timeline {
806 pub async fn items(&self) -> Vector<Arc<TimelineItem>> {
808 self.controller.items().await
809 }
810
811 pub async fn subscribe_filter_map<U: Clone>(
812 &self,
813 f: impl Fn(Arc<TimelineItem>) -> Option<U>,
814 ) -> (Vector<U>, impl Stream<Item = VectorDiff<U>>) {
815 let (items, stream) = self.controller.subscribe_filter_map(f).await;
816 let stream = TimelineWithDropHandle::new(stream, self.drop_handle.clone());
817 (items, stream)
818 }
819}
820
/// Owns the resources backing a [`Timeline`]; its `Drop` implementation
/// removes the event handlers and aborts the join handles listed here.
#[derive(Debug)]
struct TimelineDropHandle {
    /// The client the event handlers are removed from on drop.
    client: Client,
    /// Event handlers to remove from the client on drop.
    event_handler_handles: Vec<EventHandlerHandle>,
    /// Join handle aborted on drop.
    room_update_join_handle: JoinHandle<()>,
    /// Optional join handle aborted on drop when present.
    pinned_events_join_handle: Option<JoinHandle<()>>,
    /// Join handle aborted on drop.
    room_key_from_backups_join_handle: JoinHandle<()>,
    /// Join handle aborted on drop.
    room_keys_received_join_handle: JoinHandle<()>,
    /// Join handle aborted on drop.
    room_key_backup_enabled_join_handle: JoinHandle<()>,
    /// Join handle aborted on drop.
    local_echo_listener_handle: JoinHandle<()>,
    /// Never read in this file; held only so it is dropped together with
    /// this handle.
    _event_cache_drop_handle: Arc<EventCacheDropHandles>,
    /// Join handle aborted on drop.
    encryption_changes_handle: JoinHandle<()>,
}
834
835impl Drop for TimelineDropHandle {
836 fn drop(&mut self) {
837 for handle in self.event_handler_handles.drain(..) {
838 self.client.remove_event_handler(handle);
839 }
840
841 if let Some(handle) = self.pinned_events_join_handle.take() {
842 handle.abort()
843 };
844
845 self.local_echo_listener_handle.abort();
846 self.room_update_join_handle.abort();
847 self.room_key_from_backups_join_handle.abort();
848 self.room_key_backup_enabled_join_handle.abort();
849 self.room_keys_received_join_handle.abort();
850 self.encryption_changes_handle.abort();
851 }
852}
853
/// Signature of a timeline event filter: a thread-safe predicate over an
/// event and a room version.
// NOTE(review): presumably returning `true` keeps the event in the timeline —
// confirm against `default_event_filter` in the controller.
pub type TimelineEventFilterFn =
    dyn Fn(&AnySyncTimelineEvent, &RoomVersionId) -> bool + Send + Sync;
856
/// Where the data of an attachment comes from.
#[derive(Debug, Clone)]
pub enum AttachmentSource {
    /// The attachment data is already in memory.
    Data {
        /// The bytes of the attachment.
        bytes: Vec<u8>,

        /// The filename of the attachment.
        filename: String,
    },

    /// The attachment is a file on disk. Its bytes are read from this path
    /// and the filename is taken from the path's final component.
    File(PathBuf),
}
877
878impl AttachmentSource {
879 pub(crate) fn try_into_bytes_and_filename(self) -> Result<(Vec<u8>, String), Error> {
881 match self {
882 Self::Data { bytes, filename } => Ok((bytes, filename)),
883 Self::File(path) => {
884 let filename = path
885 .file_name()
886 .ok_or(Error::InvalidAttachmentFileName)?
887 .to_str()
888 .ok_or(Error::InvalidAttachmentFileName)?
889 .to_owned();
890 let bytes = fs::read(&path).map_err(|_| Error::InvalidAttachmentData)?;
891 Ok((bytes, filename))
892 }
893 }
894 }
895}
896
897impl<P> From<P> for AttachmentSource
898where
899 P: Into<PathBuf>,
900{
901 fn from(value: P) -> Self {
902 Self::File(value.into())
903 }
904}