1use std::{fs, path::PathBuf, sync::Arc};
20
21use algorithms::rfind_event_by_item_id;
22use event_item::TimelineItemHandle;
23use eyeball_im::VectorDiff;
24use futures_core::Stream;
25use imbl::Vector;
26use matrix_sdk::{
27 attachment::AttachmentConfig,
28 event_cache::{EventCacheDropHandles, RoomEventCache},
29 event_handler::EventHandlerHandle,
30 executor::JoinHandle,
31 room::{edit::EditedContent, reply::EnforceThread, Receipts, Room},
32 send_queue::{RoomSendQueueError, SendHandle},
33 Client, Result,
34};
35use mime::Mime;
36use pinned_events_loader::PinnedEventsRoom;
37use ruma::{
38 api::client::receipt::create_receipt::v3::ReceiptType,
39 events::{
40 poll::unstable_start::{NewUnstablePollStartEventContent, UnstablePollStartEventContent},
41 receipt::{Receipt, ReceiptThread},
42 room::{
43 message::RoomMessageEventContentWithoutRelation,
44 pinned_events::RoomPinnedEventsEventContent,
45 },
46 AnyMessageLikeEventContent, AnySyncTimelineEvent,
47 },
48 EventId, OwnedEventId, RoomVersionId, UserId,
49};
50use subscriber::TimelineWithDropHandle;
51use thiserror::Error;
52use tracing::{instrument, trace, warn};
53
54use self::{
55 algorithms::rfind_event_by_id, controller::TimelineController, futures::SendAttachment,
56};
57
58mod algorithms;
59mod builder;
60mod controller;
61mod date_dividers;
62mod error;
63mod event_handler;
64mod event_item;
65pub mod event_type_filter;
66pub mod futures;
67mod item;
68mod pagination;
69mod pinned_events_loader;
70mod subscriber;
71#[cfg(test)]
72mod tests;
73mod to_device;
74mod traits;
75mod virtual_item;
76
77pub use self::{
78 builder::TimelineBuilder,
79 controller::default_event_filter,
80 error::*,
81 event_item::{
82 AnyOtherFullStateEventContent, EncryptedMessage, EventItemOrigin, EventSendState,
83 EventTimelineItem, InReplyToDetails, MemberProfileChange, MembershipChange, Message,
84 MsgLikeContent, MsgLikeKind, OtherState, PollResult, PollState, Profile, ReactionInfo,
85 ReactionStatus, ReactionsByKeyBySender, RepliedToEvent, RoomMembershipChange,
86 RoomPinnedEventsChange, Sticker, TimelineDetails, TimelineEventItemId, TimelineItemContent,
87 },
88 event_type_filter::TimelineEventTypeFilter,
89 item::{TimelineItem, TimelineItemKind, TimelineUniqueId},
90 traits::RoomExt,
91 virtual_item::VirtualTimelineItem,
92};
93
94#[derive(Debug)]
100pub struct Timeline {
101 controller: TimelineController,
104
105 event_cache: RoomEventCache,
107
108 drop_handle: Arc<TimelineDropHandle>,
110}
111
112#[derive(Clone, Debug, PartialEq)]
114pub enum TimelineFocus {
115 Live,
118
119 Event { target: OwnedEventId, num_context_events: u16 },
121
122 PinnedEvents { max_events_to_load: u16, max_concurrent_requests: u16 },
124}
125
126impl TimelineFocus {
127 pub(super) fn debug_string(&self) -> String {
128 match self {
129 TimelineFocus::Live => "live".to_owned(),
130 TimelineFocus::Event { target, .. } => format!("permalink:{target}"),
131 TimelineFocus::PinnedEvents { .. } => "pinned-events".to_owned(),
132 }
133 }
134}
135
/// Granularity options for date dividers.
///
/// NOTE(review): presumably selects whether a divider is inserted per day or
/// per month; the logic that consumes this value is not in this part of the
/// file — confirm against the `date_dividers` module.
///
/// The type is a plain fieldless enum, so it is `Copy` and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DateDividerMode {
    /// Day-level granularity.
    Daily,
    /// Month-level granularity.
    Monthly,
}
143
144impl Timeline {
145 pub fn builder(room: &Room) -> TimelineBuilder {
147 TimelineBuilder::new(room)
148 }
149
150 pub fn room(&self) -> &Room {
152 self.controller.room()
153 }
154
155 pub async fn clear(&self) {
157 self.controller.clear().await;
158 }
159
160 pub async fn retry_decryption<S: Into<String>>(
185 &self,
186 session_ids: impl IntoIterator<Item = S>,
187 ) {
188 self.controller
189 .retry_event_decryption(Some(session_ids.into_iter().map(Into::into).collect()))
190 .await;
191 }
192
193 #[tracing::instrument(skip(self))]
194 async fn retry_decryption_for_all_events(&self) {
195 self.controller.retry_event_decryption(None).await;
196 }
197
198 pub async fn item_by_event_id(&self, event_id: &EventId) -> Option<EventTimelineItem> {
207 let items = self.controller.items().await;
208 let (_, item) = rfind_event_by_id(&items, event_id)?;
209 Some(item.to_owned())
210 }
211
212 pub async fn latest_event(&self) -> Option<EventTimelineItem> {
214 if self.controller.is_live().await {
215 self.controller.items().await.last()?.as_event().cloned()
216 } else {
217 None
218 }
219 }
220
221 pub async fn subscribe(
228 &self,
229 ) -> (Vector<Arc<TimelineItem>>, impl Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>>) {
230 let (items, stream) = self.controller.subscribe().await;
231 let stream = TimelineWithDropHandle::new(stream, self.drop_handle.clone());
232 (items, stream)
233 }
234
235 #[instrument(skip(self, content), fields(room_id = ?self.room().room_id()))]
253 pub async fn send(
254 &self,
255 content: AnyMessageLikeEventContent,
256 ) -> Result<SendHandle, RoomSendQueueError> {
257 self.room().send_queue().send(content).await
258 }
259
260 #[instrument(skip(self, content))]
278 pub async fn send_reply(
279 &self,
280 content: RoomMessageEventContentWithoutRelation,
281 event_id: OwnedEventId,
282 enforce_thread: EnforceThread,
283 ) -> Result<(), Error> {
284 let content = self.room().make_reply_event(content, &event_id, enforce_thread).await?;
285 self.send(content).await?;
286 Ok(())
287 }
288
289 #[instrument(skip(self, new_content))]
294 pub async fn edit(
295 &self,
296 item_id: &TimelineEventItemId,
297 new_content: EditedContent,
298 ) -> Result<(), Error> {
299 let items = self.items().await;
300 let Some((_pos, item)) = rfind_event_by_item_id(&items, item_id) else {
301 return Err(Error::EventNotInTimeline(item_id.clone()));
302 };
303
304 match item.handle() {
305 TimelineItemHandle::Remote(event_id) => {
306 let content = self
307 .room()
308 .make_edit_event(event_id, new_content)
309 .await
310 .map_err(EditError::RoomError)?;
311 self.send(content).await?;
312 Ok(())
313 }
314
315 TimelineItemHandle::Local(handle) => {
316 let new_content: AnyMessageLikeEventContent = match new_content {
318 EditedContent::RoomMessage(message) => {
319 if item.content.is_message() {
320 AnyMessageLikeEventContent::RoomMessage(message.into())
321 } else {
322 return Err(EditError::ContentMismatch {
323 original: item.content.debug_string().to_owned(),
324 new: "a message".to_owned(),
325 }
326 .into());
327 }
328 }
329
330 EditedContent::PollStart { new_content, .. } => {
331 if item.content.is_poll() {
332 AnyMessageLikeEventContent::UnstablePollStart(
333 UnstablePollStartEventContent::New(
334 NewUnstablePollStartEventContent::new(new_content),
335 ),
336 )
337 } else {
338 return Err(EditError::ContentMismatch {
339 original: item.content.debug_string().to_owned(),
340 new: "a poll".to_owned(),
341 }
342 .into());
343 }
344 }
345
346 EditedContent::MediaCaption { caption, formatted_caption, mentions } => {
347 if handle
348 .edit_media_caption(caption, formatted_caption, mentions)
349 .await
350 .map_err(RoomSendQueueError::StorageError)?
351 {
352 return Ok(());
353 }
354 return Err(EditError::InvalidLocalEchoState.into());
355 }
356 };
357
358 if !handle.edit(new_content).await.map_err(RoomSendQueueError::StorageError)? {
359 return Err(EditError::InvalidLocalEchoState.into());
360 }
361
362 Ok(())
363 }
364 }
365 }
366
367 pub async fn toggle_reaction(
377 &self,
378 item_id: &TimelineEventItemId,
379 reaction_key: &str,
380 ) -> Result<(), Error> {
381 self.controller.toggle_reaction_local(item_id, reaction_key).await?;
382 Ok(())
383 }
384
385 #[instrument(skip_all)]
409 pub fn send_attachment(
410 &self,
411 source: impl Into<AttachmentSource>,
412 mime_type: Mime,
413 config: AttachmentConfig,
414 ) -> SendAttachment<'_> {
415 SendAttachment::new(self, source.into(), mime_type, config)
416 }
417
418 pub async fn redact(
421 &self,
422 item_id: &TimelineEventItemId,
423 reason: Option<&str>,
424 ) -> Result<(), Error> {
425 let items = self.items().await;
426 let Some((_pos, event)) = rfind_event_by_item_id(&items, item_id) else {
427 return Err(RedactError::ItemNotFound(item_id.clone()).into());
428 };
429
430 match event.handle() {
431 TimelineItemHandle::Remote(event_id) => {
432 self.room().redact(event_id, reason, None).await.map_err(RedactError::HttpError)?;
433 }
434 TimelineItemHandle::Local(handle) => {
435 if !handle.abort().await.map_err(RoomSendQueueError::StorageError)? {
436 return Err(RedactError::InvalidLocalEchoState.into());
437 }
438 }
439 }
440
441 Ok(())
442 }
443
444 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
464 pub async fn fetch_details_for_event(&self, event_id: &EventId) -> Result<(), Error> {
465 self.controller.fetch_in_reply_to_details(event_id).await
466 }
467
468 #[instrument(skip_all)]
476 pub async fn fetch_members(&self) {
477 self.controller.set_sender_profiles_pending().await;
478 match self.room().sync_members().await {
479 Ok(_) => {
480 self.controller.update_missing_sender_profiles().await;
481 }
482 Err(e) => {
483 self.controller.set_sender_profiles_error(Arc::new(e)).await;
484 }
485 }
486 }
487
488 #[instrument(skip(self))]
494 pub async fn latest_user_read_receipt(
495 &self,
496 user_id: &UserId,
497 ) -> Option<(OwnedEventId, Receipt)> {
498 self.controller.latest_user_read_receipt(user_id).await
499 }
500
501 #[instrument(skip(self))]
509 pub async fn latest_user_read_receipt_timeline_event_id(
510 &self,
511 user_id: &UserId,
512 ) -> Option<OwnedEventId> {
513 self.controller.latest_user_read_receipt_timeline_event_id(user_id).await
514 }
515
516 pub async fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
518 self.controller.subscribe_own_user_read_receipts_changed().await
519 }
520
521 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
529 pub async fn send_single_receipt(
530 &self,
531 receipt_type: ReceiptType,
532 thread: ReceiptThread,
533 event_id: OwnedEventId,
534 ) -> Result<bool> {
535 if !self.controller.should_send_receipt(&receipt_type, &thread, &event_id).await {
536 trace!(
537 "not sending receipt, because we already cover the event with a previous receipt"
538 );
539 return Ok(false);
540 }
541
542 trace!("sending receipt");
543 self.room().send_single_receipt(receipt_type, thread, event_id).await?;
544 Ok(true)
545 }
546
547 #[instrument(skip(self))]
554 pub async fn send_multiple_receipts(&self, mut receipts: Receipts) -> Result<()> {
555 if let Some(fully_read) = &receipts.fully_read {
556 if !self
557 .controller
558 .should_send_receipt(
559 &ReceiptType::FullyRead,
560 &ReceiptThread::Unthreaded,
561 fully_read,
562 )
563 .await
564 {
565 receipts.fully_read = None;
566 }
567 }
568
569 if let Some(read_receipt) = &receipts.public_read_receipt {
570 if !self
571 .controller
572 .should_send_receipt(&ReceiptType::Read, &ReceiptThread::Unthreaded, read_receipt)
573 .await
574 {
575 receipts.public_read_receipt = None;
576 }
577 }
578
579 if let Some(private_read_receipt) = &receipts.private_read_receipt {
580 if !self
581 .controller
582 .should_send_receipt(
583 &ReceiptType::ReadPrivate,
584 &ReceiptThread::Unthreaded,
585 private_read_receipt,
586 )
587 .await
588 {
589 receipts.private_read_receipt = None;
590 }
591 }
592
593 self.room().send_multiple_receipts(receipts).await
594 }
595
596 #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
605 pub async fn mark_as_read(&self, receipt_type: ReceiptType) -> Result<bool> {
606 if let Some(event_id) = self.controller.latest_event_id().await {
607 self.send_single_receipt(receipt_type, ReceiptThread::Unthreaded, event_id).await
608 } else {
609 trace!("can't mark room as read because there's no latest event id");
610 Ok(false)
611 }
612 }
613
614 pub async fn pin_event(&self, event_id: &EventId) -> Result<bool> {
624 let mut pinned_event_ids = if let Some(event_ids) = self.room().pinned_event_ids() {
625 event_ids
626 } else {
627 self.room().load_pinned_events().await?.unwrap_or_default()
628 };
629 let event_id = event_id.to_owned();
630 if pinned_event_ids.contains(&event_id) {
631 Ok(false)
632 } else {
633 pinned_event_ids.push(event_id);
634 let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
635 self.room().send_state_event(content).await?;
636 Ok(true)
637 }
638 }
639
640 pub async fn unpin_event(&self, event_id: &EventId) -> Result<bool> {
650 let mut pinned_event_ids = if let Some(event_ids) = self.room().pinned_event_ids() {
651 event_ids
652 } else {
653 self.room().load_pinned_events().await?.unwrap_or_default()
654 };
655 let event_id = event_id.to_owned();
656 if let Some(idx) = pinned_event_ids.iter().position(|e| *e == *event_id) {
657 pinned_event_ids.remove(idx);
658 let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
659 self.room().send_state_event(content).await?;
660 Ok(true)
661 } else {
662 Ok(false)
663 }
664 }
665}
666
667#[doc(hidden)]
669impl Timeline {
670 pub async fn items(&self) -> Vector<Arc<TimelineItem>> {
672 self.controller.items().await
673 }
674
675 pub async fn subscribe_filter_map<U: Clone>(
676 &self,
677 f: impl Fn(Arc<TimelineItem>) -> Option<U>,
678 ) -> (Vector<U>, impl Stream<Item = VectorDiff<U>>) {
679 let (items, stream) = self.controller.subscribe_filter_map(f).await;
680 let stream = TimelineWithDropHandle::new(stream, self.drop_handle.clone());
681 (items, stream)
682 }
683}
684
685#[derive(Debug)]
686struct TimelineDropHandle {
687 client: Client,
688 event_handler_handles: Vec<EventHandlerHandle>,
689 room_update_join_handle: JoinHandle<()>,
690 pinned_events_join_handle: Option<JoinHandle<()>>,
691 room_key_from_backups_join_handle: JoinHandle<()>,
692 room_keys_received_join_handle: JoinHandle<()>,
693 room_key_backup_enabled_join_handle: JoinHandle<()>,
694 local_echo_listener_handle: JoinHandle<()>,
695 _event_cache_drop_handle: Arc<EventCacheDropHandles>,
696 encryption_changes_handle: JoinHandle<()>,
697}
698
699impl Drop for TimelineDropHandle {
700 fn drop(&mut self) {
701 for handle in self.event_handler_handles.drain(..) {
702 self.client.remove_event_handler(handle);
703 }
704
705 if let Some(handle) = self.pinned_events_join_handle.take() {
706 handle.abort()
707 };
708
709 self.local_echo_listener_handle.abort();
710 self.room_update_join_handle.abort();
711 self.room_key_from_backups_join_handle.abort();
712 self.room_key_backup_enabled_join_handle.abort();
713 self.room_keys_received_join_handle.abort();
714 self.encryption_changes_handle.abort();
715 }
716}
717
718pub type TimelineEventFilterFn =
719 dyn Fn(&AnySyncTimelineEvent, &RoomVersionId) -> bool + Send + Sync;
720
721#[derive(Debug, Clone)]
726pub enum AttachmentSource {
727 Data {
729 bytes: Vec<u8>,
731
732 filename: String,
734 },
735
736 File(PathBuf),
740}
741
742impl AttachmentSource {
743 pub(crate) fn try_into_bytes_and_filename(self) -> Result<(Vec<u8>, String), Error> {
745 match self {
746 Self::Data { bytes, filename } => Ok((bytes, filename)),
747 Self::File(path) => {
748 let filename = path
749 .file_name()
750 .ok_or(Error::InvalidAttachmentFileName)?
751 .to_str()
752 .ok_or(Error::InvalidAttachmentFileName)?
753 .to_owned();
754 let bytes = fs::read(&path).map_err(|_| Error::InvalidAttachmentData)?;
755 Ok((bytes, filename))
756 }
757 }
758 }
759}
760
761impl<P> From<P> for AttachmentSource
762where
763 P: Into<PathBuf>,
764{
765 fn from(value: P) -> Self {
766 Self::File(value.into())
767 }
768}