matrix_sdk_ui/timeline/
mod.rs

1// Copyright 2022 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A high-level view into a room's contents.
16//!
17//! See [`Timeline`] for details.
18
19use std::{fs, path::PathBuf, sync::Arc};
20
21use algorithms::rfind_event_by_item_id;
22use event_item::TimelineItemHandle;
23use eyeball_im::VectorDiff;
24#[cfg(feature = "unstable-msc4274")]
25use futures::SendGallery;
26use futures_core::Stream;
27use imbl::Vector;
28use matrix_sdk::{
29    Result,
30    attachment::{AttachmentInfo, Thumbnail},
31    deserialized_responses::TimelineEvent,
32    event_cache::{EventCacheDropHandles, RoomEventCache},
33    executor::JoinHandle,
34    room::{
35        Receipts, Room,
36        edit::EditedContent,
37        reply::{EnforceThread, Reply},
38    },
39    send_queue::{RoomSendQueueError, SendHandle},
40};
41use mime::Mime;
42use pinned_events_loader::PinnedEventsRoom;
43use ruma::{
44    EventId, OwnedEventId, OwnedTransactionId, UserId,
45    api::client::receipt::create_receipt::v3::ReceiptType,
46    events::{
47        AnyMessageLikeEventContent, AnySyncTimelineEvent, Mentions,
48        poll::unstable_start::{NewUnstablePollStartEventContent, UnstablePollStartEventContent},
49        receipt::{Receipt, ReceiptThread},
50        relation::Thread,
51        room::{
52            message::{
53                Relation, RelationWithoutReplacement, ReplyWithinThread,
54                RoomMessageEventContentWithoutRelation, TextMessageEventContent,
55            },
56            pinned_events::RoomPinnedEventsEventContent,
57        },
58    },
59    room_version_rules::RoomVersionRules,
60};
61use subscriber::TimelineWithDropHandle;
62use thiserror::Error;
63use tracing::{instrument, trace, warn};
64
65use self::{
66    algorithms::rfind_event_by_id, controller::TimelineController, futures::SendAttachment,
67};
68use crate::timeline::controller::CryptoDropHandles;
69
70mod algorithms;
71mod builder;
72mod controller;
73mod date_dividers;
74mod error;
75mod event_handler;
76mod event_item;
77pub mod event_type_filter;
78pub mod futures;
79mod item;
80mod latest_event;
81mod pagination;
82mod pinned_events_loader;
83mod subscriber;
84mod tasks;
85#[cfg(test)]
86mod tests;
87mod traits;
88mod virtual_item;
89
90pub use self::{
91    builder::TimelineBuilder,
92    controller::default_event_filter,
93    error::*,
94    event_item::{
95        AnyOtherFullStateEventContent, EmbeddedEvent, EncryptedMessage, EventItemOrigin,
96        EventSendState, EventTimelineItem, InReplyToDetails, MediaUploadProgress,
97        MemberProfileChange, MembershipChange, Message, MsgLikeContent, MsgLikeKind,
98        OtherMessageLike, OtherState, PollResult, PollState, Profile, ReactionInfo, ReactionStatus,
99        ReactionsByKeyBySender, RoomMembershipChange, RoomPinnedEventsChange, Sticker,
100        ThreadSummary, TimelineDetails, TimelineEventItemId, TimelineEventShieldState,
101        TimelineEventShieldStateCode, TimelineItemContent,
102    },
103    event_type_filter::TimelineEventTypeFilter,
104    item::{TimelineItem, TimelineItemKind, TimelineUniqueId},
105    latest_event::{LatestEventValue, LatestEventValueLocalState},
106    traits::RoomExt,
107    virtual_item::VirtualTimelineItem,
108};
109
/// A high-level view into a regular¹ room's contents.
///
/// ¹ This type is meant to be used in the context of rooms without a
/// `room_type`, that is, rooms that are primarily used to exchange text
/// messages.
#[derive(Debug)]
pub struct Timeline {
    /// Cloneable, inner fields of the `Timeline`, shared with some background
    /// tasks.
    ///
    /// Most methods of `Timeline` delegate to this controller.
    controller: TimelineController,

    /// The event cache specialized for this room's view.
    event_cache: RoomEventCache,

    /// References to long-running tasks held by the timeline.
    ///
    /// Wrapped in an [`Arc`] so that clones can be handed out, e.g. to the
    /// stream returned by [`Timeline::subscribe`].
    drop_handle: Arc<TimelineDropHandle>,
}
127
/// What should the timeline focus on?
#[derive(Clone, Debug, PartialEq)]
pub enum TimelineFocus {
    /// Focus on live events, i.e. receive events from sync and append them in
    /// real-time.
    Live {
        /// Whether to hide in-thread replies from the live timeline.
        ///
        /// This should be set to true when the client can create
        /// [`Self::Thread`]-focused timelines from the thread roots themselves.
        hide_threaded_events: bool,
    },

    /// Focus on a specific event, e.g. after clicking a permalink.
    Event {
        /// The ID of the event to focus on.
        target: OwnedEventId,
        /// Presumably the number of events to load around `target` as
        /// context; confirm against the code that consumes this focus.
        num_context_events: u16,
        /// Whether to hide in-thread replies from the live timeline.
        ///
        /// This should be set to true when the client can create
        /// [`Self::Thread`]-focused timelines from the thread roots themselves.
        hide_threaded_events: bool,
    },

    /// Focus on a specific thread, identified by the event ID of its root
    /// (`root_event_id`).
    Thread { root_event_id: OwnedEventId },

    /// Only show pinned events.
    ///
    /// Judging by their names, `max_events_to_load` caps how many pinned
    /// events get loaded and `max_concurrent_requests` caps how many requests
    /// run at once while loading them; confirm against the pinned events
    /// loader.
    PinnedEvents { max_events_to_load: u16, max_concurrent_requests: u16 },
}
158
159impl TimelineFocus {
160    pub(super) fn debug_string(&self) -> String {
161        match self {
162            TimelineFocus::Live { .. } => "live".to_owned(),
163            TimelineFocus::Event { target, .. } => format!("permalink:{target}"),
164            TimelineFocus::Thread { root_event_id, .. } => format!("thread:{root_event_id}"),
165            TimelineFocus::PinnedEvents { .. } => "pinned-events".to_owned(),
166        }
167    }
168}
169
/// Changes how dividers get inserted, either in between each day or in between
/// each month.
///
/// The mode is a plain, fieldless value: it can be cloned and compared for
/// equality, which makes it straightforward to check which mode a given
/// configuration uses.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DateDividerMode {
    /// Insert a divider in between each day.
    Daily,
    /// Insert a divider in between each month.
    Monthly,
}
177
/// Configuration for sending an attachment.
///
/// Like [`matrix_sdk::attachment::AttachmentConfig`], but instead of the
/// `reply` field, there's only an `in_reply_to` event id; it's the timeline
/// deciding to fill the rest of the reply parameters.
///
/// All fields are optional, and [`Default`] leaves every one of them unset.
#[derive(Debug, Default)]
pub struct AttachmentConfig {
    /// An explicit transaction ID for the send request, if the caller wants
    /// to choose one.
    pub txn_id: Option<OwnedTransactionId>,
    /// Extra information about the attachment.
    pub info: Option<AttachmentInfo>,
    /// A thumbnail to send along with the attachment.
    pub thumbnail: Option<Thumbnail>,
    /// A text caption for the attachment.
    pub caption: Option<TextMessageEventContent>,
    /// Mentions to attach to the event.
    pub mentions: Option<Mentions>,
    /// The ID of the event this attachment replies to, if any.
    pub in_reply_to: Option<OwnedEventId>,
}
192
193impl Timeline {
    /// Returns the room for this timeline.
    ///
    /// The reference is the one held by the timeline's controller.
    pub fn room(&self) -> &Room {
        self.controller.room()
    }
198
    /// Clear all timeline items.
    ///
    /// This delegates to the timeline controller and returns once the
    /// controller's `clear()` has completed.
    pub async fn clear(&self) {
        self.controller.clear().await;
    }
203
204    /// Retry decryption of previously un-decryptable events given a list of
205    /// session IDs whose keys have been imported.
206    ///
207    /// # Examples
208    ///
209    /// ```no_run
210    /// # use std::{path::PathBuf, time::Duration};
211    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::room_id};
212    /// # use matrix_sdk_ui::Timeline;
213    /// # async {
214    /// # let mut client: Client = todo!();
215    /// # let room_id = ruma::room_id!("!example:example.org");
216    /// # let timeline: Timeline = todo!();
217    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
218    /// let result =
219    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
220    ///
221    /// // Given a timeline for a specific room_id
222    /// if let Some(keys_for_users) = result.keys.get(room_id) {
223    ///     let session_ids = keys_for_users.values().flatten();
224    ///     timeline.retry_decryption(session_ids).await;
225    /// }
226    /// # anyhow::Ok(()) };
227    /// ```
228    pub async fn retry_decryption<S: Into<String>>(
229        &self,
230        session_ids: impl IntoIterator<Item = S>,
231    ) {
232        self.controller
233            .retry_event_decryption(Some(session_ids.into_iter().map(Into::into).collect()))
234            .await;
235    }
236
    /// Retry decryption without restricting the retry to specific sessions.
    ///
    /// Unlike [`Self::retry_decryption`], no list of session IDs is passed to
    /// the controller (`None`), which presumably makes it retry every event
    /// that could not be decrypted so far; confirm against
    /// `TimelineController::retry_event_decryption`.
    #[tracing::instrument(skip(self))]
    async fn retry_decryption_for_all_events(&self) {
        self.controller.retry_event_decryption(None).await;
    }
241
242    /// Get the current timeline item for the given event ID, if any.
243    ///
244    /// Will return a remote event, *or* a local echo that has been sent but not
245    /// yet replaced by a remote echo.
246    ///
247    /// It's preferable to store the timeline items in the model for your UI, if
248    /// possible, instead of just storing IDs and coming back to the timeline
249    /// object to look up items.
250    pub async fn item_by_event_id(&self, event_id: &EventId) -> Option<EventTimelineItem> {
251        let items = self.controller.items().await;
252        let (_, item) = rfind_event_by_id(&items, event_id)?;
253        Some(item.to_owned())
254    }
255
    /// Get the latest of the timeline's remote event ids.
    ///
    /// Returns `None` when the controller does not report any such event.
    pub async fn latest_event_id(&self) -> Option<OwnedEventId> {
        self.controller.latest_event_id().await
    }
260
261    /// Get the current timeline items, along with a stream of updates of
262    /// timeline items.
263    ///
264    /// The stream produces `Vec<VectorDiff<_>>`, which means multiple updates
265    /// at once. There are no delays, it consumes as many updates as possible
266    /// and batches them.
267    pub async fn subscribe(
268        &self,
269    ) -> (Vector<Arc<TimelineItem>>, impl Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>> + use<>)
270    {
271        let (items, stream) = self.controller.subscribe().await;
272        let stream = TimelineWithDropHandle::new(stream, self.drop_handle.clone());
273        (items, stream)
274    }
275
276    /// Send a message to the room, and add it to the timeline as a local echo.
277    ///
278    /// For simplicity, this method doesn't currently allow custom message
279    /// types.
280    ///
281    /// If the encryption feature is enabled, this method will transparently
282    /// encrypt the room message if the room is encrypted.
283    ///
284    /// If sending the message fails, the local echo item will change its
285    /// `send_state` to [`EventSendState::SendingFailed`].
286    ///
287    /// This will do the right thing in the presence of threads:
288    /// - if this timeline is not focused on a thread, then it will send the
289    ///   event as is.
290    /// - if this is a threaded timeline, and the event to send is a room
291    ///   message without a relationship, it will automatically mark it as a
292    ///   thread reply with the correct reply fallback, and send it.
293    ///
294    /// # Arguments
295    ///
296    /// * `content` - The content of the message event.
297    #[instrument(skip(self, content), fields(room_id = ?self.room().room_id()))]
298    pub async fn send(&self, mut content: AnyMessageLikeEventContent) -> Result<SendHandle, Error> {
299        // If this is a room event we're sending in a threaded timeline, we add the
300        // thread relation ourselves.
301        if content.relation().is_none()
302            && let Some(reply) = self.infer_reply(None).await
303        {
304            match &mut content {
305                AnyMessageLikeEventContent::RoomMessage(room_msg_content) => {
306                    content = self
307                        .room()
308                        .make_reply_event(
309                            // Note: this `.into()` gets rid of the relation, but we've checked
310                            // previously that the `relates_to` field wasn't
311                            // set.
312                            room_msg_content.clone().into(),
313                            reply,
314                        )
315                        .await?
316                        .into();
317                }
318
319                AnyMessageLikeEventContent::UnstablePollStart(
320                    UnstablePollStartEventContent::New(poll),
321                ) => {
322                    if let Some(thread_root) = self.controller.thread_root() {
323                        poll.relates_to = Some(RelationWithoutReplacement::Thread(Thread::plain(
324                            thread_root,
325                            reply.event_id,
326                        )));
327                    }
328                }
329
330                AnyMessageLikeEventContent::Sticker(sticker) => {
331                    if let Some(thread_root) = self.controller.thread_root() {
332                        sticker.relates_to =
333                            Some(Relation::Thread(Thread::plain(thread_root, reply.event_id)));
334                    }
335                }
336
337                _ => {}
338            }
339        }
340
341        Ok(self.room().send_queue().send(content).await?)
342    }
343
344    /// Send a reply to the given event.
345    ///
346    /// Currently it only supports events with an event ID and JSON being
347    /// available (which can be removed by local redactions). This is subject to
348    /// change. Use [`EventTimelineItem::can_be_replied_to`] to decide whether
349    /// to render a reply button.
350    ///
351    /// The sender will be added to the mentions of the reply if
352    /// and only if the event has not been written by the sender.
353    ///
354    /// This will do the right thing in the presence of threads:
355    /// - if this timeline is not focused on a thread, then it will forward the
356    ///   thread relationship of the replied-to event, if present.
357    /// - if this is a threaded timeline, it will mark the reply as an in-thread
358    ///   reply.
359    ///
360    /// # Arguments
361    ///
362    /// * `content` - The content of the reply.
363    ///
364    /// * `in_reply_to` - The ID of the event to reply to.
365    #[instrument(skip(self, content))]
366    pub async fn send_reply(
367        &self,
368        content: RoomMessageEventContentWithoutRelation,
369        in_reply_to: OwnedEventId,
370    ) -> Result<(), Error> {
371        let reply = self
372            .infer_reply(Some(in_reply_to))
373            .await
374            .expect("the reply will always be set because we provided a replied-to event id");
375        let content = self.room().make_reply_event(content, reply).await?;
376        self.send(content.into()).await?;
377        Ok(())
378    }
379
380    /// Given a message or media to send, and an optional `in_reply_to` event,
381    /// automatically fills the [`Reply`] information based on the current
382    /// timeline focus.
383    pub(crate) async fn infer_reply(&self, in_reply_to: Option<OwnedEventId>) -> Option<Reply> {
384        // If there's a replied-to event id, the reply is pretty straightforward, and we
385        // should only infer the `EnforceThread` based on the current focus.
386        if let Some(in_reply_to) = in_reply_to {
387            let enforce_thread = if self.controller.is_threaded() {
388                EnforceThread::Threaded(ReplyWithinThread::Yes)
389            } else {
390                EnforceThread::MaybeThreaded
391            };
392            return Some(Reply { event_id: in_reply_to, enforce_thread });
393        }
394
395        let thread_root = self.controller.thread_root()?;
396
397        // The latest event id is used for the reply-to fallback, for clients which
398        // don't handle threads. It should be correctly set to the latest
399        // event in the thread, which the timeline instance might or might
400        // not know about; in this case, we do a best effort of filling it, and resort
401        // to using the thread root if we don't know about any event.
402        //
403        // Note: we could trigger a back-pagination if the timeline is empty, and wait
404        // for the results, if the timeline is too often empty.
405
406        let latest_event_id = self
407            .controller
408            .items()
409            .await
410            .iter()
411            .rev()
412            .find_map(|item| {
413                if let TimelineItemKind::Event(event) = item.kind() {
414                    event.event_id().map(ToOwned::to_owned)
415                } else {
416                    None
417                }
418            })
419            .unwrap_or(thread_root);
420
421        Some(Reply {
422            event_id: latest_event_id,
423            enforce_thread: EnforceThread::Threaded(ReplyWithinThread::No),
424        })
425    }
426
427    /// Edit an event given its [`TimelineEventItemId`] and some new content.
428    ///
429    /// Only supports events for which [`EventTimelineItem::is_editable()`]
430    /// returns `true`.
431    #[instrument(skip(self, new_content))]
432    pub async fn edit(
433        &self,
434        item_id: &TimelineEventItemId,
435        new_content: EditedContent,
436    ) -> Result<(), Error> {
437        let items = self.items().await;
438        let Some((_pos, item)) = rfind_event_by_item_id(&items, item_id) else {
439            return Err(Error::EventNotInTimeline(item_id.clone()));
440        };
441
442        match item.handle() {
443            TimelineItemHandle::Remote(event_id) => {
444                let content = self
445                    .room()
446                    .make_edit_event(event_id, new_content)
447                    .await
448                    .map_err(EditError::RoomError)?;
449                self.send(content).await?;
450                Ok(())
451            }
452
453            TimelineItemHandle::Local(handle) => {
454                // Relations are filled by the editing code itself.
455                let new_content: AnyMessageLikeEventContent = match new_content {
456                    EditedContent::RoomMessage(message) => {
457                        if item.content.is_message() {
458                            AnyMessageLikeEventContent::RoomMessage(message.into())
459                        } else {
460                            return Err(EditError::ContentMismatch {
461                                original: item.content.debug_string().to_owned(),
462                                new: "a message".to_owned(),
463                            }
464                            .into());
465                        }
466                    }
467
468                    EditedContent::PollStart { new_content, .. } => {
469                        if item.content.is_poll() {
470                            AnyMessageLikeEventContent::UnstablePollStart(
471                                UnstablePollStartEventContent::New(
472                                    NewUnstablePollStartEventContent::new(new_content),
473                                ),
474                            )
475                        } else {
476                            return Err(EditError::ContentMismatch {
477                                original: item.content.debug_string().to_owned(),
478                                new: "a poll".to_owned(),
479                            }
480                            .into());
481                        }
482                    }
483
484                    EditedContent::MediaCaption { caption, formatted_caption, mentions } => {
485                        if handle
486                            .edit_media_caption(caption, formatted_caption, mentions)
487                            .await
488                            .map_err(RoomSendQueueError::StorageError)?
489                        {
490                            return Ok(());
491                        }
492                        return Err(EditError::InvalidLocalEchoState.into());
493                    }
494                };
495
496                if !handle.edit(new_content).await.map_err(RoomSendQueueError::StorageError)? {
497                    return Err(EditError::InvalidLocalEchoState.into());
498                }
499
500                Ok(())
501            }
502        }
503    }
504
    /// Toggle a reaction on an event.
    ///
    /// Adds or redacts a reaction based on the state of the reaction at the
    /// time it is called.
    ///
    /// When redacting a previous reaction, the redaction reason is not set.
    ///
    /// Ensures that only one reaction is sent at a time to avoid race
    /// conditions and spamming the homeserver with requests.
    ///
    /// # Arguments
    ///
    /// * `item_id` - The identifier of the timeline item to react to.
    ///
    /// * `reaction_key` - The key of the reaction to toggle.
    ///
    /// # Returns
    ///
    /// Returns `true` if the reaction was added, `false` if it was removed.
    pub async fn toggle_reaction(
        &self,
        item_id: &TimelineEventItemId,
        reaction_key: &str,
    ) -> Result<bool, Error> {
        self.controller.toggle_reaction_local(item_id, reaction_key).await
    }
523
    /// Sends an attachment to the room.
    ///
    /// It does not currently support local echoes.
    ///
    /// If the encryption feature is enabled, this method will transparently
    /// encrypt the room message if the room is encrypted.
    ///
    /// The attachment and its optional thumbnail are stored in the media cache
    /// and can be retrieved at any time, by calling
    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
    /// in the corresponding `TimelineEventItem`, and using a
    /// `MediaFormat::File`.
    ///
    /// This method itself only builds the returned [`SendAttachment`] value
    /// from its arguments.
    ///
    /// # Arguments
    ///
    /// * `source` - The source of the attachment to send.
    ///
    /// * `mime_type` - The attachment's mime type.
    ///
    /// * `config` - An attachment configuration object containing details about
    ///   the attachment like a thumbnail, its size, duration etc.
    ///
    /// [`Media::get_media_content()`]: matrix_sdk::Media::get_media_content
    #[instrument(skip_all)]
    pub fn send_attachment(
        &self,
        source: impl Into<AttachmentSource>,
        mime_type: Mime,
        config: AttachmentConfig,
    ) -> SendAttachment<'_> {
        SendAttachment::new(self, source.into(), mime_type, config)
    }
556
    /// Sends a media gallery to the room.
    ///
    /// If the encryption feature is enabled, this method will transparently
    /// encrypt the room message if the room is encrypted.
    ///
    /// The attachments and their optional thumbnails are stored in the media
    /// cache and can be retrieved at any time, by calling
    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
    /// in the corresponding `TimelineEventItem`, and using a
    /// `MediaFormat::File`.
    ///
    /// This method itself only builds the returned [`SendGallery`] value from
    /// its arguments. It is only available when the `unstable-msc4274`
    /// feature is enabled.
    ///
    /// # Arguments
    ///
    /// * `gallery` - A configuration object containing details about the
    ///   gallery like files, thumbnails, etc.
    ///
    /// [`Media::get_media_content()`]: matrix_sdk::Media::get_media_content
    #[cfg(feature = "unstable-msc4274")]
    #[instrument(skip_all)]
    pub fn send_gallery(&self, gallery: GalleryConfig) -> SendGallery<'_> {
        SendGallery::new(self, gallery)
    }
578
579    /// Redact an event given its [`TimelineEventItemId`] and an optional
580    /// reason.
581    pub async fn redact(
582        &self,
583        item_id: &TimelineEventItemId,
584        reason: Option<&str>,
585    ) -> Result<(), Error> {
586        let items = self.items().await;
587        let Some((_pos, event)) = rfind_event_by_item_id(&items, item_id) else {
588            return Err(RedactError::ItemNotFound(item_id.clone()).into());
589        };
590
591        match event.handle() {
592            TimelineItemHandle::Remote(event_id) => {
593                self.room().redact(event_id, reason, None).await.map_err(RedactError::HttpError)?;
594            }
595            TimelineItemHandle::Local(handle) => {
596                if !handle.abort().await.map_err(RoomSendQueueError::StorageError)? {
597                    return Err(RedactError::InvalidLocalEchoState.into());
598                }
599            }
600        }
601
602        Ok(())
603    }
604
    /// Fetch unavailable details about the event with the given ID.
    ///
    /// This method only works for IDs of remote [`EventTimelineItem`]s,
    /// to prevent losing details when a local echo is replaced by its
    /// remote echo.
    ///
    /// This method tries to make all the requests it can. If an error is
    /// encountered for a given request, it is forwarded with the
    /// [`TimelineDetails::Error`] variant.
    ///
    /// The work is delegated to the controller's
    /// `fetch_in_reply_to_details`.
    ///
    /// # Arguments
    ///
    /// * `event_id` - The event ID of the event to fetch details for.
    ///
    /// # Errors
    ///
    /// Returns an error if the identifier doesn't match any event with a remote
    /// echo in the timeline, or if the event is removed from the timeline
    /// before all requests are handled.
    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
    pub async fn fetch_details_for_event(&self, event_id: &EventId) -> Result<(), Error> {
        self.controller.fetch_in_reply_to_details(event_id).await
    }
628
629    /// Fetch all member events for the room this timeline is displaying.
630    ///
631    /// If the full member list is not known, sender profiles are currently
632    /// likely not going to be available. This will be fixed in the future.
633    ///
634    /// If fetching the members fails, any affected timeline items will have
635    /// the `sender_profile` set to [`TimelineDetails::Error`].
636    #[instrument(skip_all)]
637    pub async fn fetch_members(&self) {
638        self.controller.set_sender_profiles_pending().await;
639        match self.room().sync_members().await {
640            Ok(_) => {
641                self.controller.update_missing_sender_profiles().await;
642            }
643            Err(e) => {
644                self.controller.set_sender_profiles_error(Arc::new(e)).await;
645            }
646        }
647    }
648
    /// Get the latest read receipt for the given user.
    ///
    /// Contrary to [`Room::load_user_receipt()`], which only keeps track of
    /// read receipts received from the homeserver, this also keeps track of
    /// implicit read receipts in this timeline, i.e. when a room member sends
    /// an event.
    ///
    /// # Returns
    ///
    /// The ID of the event the receipt applies to along with the receipt
    /// itself, or `None` if the controller has no receipt for this user.
    #[instrument(skip(self))]
    pub async fn latest_user_read_receipt(
        &self,
        user_id: &UserId,
    ) -> Option<(OwnedEventId, Receipt)> {
        self.controller.latest_user_read_receipt(user_id).await
    }
661
    /// Get the ID of the timeline event with the latest read receipt for the
    /// given user.
    ///
    /// Contrary to [`Self::latest_user_read_receipt()`], this makes it
    /// possible to know the position of the read receipt in the timeline even
    /// if the event it applies to is not visible in the timeline, unless the
    /// event is unknown to this timeline.
    #[instrument(skip(self))]
    pub async fn latest_user_read_receipt_timeline_event_id(
        &self,
        user_id: &UserId,
    ) -> Option<OwnedEventId> {
        self.controller.latest_user_read_receipt_timeline_event_id(user_id).await
    }
676
    /// Subscribe to changes in the read receipts of our own user.
    ///
    /// The returned stream yields `()` items, so it signals that something
    /// changed without describing the change.
    pub async fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> + use<> {
        self.controller.subscribe_own_user_read_receipts_changed().await
    }
681
682    /// Send the given receipt.
683    ///
684    /// This uses [`Room::send_single_receipt`] internally, but checks
685    /// first if the receipt points to an event in this timeline that is more
686    /// recent than the current ones, to avoid unnecessary requests.
687    ///
688    /// If an unthreaded receipt is sent, this will also unset the unread flag
689    /// of the room if necessary.
690    ///
691    /// The thread of the receipt is determined by the timeline instance's
692    /// focus mode and `hide_threaded_events` flag.
693    ///
694    /// Returns a boolean indicating if it sent the receipt or not.
695    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
696    pub async fn send_single_receipt(
697        &self,
698        receipt_type: ReceiptType,
699        event_id: OwnedEventId,
700    ) -> Result<bool> {
701        let thread = self.controller.infer_thread_for_read_receipt(&receipt_type);
702
703        if !self.controller.should_send_receipt(&receipt_type, &thread, &event_id).await {
704            trace!(
705                "not sending receipt, because we already cover the event with a previous receipt"
706            );
707
708            if thread == ReceiptThread::Unthreaded {
709                // Unset the read marker.
710                self.room().set_unread_flag(false).await?;
711            }
712
713            return Ok(false);
714        }
715
716        trace!("sending receipt");
717        self.room().send_single_receipt(receipt_type, thread, event_id).await?;
718        Ok(true)
719    }
720
721    /// Send the given receipts.
722    ///
723    /// This uses [`Room::send_multiple_receipts`] internally, but
724    /// checks first if the receipts point to events in this timeline that
725    /// are more recent than the current ones, to avoid unnecessary
726    /// requests.
727    ///
728    /// This also unsets the unread marker of the room if necessary.
729    #[instrument(skip(self))]
730    pub async fn send_multiple_receipts(&self, mut receipts: Receipts) -> Result<()> {
731        if let Some(fully_read) = &receipts.fully_read
732            && !self
733                .controller
734                .should_send_receipt(
735                    &ReceiptType::FullyRead,
736                    &ReceiptThread::Unthreaded,
737                    fully_read,
738                )
739                .await
740        {
741            receipts.fully_read = None;
742        }
743
744        if let Some(read_receipt) = &receipts.public_read_receipt
745            && !self
746                .controller
747                .should_send_receipt(&ReceiptType::Read, &ReceiptThread::Unthreaded, read_receipt)
748                .await
749        {
750            receipts.public_read_receipt = None;
751        }
752
753        if let Some(private_read_receipt) = &receipts.private_read_receipt
754            && !self
755                .controller
756                .should_send_receipt(
757                    &ReceiptType::ReadPrivate,
758                    &ReceiptThread::Unthreaded,
759                    private_read_receipt,
760                )
761                .await
762        {
763            receipts.private_read_receipt = None;
764        }
765
766        let room = self.room();
767
768        if !receipts.is_empty() {
769            room.send_multiple_receipts(receipts).await?;
770        } else {
771            room.set_unread_flag(false).await?;
772        }
773
774        Ok(())
775    }
776
777    /// Mark the timeline as read by attempting to send a read receipt on the
778    /// latest visible event.
779    ///
780    /// The latest visible event is determined from the timeline's focus kind
781    /// and whether or not it hides threaded events. If no latest event can
782    /// be determined and the timeline is live, the room's unread marker is
783    /// unset instead.
784    ///
785    /// # Arguments
786    ///
787    /// * `receipt_type` - The type of receipt to send. When using
788    ///   [`ReceiptType::FullyRead`], an unthreaded receipt will be sent. This
789    ///   works even if the latest event belongs to a thread, as a threaded
790    ///   reply also belongs to the unthreaded timeline. Otherwise the
791    ///   [`ReceiptThread`] will be determined based on the timeline's focus
792    ///   kind.
793    ///
794    /// # Returns
795    ///
796    /// A boolean indicating if the receipt was sent or not.
797    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
798    pub async fn mark_as_read(&self, receipt_type: ReceiptType) -> Result<bool> {
799        if let Some(event_id) = self.controller.latest_event_id().await {
800            self.send_single_receipt(receipt_type, event_id).await
801        } else {
802            trace!("can't mark room as read because there's no latest event id");
803
804            // For live timelines, unset the read marker in this case.
805            if self.controller.is_live() {
806                self.room().set_unread_flag(false).await?;
807            }
808
809            Ok(false)
810        }
811    }
812
813    /// Adds a new pinned event by sending an updated `m.room.pinned_events`
814    /// event containing the new event id.
815    ///
816    /// This method will first try to get the pinned events from the current
817    /// room's state and if it fails to do so it'll try to load them from the
818    /// homeserver.
819    ///
820    /// Returns `true` if we pinned the event, `false` if the event was already
821    /// pinned.
822    pub async fn pin_event(&self, event_id: &EventId) -> Result<bool> {
823        let mut pinned_event_ids = if let Some(event_ids) = self.room().pinned_event_ids() {
824            event_ids
825        } else {
826            self.room().load_pinned_events().await?.unwrap_or_default()
827        };
828        let event_id = event_id.to_owned();
829        if pinned_event_ids.contains(&event_id) {
830            Ok(false)
831        } else {
832            pinned_event_ids.push(event_id);
833            let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
834            self.room().send_state_event(content).await?;
835            Ok(true)
836        }
837    }
838
839    /// Removes a pinned event by sending an updated `m.room.pinned_events`
840    /// event without the event id we want to remove.
841    ///
842    /// This method will first try to get the pinned events from the current
843    /// room's state and if it fails to do so it'll try to load them from the
844    /// homeserver.
845    ///
846    /// Returns `true` if we unpinned the event, `false` if the event wasn't
847    /// pinned before.
848    pub async fn unpin_event(&self, event_id: &EventId) -> Result<bool> {
849        let mut pinned_event_ids = if let Some(event_ids) = self.room().pinned_event_ids() {
850            event_ids
851        } else {
852            self.room().load_pinned_events().await?.unwrap_or_default()
853        };
854        let event_id = event_id.to_owned();
855        if let Some(idx) = pinned_event_ids.iter().position(|e| *e == *event_id) {
856            pinned_event_ids.remove(idx);
857            let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
858            self.room().send_state_event(content).await?;
859            Ok(true)
860        } else {
861            Ok(false)
862        }
863    }
864
    /// Create an [`EmbeddedEvent`] from an arbitrary event, be it in the
    /// timeline or not.
    ///
    /// The conversion itself is delegated to the timeline controller.
    ///
    /// # Returns
    ///
    /// Can be `None` if the event cannot be represented as a standalone item,
    /// because it's an aggregation. Any error reported by the controller is
    /// passed through unchanged.
    pub async fn make_replied_to(
        &self,
        event: TimelineEvent,
    ) -> Result<Option<EmbeddedEvent>, Error> {
        self.controller.make_replied_to(event).await
    }
876
    /// Returns whether this timeline is focused on a thread (be it live, or
    /// from a permalink to a threaded event).
    ///
    /// The answer is taken directly from the timeline controller.
    pub fn is_threaded(&self) -> bool {
        self.controller.is_threaded()
    }
882}
883
884/// Test helpers, likely not very useful in production.
885#[doc(hidden)]
886impl Timeline {
887    /// Get the current list of timeline items.
888    pub async fn items(&self) -> Vector<Arc<TimelineItem>> {
889        self.controller.items().await
890    }
891
892    pub async fn subscribe_filter_map<U: Clone>(
893        &self,
894        f: impl Fn(Arc<TimelineItem>) -> Option<U>,
895    ) -> (Vector<U>, impl Stream<Item = VectorDiff<U>>) {
896        let (items, stream) = self.controller.subscribe_filter_map(f).await;
897        let stream = TimelineWithDropHandle::new(stream, self.drop_handle.clone());
898        (items, stream)
899    }
900}
901
/// Bundle of task join handles and drop guards.
///
/// The [`Drop`] implementation of this type aborts every join handle stored
/// here.
#[derive(Debug)]
struct TimelineDropHandle {
    /// Join handle of the room update task; aborted on drop.
    room_update_join_handle: JoinHandle<()>,
    /// Join handle of the pinned events task, if there is one; aborted on
    /// drop.
    pinned_events_join_handle: Option<JoinHandle<()>>,
    /// Join handle of the thread update task, if there is one; aborted on
    /// drop.
    thread_update_join_handle: Option<JoinHandle<()>>,
    /// Join handle of the local echo listener task; aborted on drop.
    local_echo_listener_handle: JoinHandle<()>,
    // NOTE(review): the two fields below are never read in this part of the
    // file; presumably they are stored only so that they get dropped together
    // with this handle — confirm.
    _event_cache_drop_handle: Arc<EventCacheDropHandles>,
    _crypto_drop_handles: CryptoDropHandles,
}
911
912impl Drop for TimelineDropHandle {
913    fn drop(&mut self) {
914        if let Some(handle) = self.pinned_events_join_handle.take() {
915            handle.abort();
916        }
917
918        if let Some(handle) = self.thread_update_join_handle.take() {
919            handle.abort();
920        }
921
922        self.local_echo_listener_handle.abort();
923        self.room_update_join_handle.abort();
924    }
925}
926
/// Signature of a timeline event filter callback.
///
/// The callback receives an event together with the room version rules and
/// returns a `bool`. On non-wasm targets it must also be `Send + Sync`.
// NOTE(review): what a `true` result means (keep vs. drop the event) is
// decided by the code that invokes the callback, which is outside this part
// of the file — confirm there.
#[cfg(not(target_family = "wasm"))]
pub type TimelineEventFilterFn =
    dyn Fn(&AnySyncTimelineEvent, &RoomVersionRules) -> bool + Send + Sync;
/// Signature of a timeline event filter callback.
///
/// The callback receives an event together with the room version rules and
/// returns a `bool`. On wasm targets no `Send + Sync` bound is required.
#[cfg(target_family = "wasm")]
pub type TimelineEventFilterFn = dyn Fn(&AnySyncTimelineEvent, &RoomVersionRules) -> bool;
932
/// A source for sending an attachment.
///
/// An attachment is either given as in-memory bytes plus a filename, or as a
/// path to a file.
///
/// The [`AttachmentSource::File`] variant can be constructed from any type that
/// implements `Into<PathBuf>`.
#[derive(Debug, Clone)]
pub enum AttachmentSource {
    /// The data of the attachment.
    Data {
        /// The bytes of the attachment.
        bytes: Vec<u8>,

        /// The filename of the attachment.
        filename: String,
    },

    /// An attachment loaded from a file.
    ///
    /// The bytes and the filename will be read from the file at the given path.
    /// The file is not accessed when this variant is constructed.
    File(PathBuf),
}
953
954impl AttachmentSource {
955    /// Try to convert this attachment source into a `(bytes, filename)` tuple.
956    pub(crate) fn try_into_bytes_and_filename(self) -> Result<(Vec<u8>, String), Error> {
957        match self {
958            Self::Data { bytes, filename } => Ok((bytes, filename)),
959            Self::File(path) => {
960                let filename = path
961                    .file_name()
962                    .ok_or(Error::InvalidAttachmentFileName)?
963                    .to_str()
964                    .ok_or(Error::InvalidAttachmentFileName)?
965                    .to_owned();
966                let bytes = fs::read(&path).map_err(|_| Error::InvalidAttachmentData)?;
967                Ok((bytes, filename))
968            }
969        }
970    }
971}
972
973impl<P> From<P> for AttachmentSource
974where
975    P: Into<PathBuf>,
976{
977    fn from(value: P) -> Self {
978        Self::File(value.into())
979    }
980}
981
/// Configuration for sending a gallery.
///
/// This duplicates [`matrix_sdk::attachment::GalleryConfig`] but uses an
/// `AttachmentSource` so that we can delay loading the actual data until we're
/// inside the SendGallery future. This allows [`Timeline::send_gallery`] to
/// return early without blocking the caller.
///
/// A default-constructed configuration has no items and none of the optional
/// values set; the builder methods on this type fill them in.
#[cfg(feature = "unstable-msc4274")]
#[derive(Debug, Default)]
pub struct GalleryConfig {
    /// The transaction ID to send, if one was set.
    pub(crate) txn_id: Option<OwnedTransactionId>,
    /// The media items of the gallery, in the order they were added.
    pub(crate) items: Vec<GalleryItemInfo>,
    /// The optional caption.
    pub(crate) caption: Option<TextMessageEventContent>,
    /// The mentions of the message, if any.
    pub(crate) mentions: Option<Mentions>,
    /// The event ID to reply to, if any.
    pub(crate) in_reply_to: Option<OwnedEventId>,
}
997
#[cfg(feature = "unstable-msc4274")]
impl GalleryConfig {
    /// Create a new empty `GalleryConfig`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the transaction ID to send.
    ///
    /// # Arguments
    ///
    /// * `txn_id` - A unique ID that can be attached to a `MessageEvent` held
    ///   in its unsigned field as `transaction_id`. If not given, one is
    ///   created for the message.
    #[must_use]
    pub fn txn_id(mut self, txn_id: OwnedTransactionId) -> Self {
        self.txn_id = Some(txn_id);
        self
    }

    /// Adds a media item to the gallery.
    ///
    /// Items are kept in the order in which they are added.
    ///
    /// # Arguments
    ///
    /// * `item` - Information about the item to be added.
    #[must_use]
    pub fn add_item(mut self, item: GalleryItemInfo) -> Self {
        self.items.push(item);
        self
    }

    /// Set the optional caption.
    ///
    /// # Arguments
    ///
    /// * `caption` - The optional caption. Passing `None` clears a caption
    ///   that was set before.
    // The setter consumes `self`, so ignoring the returned value would drop
    // the whole configuration; `#[must_use]` matches `txn_id` and `add_item`.
    #[must_use]
    pub fn caption(mut self, caption: Option<TextMessageEventContent>) -> Self {
        self.caption = caption;
        self
    }

    /// Set the mentions of the message.
    ///
    /// # Arguments
    ///
    /// * `mentions` - The mentions of the message. Passing `None` clears
    ///   mentions that were set before.
    #[must_use]
    pub fn mentions(mut self, mentions: Option<Mentions>) -> Self {
        self.mentions = mentions;
        self
    }

    /// Set the reply information of the message.
    ///
    /// # Arguments
    ///
    /// * `event_id` - The event ID to reply to. Passing `None` clears a reply
    ///   target that was set before.
    #[must_use]
    pub fn in_reply_to(mut self, event_id: Option<OwnedEventId>) -> Self {
        self.in_reply_to = event_id;
        self
    }

    /// Returns the number of media items in the gallery.
    pub fn len(&self) -> usize {
        self.items.len()
    }

    /// Checks whether the gallery contains any media items or not.
    ///
    /// Returns `true` when no item has been added.
    pub fn is_empty(&self) -> bool {
        self.items.is_empty()
    }
}
1069
#[cfg(feature = "unstable-msc4274")]
#[derive(Debug)]
/// Metadata for a gallery item.
///
/// Unlike [`matrix_sdk::attachment::GalleryItemInfo`], this carries an
/// [`AttachmentSource`] instead of the already loaded data and filename; the
/// `TryFrom` conversion between the two resolves the source.
pub struct GalleryItemInfo {
    /// The attachment source.
    pub source: AttachmentSource,
    /// The mime type.
    pub content_type: Mime,
    /// The attachment info.
    pub attachment_info: AttachmentInfo,
    /// The caption.
    pub caption: Option<TextMessageEventContent>,
    /// The thumbnail.
    pub thumbnail: Option<Thumbnail>,
}
1085
#[cfg(feature = "unstable-msc4274")]
impl TryFrom<GalleryItemInfo> for matrix_sdk::attachment::GalleryItemInfo {
    type Error = Error;

    /// Resolves the attachment source into bytes and a filename and carries
    /// every other field over unchanged.
    ///
    /// Fails with the error returned by
    /// `AttachmentSource::try_into_bytes_and_filename` when the source cannot
    /// be resolved.
    fn try_from(value: GalleryItemInfo) -> Result<Self, Self::Error> {
        let GalleryItemInfo { source, content_type, attachment_info, caption, thumbnail } = value;
        let (data, filename) = source.try_into_bytes_and_filename()?;

        Ok(Self { filename, content_type, data, attachment_info, caption, thumbnail })
    }
}
1102
#[derive(Clone, Debug)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
/// The level of read receipt tracking for the timeline.
///
/// Every level except [`TimelineReadReceiptTracking::Disabled`] counts as
/// enabled, see [`TimelineReadReceiptTracking::is_enabled`].
pub enum TimelineReadReceiptTracking {
    /// Track read receipts for all events.
    AllEvents,
    /// Track read receipts only for message-like events.
    MessageLikeEvents,
    /// Disable read receipt tracking.
    Disabled,
}
1114
1115impl TimelineReadReceiptTracking {
1116    /// Whether or not read receipt tracking is enabled.
1117    pub fn is_enabled(&self) -> bool {
1118        match self {
1119            TimelineReadReceiptTracking::AllEvents => true,
1120            TimelineReadReceiptTracking::MessageLikeEvents => true,
1121            TimelineReadReceiptTracking::Disabled => false,
1122        }
1123    }
1124}