matrix_sdk_ui/timeline/
mod.rs

1// Copyright 2022 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A high-level view into a room's contents.
16//!
17//! See [`Timeline`] for details.
18
19use std::{fs, path::PathBuf, sync::Arc};
20
21use algorithms::rfind_event_by_item_id;
22use event_item::TimelineItemHandle;
23use eyeball_im::VectorDiff;
24#[cfg(feature = "unstable-msc4274")]
25use futures::SendGallery;
26use futures_core::Stream;
27use imbl::Vector;
28use matrix_sdk::{
29    Result,
30    attachment::{AttachmentInfo, Thumbnail},
31    deserialized_responses::TimelineEvent,
32    event_cache::{EventCacheDropHandles, RoomEventCache},
33    executor::JoinHandle,
34    room::{
35        Receipts, Room,
36        edit::EditedContent,
37        reply::{EnforceThread, Reply},
38    },
39    send_queue::{RoomSendQueueError, SendHandle},
40};
41use mime::Mime;
42use pinned_events_loader::PinnedEventsRoom;
43use ruma::{
44    EventId, OwnedEventId, OwnedTransactionId, UserId,
45    api::client::receipt::create_receipt::v3::ReceiptType,
46    events::{
47        AnyMessageLikeEventContent, AnySyncTimelineEvent, Mentions,
48        poll::unstable_start::{NewUnstablePollStartEventContent, UnstablePollStartEventContent},
49        receipt::{Receipt, ReceiptThread},
50        relation::Thread,
51        room::{
52            message::{
53                FormattedBody, Relation, RelationWithoutReplacement, ReplyWithinThread,
54                RoomMessageEventContentWithoutRelation,
55            },
56            pinned_events::RoomPinnedEventsEventContent,
57        },
58    },
59    room_version_rules::RoomVersionRules,
60};
61use subscriber::TimelineWithDropHandle;
62use thiserror::Error;
63use tracing::{instrument, trace, warn};
64
65use self::{
66    algorithms::rfind_event_by_id, controller::TimelineController, futures::SendAttachment,
67};
68use crate::timeline::controller::CryptoDropHandles;
69
70mod algorithms;
71mod builder;
72mod controller;
73mod date_dividers;
74mod error;
75mod event_handler;
76mod event_item;
77pub mod event_type_filter;
78pub mod futures;
79mod item;
80mod latest_event;
81mod pagination;
82mod pinned_events_loader;
83mod subscriber;
84mod tasks;
85#[cfg(test)]
86mod tests;
87mod to_device;
88mod traits;
89mod virtual_item;
90
91pub use self::{
92    builder::TimelineBuilder,
93    controller::default_event_filter,
94    error::*,
95    event_item::{
96        AnyOtherFullStateEventContent, EmbeddedEvent, EncryptedMessage, EventItemOrigin,
97        EventSendState, EventTimelineItem, InReplyToDetails, MediaUploadProgress,
98        MemberProfileChange, MembershipChange, Message, MsgLikeContent, MsgLikeKind, OtherState,
99        PollResult, PollState, Profile, ReactionInfo, ReactionStatus, ReactionsByKeyBySender,
100        RoomMembershipChange, RoomPinnedEventsChange, Sticker, ThreadSummary, TimelineDetails,
101        TimelineEventItemId, TimelineItemContent,
102    },
103    event_type_filter::TimelineEventTypeFilter,
104    item::{TimelineItem, TimelineItemKind, TimelineUniqueId},
105    latest_event::LatestEventValue,
106    traits::RoomExt,
107    virtual_item::VirtualTimelineItem,
108};
109
110/// A high-level view into a regular¹ room's contents.
111///
112/// ¹ This type is meant to be used in the context of rooms without a
113/// `room_type`, that is rooms that are primarily used to exchange text
114/// messages.
115#[derive(Debug)]
116pub struct Timeline {
117    /// Cloneable, inner fields of the `Timeline`, shared with some background
118    /// tasks.
119    controller: TimelineController,
120
121    /// The event cache specialized for this room's view.
122    event_cache: RoomEventCache,
123
124    /// References to long-running tasks held by the timeline.
125    drop_handle: Arc<TimelineDropHandle>,
126}
127
128/// What should the timeline focus on?
129#[derive(Clone, Debug, PartialEq)]
130pub enum TimelineFocus {
131    /// Focus on live events, i.e. receive events from sync and append them in
132    /// real-time.
133    Live {
134        /// Whether to hide in-thread replies from the live timeline.
135        ///
136        /// This should be set to true when the client can create
137        /// [`Self::Thread`]-focused timelines from the thread roots themselves.
138        hide_threaded_events: bool,
139    },
140
141    /// Focus on a specific event, e.g. after clicking a permalink.
142    Event {
143        target: OwnedEventId,
144        num_context_events: u16,
145        /// Whether to hide in-thread replies from the live timeline.
146        ///
147        /// This should be set to true when the client can create
148        /// [`Self::Thread`]-focused timelines from the thread roots themselves.
149        hide_threaded_events: bool,
150    },
151
152    /// Focus on a specific thread
153    Thread { root_event_id: OwnedEventId },
154
155    /// Only show pinned events.
156    PinnedEvents { max_events_to_load: u16, max_concurrent_requests: u16 },
157}
158
159impl TimelineFocus {
160    pub(super) fn debug_string(&self) -> String {
161        match self {
162            TimelineFocus::Live { .. } => "live".to_owned(),
163            TimelineFocus::Event { target, .. } => format!("permalink:{target}"),
164            TimelineFocus::Thread { root_event_id, .. } => format!("thread:{root_event_id}"),
165            TimelineFocus::PinnedEvents { .. } => "pinned-events".to_owned(),
166        }
167    }
168}
169
170/// Changes how dividers get inserted, either in between each day or in between
171/// each month
172#[derive(Debug, Clone)]
173pub enum DateDividerMode {
174    Daily,
175    Monthly,
176}
177
178/// Configuration for sending an attachment.
179///
180/// Like [`matrix_sdk::attachment::AttachmentConfig`], but instead of the
181/// `reply` field, there's only a `in_reply_to` event id; it's the timeline
182/// deciding to fill the rest of the reply parameters.
183#[derive(Debug, Default)]
184pub struct AttachmentConfig {
185    pub txn_id: Option<OwnedTransactionId>,
186    pub info: Option<AttachmentInfo>,
187    pub thumbnail: Option<Thumbnail>,
188    pub caption: Option<String>,
189    pub formatted_caption: Option<FormattedBody>,
190    pub mentions: Option<Mentions>,
191    pub in_reply_to: Option<OwnedEventId>,
192}
193
194impl Timeline {
195    /// Returns the room for this timeline.
196    pub fn room(&self) -> &Room {
197        self.controller.room()
198    }
199
200    /// Clear all timeline items.
201    pub async fn clear(&self) {
202        self.controller.clear().await;
203    }
204
205    /// Retry decryption of previously un-decryptable events given a list of
206    /// session IDs whose keys have been imported.
207    ///
208    /// # Examples
209    ///
210    /// ```no_run
211    /// # use std::{path::PathBuf, time::Duration};
212    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::room_id};
213    /// # use matrix_sdk_ui::Timeline;
214    /// # async {
215    /// # let mut client: Client = todo!();
216    /// # let room_id = ruma::room_id!("!example:example.org");
217    /// # let timeline: Timeline = todo!();
218    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
219    /// let result =
220    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
221    ///
222    /// // Given a timeline for a specific room_id
223    /// if let Some(keys_for_users) = result.keys.get(room_id) {
224    ///     let session_ids = keys_for_users.values().flatten();
225    ///     timeline.retry_decryption(session_ids).await;
226    /// }
227    /// # anyhow::Ok(()) };
228    /// ```
229    pub async fn retry_decryption<S: Into<String>>(
230        &self,
231        session_ids: impl IntoIterator<Item = S>,
232    ) {
233        self.controller
234            .retry_event_decryption(Some(session_ids.into_iter().map(Into::into).collect()))
235            .await;
236    }
237
238    #[tracing::instrument(skip(self))]
239    async fn retry_decryption_for_all_events(&self) {
240        self.controller.retry_event_decryption(None).await;
241    }
242
243    /// Get the current timeline item for the given event ID, if any.
244    ///
245    /// Will return a remote event, *or* a local echo that has been sent but not
246    /// yet replaced by a remote echo.
247    ///
248    /// It's preferable to store the timeline items in the model for your UI, if
249    /// possible, instead of just storing IDs and coming back to the timeline
250    /// object to look up items.
251    pub async fn item_by_event_id(&self, event_id: &EventId) -> Option<EventTimelineItem> {
252        let items = self.controller.items().await;
253        let (_, item) = rfind_event_by_id(&items, event_id)?;
254        Some(item.to_owned())
255    }
256
257    /// Get the latest of the timeline's event items.
258    pub async fn latest_event(&self) -> Option<EventTimelineItem> {
259        if self.controller.is_live() {
260            self.controller.items().await.last()?.as_event().cloned()
261        } else {
262            None
263        }
264    }
265
266    /// Get the current timeline items, along with a stream of updates of
267    /// timeline items.
268    ///
269    /// The stream produces `Vec<VectorDiff<_>>`, which means multiple updates
270    /// at once. There are no delays, it consumes as many updates as possible
271    /// and batches them.
272    pub async fn subscribe(
273        &self,
274    ) -> (Vector<Arc<TimelineItem>>, impl Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>> + use<>)
275    {
276        let (items, stream) = self.controller.subscribe().await;
277        let stream = TimelineWithDropHandle::new(stream, self.drop_handle.clone());
278        (items, stream)
279    }
280
281    /// Send a message to the room, and add it to the timeline as a local echo.
282    ///
283    /// For simplicity, this method doesn't currently allow custom message
284    /// types.
285    ///
286    /// If the encryption feature is enabled, this method will transparently
287    /// encrypt the room message if the room is encrypted.
288    ///
289    /// If sending the message fails, the local echo item will change its
290    /// `send_state` to [`EventSendState::SendingFailed`].
291    ///
292    /// This will do the right thing in the presence of threads:
293    /// - if this timeline is not focused on a thread, then it will send the
294    ///   event as is.
295    /// - if this is a threaded timeline, and the event to send is a room
296    ///   message without a relationship, it will automatically mark it as a
297    ///   thread reply with the correct reply fallback, and send it.
298    ///
299    /// # Arguments
300    ///
301    /// * `content` - The content of the message event.
302    #[instrument(skip(self, content), fields(room_id = ?self.room().room_id()))]
303    pub async fn send(&self, mut content: AnyMessageLikeEventContent) -> Result<SendHandle, Error> {
304        // If this is a room event we're sending in a threaded timeline, we add the
305        // thread relation ourselves.
306        if content.relation().is_none()
307            && let Some(reply) = self.infer_reply(None).await
308        {
309            match &mut content {
310                AnyMessageLikeEventContent::RoomMessage(room_msg_content) => {
311                    content = self
312                        .room()
313                        .make_reply_event(
314                            // Note: this `.into()` gets rid of the relation, but we've checked
315                            // previously that the `relates_to` field wasn't
316                            // set.
317                            room_msg_content.clone().into(),
318                            reply,
319                        )
320                        .await?
321                        .into();
322                }
323
324                AnyMessageLikeEventContent::UnstablePollStart(
325                    UnstablePollStartEventContent::New(poll),
326                ) => {
327                    if let Some(thread_root) = self.controller.thread_root() {
328                        poll.relates_to = Some(RelationWithoutReplacement::Thread(Thread::plain(
329                            thread_root,
330                            reply.event_id,
331                        )));
332                    }
333                }
334
335                AnyMessageLikeEventContent::Sticker(sticker) => {
336                    if let Some(thread_root) = self.controller.thread_root() {
337                        sticker.relates_to =
338                            Some(Relation::Thread(Thread::plain(thread_root, reply.event_id)));
339                    }
340                }
341
342                _ => {}
343            }
344        }
345
346        Ok(self.room().send_queue().send(content).await?)
347    }
348
349    /// Send a reply to the given event.
350    ///
351    /// Currently it only supports events with an event ID and JSON being
352    /// available (which can be removed by local redactions). This is subject to
353    /// change. Use [`EventTimelineItem::can_be_replied_to`] to decide whether
354    /// to render a reply button.
355    ///
356    /// The sender will be added to the mentions of the reply if
357    /// and only if the event has not been written by the sender.
358    ///
359    /// This will do the right thing in the presence of threads:
360    /// - if this timeline is not focused on a thread, then it will forward the
361    ///   thread relationship of the replied-to event, if present.
362    /// - if this is a threaded timeline, it will mark the reply as an in-thread
363    ///   reply.
364    ///
365    /// # Arguments
366    ///
367    /// * `content` - The content of the reply.
368    ///
369    /// * `in_reply_to` - The ID of the event to reply to.
370    #[instrument(skip(self, content))]
371    pub async fn send_reply(
372        &self,
373        content: RoomMessageEventContentWithoutRelation,
374        in_reply_to: OwnedEventId,
375    ) -> Result<(), Error> {
376        let reply = self
377            .infer_reply(Some(in_reply_to))
378            .await
379            .expect("the reply will always be set because we provided a replied-to event id");
380        let content = self.room().make_reply_event(content, reply).await?;
381        self.send(content.into()).await?;
382        Ok(())
383    }
384
385    /// Given a message or media to send, and an optional `in_reply_to` event,
386    /// automatically fills the [`Reply`] information based on the current
387    /// timeline focus.
388    pub(crate) async fn infer_reply(&self, in_reply_to: Option<OwnedEventId>) -> Option<Reply> {
389        // If there's a replied-to event id, the reply is pretty straightforward, and we
390        // should only infer the `EnforceThread` based on the current focus.
391        if let Some(in_reply_to) = in_reply_to {
392            let enforce_thread = if self.controller.is_threaded() {
393                EnforceThread::Threaded(ReplyWithinThread::Yes)
394            } else {
395                EnforceThread::MaybeThreaded
396            };
397            return Some(Reply { event_id: in_reply_to, enforce_thread });
398        }
399
400        let thread_root = self.controller.thread_root()?;
401
402        // The latest event id is used for the reply-to fallback, for clients which
403        // don't handle threads. It should be correctly set to the latest
404        // event in the thread, which the timeline instance might or might
405        // not know about; in this case, we do a best effort of filling it, and resort
406        // to using the thread root if we don't know about any event.
407        //
408        // Note: we could trigger a back-pagination if the timeline is empty, and wait
409        // for the results, if the timeline is too often empty.
410
411        let latest_event_id = self
412            .controller
413            .items()
414            .await
415            .iter()
416            .rev()
417            .find_map(|item| {
418                if let TimelineItemKind::Event(event) = item.kind() {
419                    event.event_id().map(ToOwned::to_owned)
420                } else {
421                    None
422                }
423            })
424            .unwrap_or(thread_root);
425
426        Some(Reply {
427            event_id: latest_event_id,
428            enforce_thread: EnforceThread::Threaded(ReplyWithinThread::No),
429        })
430    }
431
432    /// Edit an event given its [`TimelineEventItemId`] and some new content.
433    ///
434    /// Only supports events for which [`EventTimelineItem::is_editable()`]
435    /// returns `true`.
436    #[instrument(skip(self, new_content))]
437    pub async fn edit(
438        &self,
439        item_id: &TimelineEventItemId,
440        new_content: EditedContent,
441    ) -> Result<(), Error> {
442        let items = self.items().await;
443        let Some((_pos, item)) = rfind_event_by_item_id(&items, item_id) else {
444            return Err(Error::EventNotInTimeline(item_id.clone()));
445        };
446
447        match item.handle() {
448            TimelineItemHandle::Remote(event_id) => {
449                let content = self
450                    .room()
451                    .make_edit_event(event_id, new_content)
452                    .await
453                    .map_err(EditError::RoomError)?;
454                self.send(content).await?;
455                Ok(())
456            }
457
458            TimelineItemHandle::Local(handle) => {
459                // Relations are filled by the editing code itself.
460                let new_content: AnyMessageLikeEventContent = match new_content {
461                    EditedContent::RoomMessage(message) => {
462                        if item.content.is_message() {
463                            AnyMessageLikeEventContent::RoomMessage(message.into())
464                        } else {
465                            return Err(EditError::ContentMismatch {
466                                original: item.content.debug_string().to_owned(),
467                                new: "a message".to_owned(),
468                            }
469                            .into());
470                        }
471                    }
472
473                    EditedContent::PollStart { new_content, .. } => {
474                        if item.content.is_poll() {
475                            AnyMessageLikeEventContent::UnstablePollStart(
476                                UnstablePollStartEventContent::New(
477                                    NewUnstablePollStartEventContent::new(new_content),
478                                ),
479                            )
480                        } else {
481                            return Err(EditError::ContentMismatch {
482                                original: item.content.debug_string().to_owned(),
483                                new: "a poll".to_owned(),
484                            }
485                            .into());
486                        }
487                    }
488
489                    EditedContent::MediaCaption { caption, formatted_caption, mentions } => {
490                        if handle
491                            .edit_media_caption(caption, formatted_caption, mentions)
492                            .await
493                            .map_err(RoomSendQueueError::StorageError)?
494                        {
495                            return Ok(());
496                        }
497                        return Err(EditError::InvalidLocalEchoState.into());
498                    }
499                };
500
501                if !handle.edit(new_content).await.map_err(RoomSendQueueError::StorageError)? {
502                    return Err(EditError::InvalidLocalEchoState.into());
503                }
504
505                Ok(())
506            }
507        }
508    }
509
510    /// Toggle a reaction on an event.
511    ///
512    /// Adds or redacts a reaction based on the state of the reaction at the
513    /// time it is called.
514    ///
515    /// When redacting a previous reaction, the redaction reason is not set.
516    ///
517    /// Ensures that only one reaction is sent at a time to avoid race
518    /// conditions and spamming the homeserver with requests.
519    ///
520    /// Returns `true` if the reaction was added, `false` if it was removed.
521    pub async fn toggle_reaction(
522        &self,
523        item_id: &TimelineEventItemId,
524        reaction_key: &str,
525    ) -> Result<bool, Error> {
526        self.controller.toggle_reaction_local(item_id, reaction_key).await
527    }
528
529    /// Sends an attachment to the room.
530    ///
531    /// It does not currently support local echoes.
532    ///
533    /// If the encryption feature is enabled, this method will transparently
534    /// encrypt the room message if the room is encrypted.
535    ///
536    /// The attachment and its optional thumbnail are stored in the media cache
537    /// and can be retrieved at any time, by calling
538    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
539    /// in the corresponding `TimelineEventItem`, and using a
540    /// `MediaFormat::File`.
541    ///
542    /// # Arguments
543    ///
544    /// * `source` - The source of the attachment to send.
545    ///
546    /// * `mime_type` - The attachment's mime type.
547    ///
548    /// * `config` - An attachment configuration object containing details about
549    ///   the attachment like a thumbnail, its size, duration etc.
550    ///
551    /// [`Media::get_media_content()`]: matrix_sdk::Media::get_media_content
552    #[instrument(skip_all)]
553    pub fn send_attachment(
554        &self,
555        source: impl Into<AttachmentSource>,
556        mime_type: Mime,
557        config: AttachmentConfig,
558    ) -> SendAttachment<'_> {
559        SendAttachment::new(self, source.into(), mime_type, config)
560    }
561
562    /// Sends a media gallery to the room.
563    ///
564    /// If the encryption feature is enabled, this method will transparently
565    /// encrypt the room message if the room is encrypted.
566    ///
567    /// The attachments and their optional thumbnails are stored in the media
568    /// cache and can be retrieved at any time, by calling
569    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
570    /// in the corresponding `TimelineEventItem`, and using a
571    /// `MediaFormat::File`.
572    ///
573    /// # Arguments
574    /// * `gallery` - A configuration object containing details about the
575    ///   gallery like files, thumbnails, etc.
576    ///
577    /// [`Media::get_media_content()`]: matrix_sdk::Media::get_media_content
578    #[cfg(feature = "unstable-msc4274")]
579    #[instrument(skip_all)]
580    pub fn send_gallery(&self, gallery: GalleryConfig) -> SendGallery<'_> {
581        SendGallery::new(self, gallery)
582    }
583
584    /// Redact an event given its [`TimelineEventItemId`] and an optional
585    /// reason.
586    pub async fn redact(
587        &self,
588        item_id: &TimelineEventItemId,
589        reason: Option<&str>,
590    ) -> Result<(), Error> {
591        let items = self.items().await;
592        let Some((_pos, event)) = rfind_event_by_item_id(&items, item_id) else {
593            return Err(RedactError::ItemNotFound(item_id.clone()).into());
594        };
595
596        match event.handle() {
597            TimelineItemHandle::Remote(event_id) => {
598                self.room().redact(event_id, reason, None).await.map_err(RedactError::HttpError)?;
599            }
600            TimelineItemHandle::Local(handle) => {
601                if !handle.abort().await.map_err(RoomSendQueueError::StorageError)? {
602                    return Err(RedactError::InvalidLocalEchoState.into());
603                }
604            }
605        }
606
607        Ok(())
608    }
609
610    /// Fetch unavailable details about the event with the given ID.
611    ///
612    /// This method only works for IDs of remote [`EventTimelineItem`]s,
613    /// to prevent losing details when a local echo is replaced by its
614    /// remote echo.
615    ///
616    /// This method tries to make all the requests it can. If an error is
617    /// encountered for a given request, it is forwarded with the
618    /// [`TimelineDetails::Error`] variant.
619    ///
620    /// # Arguments
621    ///
622    /// * `event_id` - The event ID of the event to fetch details for.
623    ///
624    /// # Errors
625    ///
626    /// Returns an error if the identifier doesn't match any event with a remote
627    /// echo in the timeline, or if the event is removed from the timeline
628    /// before all requests are handled.
629    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
630    pub async fn fetch_details_for_event(&self, event_id: &EventId) -> Result<(), Error> {
631        self.controller.fetch_in_reply_to_details(event_id).await
632    }
633
634    /// Fetch all member events for the room this timeline is displaying.
635    ///
636    /// If the full member list is not known, sender profiles are currently
637    /// likely not going to be available. This will be fixed in the future.
638    ///
639    /// If fetching the members fails, any affected timeline items will have
640    /// the `sender_profile` set to [`TimelineDetails::Error`].
641    #[instrument(skip_all)]
642    pub async fn fetch_members(&self) {
643        self.controller.set_sender_profiles_pending().await;
644        match self.room().sync_members().await {
645            Ok(_) => {
646                self.controller.update_missing_sender_profiles().await;
647            }
648            Err(e) => {
649                self.controller.set_sender_profiles_error(Arc::new(e)).await;
650            }
651        }
652    }
653
654    /// Get the latest read receipt for the given user.
655    ///
656    /// Contrary to [`Room::load_user_receipt()`] that only keeps track of read
657    /// receipts received from the homeserver, this keeps also track of implicit
658    /// read receipts in this timeline, i.e. when a room member sends an event.
659    #[instrument(skip(self))]
660    pub async fn latest_user_read_receipt(
661        &self,
662        user_id: &UserId,
663    ) -> Option<(OwnedEventId, Receipt)> {
664        self.controller.latest_user_read_receipt(user_id).await
665    }
666
667    /// Get the ID of the timeline event with the latest read receipt for the
668    /// given user.
669    ///
670    /// In contrary to [`Self::latest_user_read_receipt()`], this allows to know
671    /// the position of the read receipt in the timeline even if the event it
672    /// applies to is not visible in the timeline, unless the event is unknown
673    /// by this timeline.
674    #[instrument(skip(self))]
675    pub async fn latest_user_read_receipt_timeline_event_id(
676        &self,
677        user_id: &UserId,
678    ) -> Option<OwnedEventId> {
679        self.controller.latest_user_read_receipt_timeline_event_id(user_id).await
680    }
681
682    /// Subscribe to changes in the read receipts of our own user.
683    pub async fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> + use<> {
684        self.controller.subscribe_own_user_read_receipts_changed().await
685    }
686
687    /// Send the given receipt.
688    ///
689    /// This uses [`Room::send_single_receipt`] internally, but checks
690    /// first if the receipt points to an event in this timeline that is more
691    /// recent than the current ones, to avoid unnecessary requests.
692    ///
693    /// If an unthreaded receipt is sent, this will also unset the unread flag
694    /// of the room if necessary.
695    ///
696    /// The thread of the receipt is determined by the timeline instance's
697    /// focus mode and `hide_threaded_events` flag.
698    ///
699    /// Returns a boolean indicating if it sent the receipt or not.
700    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
701    pub async fn send_single_receipt(
702        &self,
703        receipt_type: ReceiptType,
704        event_id: OwnedEventId,
705    ) -> Result<bool> {
706        let thread = self.controller.infer_thread_for_read_receipt(&receipt_type);
707
708        if !self.controller.should_send_receipt(&receipt_type, &thread, &event_id).await {
709            trace!(
710                "not sending receipt, because we already cover the event with a previous receipt"
711            );
712
713            if thread == ReceiptThread::Unthreaded {
714                // Unset the read marker.
715                self.room().set_unread_flag(false).await?;
716            }
717
718            return Ok(false);
719        }
720
721        trace!("sending receipt");
722        self.room().send_single_receipt(receipt_type, thread, event_id).await?;
723        Ok(true)
724    }
725
726    /// Send the given receipts.
727    ///
728    /// This uses [`Room::send_multiple_receipts`] internally, but
729    /// checks first if the receipts point to events in this timeline that
730    /// are more recent than the current ones, to avoid unnecessary
731    /// requests.
732    ///
733    /// This also unsets the unread marker of the room if necessary.
734    #[instrument(skip(self))]
735    pub async fn send_multiple_receipts(&self, mut receipts: Receipts) -> Result<()> {
736        if let Some(fully_read) = &receipts.fully_read
737            && !self
738                .controller
739                .should_send_receipt(
740                    &ReceiptType::FullyRead,
741                    &ReceiptThread::Unthreaded,
742                    fully_read,
743                )
744                .await
745        {
746            receipts.fully_read = None;
747        }
748
749        if let Some(read_receipt) = &receipts.public_read_receipt
750            && !self
751                .controller
752                .should_send_receipt(&ReceiptType::Read, &ReceiptThread::Unthreaded, read_receipt)
753                .await
754        {
755            receipts.public_read_receipt = None;
756        }
757
758        if let Some(private_read_receipt) = &receipts.private_read_receipt
759            && !self
760                .controller
761                .should_send_receipt(
762                    &ReceiptType::ReadPrivate,
763                    &ReceiptThread::Unthreaded,
764                    private_read_receipt,
765                )
766                .await
767        {
768            receipts.private_read_receipt = None;
769        }
770
771        let room = self.room();
772
773        if !receipts.is_empty() {
774            room.send_multiple_receipts(receipts).await?;
775        } else {
776            room.set_unread_flag(false).await?;
777        }
778
779        Ok(())
780    }
781
782    /// Mark the room as read by sending an unthreaded read receipt on the
783    /// latest event, be it visible or not.
784    ///
785    /// This works even if the latest event belongs to a thread, as a threaded
786    /// reply also belongs to the unthreaded timeline. No threaded receipt
787    /// will be sent here (see also #3123).
788    ///
789    /// This also unsets the unread marker of the room if necessary.
790    ///
791    /// Returns a boolean indicating if it sent the receipt or not.
792    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
793    pub async fn mark_as_read(&self, receipt_type: ReceiptType) -> Result<bool> {
794        if let Some(event_id) = self.controller.latest_event_id().await {
795            self.send_single_receipt(receipt_type, event_id).await
796        } else {
797            trace!("can't mark room as read because there's no latest event id");
798
799            // For live timelines, unset the read marker in this case.
800            if self.controller.is_live() {
801                self.room().set_unread_flag(false).await?;
802            }
803
804            Ok(false)
805        }
806    }
807
808    /// Adds a new pinned event by sending an updated `m.room.pinned_events`
809    /// event containing the new event id.
810    ///
811    /// This method will first try to get the pinned events from the current
812    /// room's state and if it fails to do so it'll try to load them from the
813    /// homeserver.
814    ///
815    /// Returns `true` if we pinned the event, `false` if the event was already
816    /// pinned.
817    pub async fn pin_event(&self, event_id: &EventId) -> Result<bool> {
818        let mut pinned_event_ids = if let Some(event_ids) = self.room().pinned_event_ids() {
819            event_ids
820        } else {
821            self.room().load_pinned_events().await?.unwrap_or_default()
822        };
823        let event_id = event_id.to_owned();
824        if pinned_event_ids.contains(&event_id) {
825            Ok(false)
826        } else {
827            pinned_event_ids.push(event_id);
828            let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
829            self.room().send_state_event(content).await?;
830            Ok(true)
831        }
832    }
833
834    /// Removes a pinned event by sending an updated `m.room.pinned_events`
835    /// event without the event id we want to remove.
836    ///
837    /// This method will first try to get the pinned events from the current
838    /// room's state and if it fails to do so it'll try to load them from the
839    /// homeserver.
840    ///
841    /// Returns `true` if we unpinned the event, `false` if the event wasn't
842    /// pinned before.
843    pub async fn unpin_event(&self, event_id: &EventId) -> Result<bool> {
844        let mut pinned_event_ids = if let Some(event_ids) = self.room().pinned_event_ids() {
845            event_ids
846        } else {
847            self.room().load_pinned_events().await?.unwrap_or_default()
848        };
849        let event_id = event_id.to_owned();
850        if let Some(idx) = pinned_event_ids.iter().position(|e| *e == *event_id) {
851            pinned_event_ids.remove(idx);
852            let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
853            self.room().send_state_event(content).await?;
854            Ok(true)
855        } else {
856            Ok(false)
857        }
858    }
859
860    /// Create a [`EmbeddedEvent`] from an arbitrary event, be it in the
861    /// timeline or not.
862    ///
863    /// Can be `None` if the event cannot be represented as a standalone item,
864    /// because it's an aggregation.
865    pub async fn make_replied_to(
866        &self,
867        event: TimelineEvent,
868    ) -> Result<Option<EmbeddedEvent>, Error> {
869        self.controller.make_replied_to(event).await
870    }
871
872    /// Returns whether this timeline is focused on a thread (be it live, or
873    /// from a permalink to a threaded event).
874    pub fn is_threaded(&self) -> bool {
875        self.controller.is_threaded()
876    }
877}
878
879/// Test helpers, likely not very useful in production.
880#[doc(hidden)]
881impl Timeline {
882    /// Get the current list of timeline items.
883    pub async fn items(&self) -> Vector<Arc<TimelineItem>> {
884        self.controller.items().await
885    }
886
887    pub async fn subscribe_filter_map<U: Clone>(
888        &self,
889        f: impl Fn(Arc<TimelineItem>) -> Option<U>,
890    ) -> (Vector<U>, impl Stream<Item = VectorDiff<U>>) {
891        let (items, stream) = self.controller.subscribe_filter_map(f).await;
892        let stream = TimelineWithDropHandle::new(stream, self.drop_handle.clone());
893        (items, stream)
894    }
895}
896
897#[derive(Debug)]
898struct TimelineDropHandle {
899    room_update_join_handle: JoinHandle<()>,
900    pinned_events_join_handle: Option<JoinHandle<()>>,
901    thread_update_join_handle: Option<JoinHandle<()>>,
902    local_echo_listener_handle: JoinHandle<()>,
903    _event_cache_drop_handle: Arc<EventCacheDropHandles>,
904    _crypto_drop_handles: CryptoDropHandles,
905}
906
907impl Drop for TimelineDropHandle {
908    fn drop(&mut self) {
909        if let Some(handle) = self.pinned_events_join_handle.take() {
910            handle.abort();
911        }
912
913        if let Some(handle) = self.thread_update_join_handle.take() {
914            handle.abort();
915        }
916
917        self.local_echo_listener_handle.abort();
918        self.room_update_join_handle.abort();
919    }
920}
921
922#[cfg(not(target_family = "wasm"))]
923pub type TimelineEventFilterFn =
924    dyn Fn(&AnySyncTimelineEvent, &RoomVersionRules) -> bool + Send + Sync;
925#[cfg(target_family = "wasm")]
926pub type TimelineEventFilterFn = dyn Fn(&AnySyncTimelineEvent, &RoomVersionRules) -> bool;
927
928/// A source for sending an attachment.
929///
930/// The [`AttachmentSource::File`] variant can be constructed from any type that
931/// implements `Into<PathBuf>`.
932#[derive(Debug, Clone)]
933pub enum AttachmentSource {
934    /// The data of the attachment.
935    Data {
936        /// The bytes of the attachment.
937        bytes: Vec<u8>,
938
939        /// The filename of the attachment.
940        filename: String,
941    },
942
943    /// An attachment loaded from a file.
944    ///
945    /// The bytes and the filename will be read from the file at the given path.
946    File(PathBuf),
947}
948
949impl AttachmentSource {
950    /// Try to convert this attachment source into a `(bytes, filename)` tuple.
951    pub(crate) fn try_into_bytes_and_filename(self) -> Result<(Vec<u8>, String), Error> {
952        match self {
953            Self::Data { bytes, filename } => Ok((bytes, filename)),
954            Self::File(path) => {
955                let filename = path
956                    .file_name()
957                    .ok_or(Error::InvalidAttachmentFileName)?
958                    .to_str()
959                    .ok_or(Error::InvalidAttachmentFileName)?
960                    .to_owned();
961                let bytes = fs::read(&path).map_err(|_| Error::InvalidAttachmentData)?;
962                Ok((bytes, filename))
963            }
964        }
965    }
966}
967
968impl<P> From<P> for AttachmentSource
969where
970    P: Into<PathBuf>,
971{
972    fn from(value: P) -> Self {
973        Self::File(value.into())
974    }
975}
976
977/// Configuration for sending a gallery.
978///
979/// This duplicates [`matrix_sdk::attachment::GalleryConfig`] but uses an
980/// `AttachmentSource` so that we can delay loading the actual data until we're
981/// inside the SendGallery future. This allows [`Timeline::send_gallery`] to
982/// return early without blocking the caller.
983#[cfg(feature = "unstable-msc4274")]
984#[derive(Debug, Default)]
985pub struct GalleryConfig {
986    pub(crate) txn_id: Option<OwnedTransactionId>,
987    pub(crate) items: Vec<GalleryItemInfo>,
988    pub(crate) caption: Option<String>,
989    pub(crate) formatted_caption: Option<FormattedBody>,
990    pub(crate) mentions: Option<Mentions>,
991    pub(crate) in_reply_to: Option<OwnedEventId>,
992}
993
994#[cfg(feature = "unstable-msc4274")]
995impl GalleryConfig {
996    /// Create a new empty `GalleryConfig`.
997    pub fn new() -> Self {
998        Self::default()
999    }
1000
1001    /// Set the transaction ID to send.
1002    ///
1003    /// # Arguments
1004    ///
1005    /// * `txn_id` - A unique ID that can be attached to a `MessageEvent` held
1006    ///   in its unsigned field as `transaction_id`. If not given, one is
1007    ///   created for the message.
1008    #[must_use]
1009    pub fn txn_id(mut self, txn_id: OwnedTransactionId) -> Self {
1010        self.txn_id = Some(txn_id);
1011        self
1012    }
1013
1014    /// Adds a media item to the gallery.
1015    ///
1016    /// # Arguments
1017    ///
1018    /// * `item` - Information about the item to be added.
1019    #[must_use]
1020    pub fn add_item(mut self, item: GalleryItemInfo) -> Self {
1021        self.items.push(item);
1022        self
1023    }
1024
1025    /// Set the optional caption.
1026    ///
1027    /// # Arguments
1028    ///
1029    /// * `caption` - The optional caption.
1030    pub fn caption(mut self, caption: Option<String>) -> Self {
1031        self.caption = caption;
1032        self
1033    }
1034
1035    /// Set the optional formatted caption.
1036    ///
1037    /// # Arguments
1038    ///
1039    /// * `formatted_caption` - The optional formatted caption.
1040    pub fn formatted_caption(mut self, formatted_caption: Option<FormattedBody>) -> Self {
1041        self.formatted_caption = formatted_caption;
1042        self
1043    }
1044
1045    /// Set the mentions of the message.
1046    ///
1047    /// # Arguments
1048    ///
1049    /// * `mentions` - The mentions of the message.
1050    pub fn mentions(mut self, mentions: Option<Mentions>) -> Self {
1051        self.mentions = mentions;
1052        self
1053    }
1054
1055    /// Set the reply information of the message.
1056    ///
1057    /// # Arguments
1058    ///
1059    /// * `event_id` - The event ID to reply to.
1060    pub fn in_reply_to(mut self, event_id: Option<OwnedEventId>) -> Self {
1061        self.in_reply_to = event_id;
1062        self
1063    }
1064
1065    /// Returns the number of media items in the gallery.
1066    pub fn len(&self) -> usize {
1067        self.items.len()
1068    }
1069
1070    /// Checks whether the gallery contains any media items or not.
1071    pub fn is_empty(&self) -> bool {
1072        self.items.is_empty()
1073    }
1074}
1075
1076#[cfg(feature = "unstable-msc4274")]
1077#[derive(Debug)]
1078/// Metadata for a gallery item
1079pub struct GalleryItemInfo {
1080    /// The attachment source.
1081    pub source: AttachmentSource,
1082    /// The mime type.
1083    pub content_type: Mime,
1084    /// The attachment info.
1085    pub attachment_info: AttachmentInfo,
1086    /// The caption.
1087    pub caption: Option<String>,
1088    /// The formatted caption.
1089    pub formatted_caption: Option<FormattedBody>,
1090    /// The thumbnail.
1091    pub thumbnail: Option<Thumbnail>,
1092}
1093
1094#[cfg(feature = "unstable-msc4274")]
1095impl TryFrom<GalleryItemInfo> for matrix_sdk::attachment::GalleryItemInfo {
1096    type Error = Error;
1097
1098    fn try_from(value: GalleryItemInfo) -> Result<Self, Self::Error> {
1099        let (data, filename) = value.source.try_into_bytes_and_filename()?;
1100        Ok(matrix_sdk::attachment::GalleryItemInfo {
1101            filename,
1102            content_type: value.content_type,
1103            data,
1104            attachment_info: value.attachment_info,
1105            caption: value.caption,
1106            formatted_caption: value.formatted_caption,
1107            thumbnail: value.thumbnail,
1108        })
1109    }
1110}