matrix_sdk_ui/timeline/
mod.rs

1// Copyright 2022 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A high-level view into a room's contents.
16//!
17//! See [`Timeline`] for details.
18
19use std::{fs, path::PathBuf, sync::Arc};
20
21use algorithms::rfind_event_by_item_id;
22use event_item::{extract_room_msg_edit_content, TimelineItemHandle};
23use eyeball_im::VectorDiff;
24use futures_core::Stream;
25use imbl::Vector;
26use matrix_sdk::{
27    attachment::AttachmentConfig,
28    event_cache::{EventCacheDropHandles, RoomEventCache},
29    event_handler::EventHandlerHandle,
30    executor::JoinHandle,
31    room::{edit::EditedContent, Receipts, Room},
32    send_queue::{RoomSendQueueError, SendHandle},
33    Client, Result,
34};
35use mime::Mime;
36use pinned_events_loader::PinnedEventsRoom;
37use ruma::{
38    api::client::receipt::create_receipt::v3::ReceiptType,
39    events::{
40        poll::unstable_start::{NewUnstablePollStartEventContent, UnstablePollStartEventContent},
41        receipt::{Receipt, ReceiptThread},
42        room::{
43            message::{
44                AddMentions, ForwardThread, OriginalRoomMessageEvent,
45                RoomMessageEventContentWithoutRelation,
46            },
47            pinned_events::RoomPinnedEventsEventContent,
48        },
49        AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
50        SyncMessageLikeEvent,
51    },
52    serde::Raw,
53    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, RoomVersionId, UserId,
54};
55use subscriber::TimelineWithDropHandle;
56use thiserror::Error;
57use tracing::{error, instrument, trace, warn};
58
59use self::{
60    algorithms::rfind_event_by_id, controller::TimelineController, futures::SendAttachment,
61};
62
63mod algorithms;
64mod builder;
65mod controller;
66mod date_dividers;
67mod error;
68mod event_handler;
69mod event_item;
70pub mod event_type_filter;
71pub mod futures;
72mod item;
73mod pagination;
74mod pinned_events_loader;
75mod subscriber;
76#[cfg(test)]
77mod tests;
78mod to_device;
79mod traits;
80mod virtual_item;
81
82pub use self::{
83    builder::TimelineBuilder,
84    controller::default_event_filter,
85    error::*,
86    event_item::{
87        AnyOtherFullStateEventContent, EncryptedMessage, EventItemOrigin, EventSendState,
88        EventTimelineItem, InReplyToDetails, MemberProfileChange, MembershipChange, Message,
89        OtherState, PollResult, PollState, Profile, ReactionInfo, ReactionStatus,
90        ReactionsByKeyBySender, RepliedToEvent, RoomMembershipChange, RoomPinnedEventsChange,
91        Sticker, TimelineDetails, TimelineEventItemId, TimelineItemContent,
92    },
93    event_type_filter::TimelineEventTypeFilter,
94    item::{TimelineItem, TimelineItemKind, TimelineUniqueId},
95    pagination::LiveBackPaginationStatus,
96    traits::RoomExt,
97    virtual_item::VirtualTimelineItem,
98};
99
100/// Information needed to reply to an event.
101#[derive(Debug, Clone)]
102pub struct RepliedToInfo {
103    /// The event ID of the event to reply to.
104    event_id: OwnedEventId,
105    /// The sender of the event to reply to.
106    sender: OwnedUserId,
107    /// The timestamp of the event to reply to.
108    timestamp: MilliSecondsSinceUnixEpoch,
109    /// The content of the event to reply to.
110    content: ReplyContent,
111}
112
113impl RepliedToInfo {
114    /// The event ID of the event to reply to.
115    pub fn event_id(&self) -> &EventId {
116        &self.event_id
117    }
118
119    /// The sender of the event to reply to.
120    pub fn sender(&self) -> &UserId {
121        &self.sender
122    }
123
124    /// The content of the event to reply to.
125    pub fn content(&self) -> &ReplyContent {
126        &self.content
127    }
128}
129
130/// The content of a reply.
131#[derive(Debug, Clone)]
132pub enum ReplyContent {
133    /// Content of a message event.
134    Message(Message),
135    /// Content of any other kind of event stored as raw JSON.
136    Raw(Raw<AnySyncTimelineEvent>),
137}
138
139/// A high-level view into a regular¹ room's contents.
140///
141/// ¹ This type is meant to be used in the context of rooms without a
142/// `room_type`, that is rooms that are primarily used to exchange text
143/// messages.
144#[derive(Debug)]
145pub struct Timeline {
146    /// Cloneable, inner fields of the `Timeline`, shared with some background
147    /// tasks.
148    controller: TimelineController,
149
150    /// The event cache specialized for this room's view.
151    event_cache: RoomEventCache,
152
153    /// References to long-running tasks held by the timeline.
154    drop_handle: Arc<TimelineDropHandle>,
155}
156
157/// What should the timeline focus on?
158#[derive(Clone, Debug, PartialEq)]
159pub enum TimelineFocus {
160    /// Focus on live events, i.e. receive events from sync and append them in
161    /// real-time.
162    Live,
163
164    /// Focus on a specific event, e.g. after clicking a permalink.
165    Event { target: OwnedEventId, num_context_events: u16 },
166
167    /// Only show pinned events.
168    PinnedEvents { max_events_to_load: u16, max_concurrent_requests: u16 },
169}
170
171impl TimelineFocus {
172    pub(super) fn debug_string(&self) -> String {
173        match self {
174            TimelineFocus::Live => "live".to_owned(),
175            TimelineFocus::Event { target, .. } => format!("permalink:{target}"),
176            TimelineFocus::PinnedEvents { .. } => "pinned-events".to_owned(),
177        }
178    }
179}
180
181/// Changes how dividers get inserted, either in between each day or in between
182/// each month
183#[derive(Debug, Clone)]
184pub enum DateDividerMode {
185    Daily,
186    Monthly,
187}
188
189impl Timeline {
190    /// Create a new [`TimelineBuilder`] for the given room.
191    pub fn builder(room: &Room) -> TimelineBuilder {
192        TimelineBuilder::new(room)
193    }
194
195    /// Returns the room for this timeline.
196    pub fn room(&self) -> &Room {
197        self.controller.room()
198    }
199
200    /// Clear all timeline items.
201    pub async fn clear(&self) {
202        self.controller.clear().await;
203    }
204
205    /// Retry decryption of previously un-decryptable events given a list of
206    /// session IDs whose keys have been imported.
207    ///
208    /// # Examples
209    ///
210    /// ```no_run
211    /// # use std::{path::PathBuf, time::Duration};
212    /// # use matrix_sdk::{Client, config::SyncSettings, ruma::room_id};
213    /// # use matrix_sdk_ui::Timeline;
214    /// # async {
215    /// # let mut client: Client = todo!();
216    /// # let room_id = ruma::room_id!("!example:example.org");
217    /// # let timeline: Timeline = todo!();
218    /// let path = PathBuf::from("/home/example/e2e-keys.txt");
219    /// let result =
220    ///     client.encryption().import_room_keys(path, "secret-passphrase").await?;
221    ///
222    /// // Given a timeline for a specific room_id
223    /// if let Some(keys_for_users) = result.keys.get(room_id) {
224    ///     let session_ids = keys_for_users.values().flatten();
225    ///     timeline.retry_decryption(session_ids).await;
226    /// }
227    /// # anyhow::Ok(()) };
228    /// ```
229    pub async fn retry_decryption<S: Into<String>>(
230        &self,
231        session_ids: impl IntoIterator<Item = S>,
232    ) {
233        self.controller
234            .retry_event_decryption(
235                self.room(),
236                Some(session_ids.into_iter().map(Into::into).collect()),
237            )
238            .await;
239    }
240
241    #[tracing::instrument(skip(self))]
242    async fn retry_decryption_for_all_events(&self) {
243        self.controller.retry_event_decryption(self.room(), None).await;
244    }
245
246    /// Get the current timeline item for the given event ID, if any.
247    ///
248    /// Will return a remote event, *or* a local echo that has been sent but not
249    /// yet replaced by a remote echo.
250    ///
251    /// It's preferable to store the timeline items in the model for your UI, if
252    /// possible, instead of just storing IDs and coming back to the timeline
253    /// object to look up items.
254    pub async fn item_by_event_id(&self, event_id: &EventId) -> Option<EventTimelineItem> {
255        let items = self.controller.items().await;
256        let (_, item) = rfind_event_by_id(&items, event_id)?;
257        Some(item.to_owned())
258    }
259
260    /// Get the latest of the timeline's event items.
261    pub async fn latest_event(&self) -> Option<EventTimelineItem> {
262        if self.controller.is_live().await {
263            self.controller.items().await.last()?.as_event().cloned()
264        } else {
265            None
266        }
267    }
268
269    /// Get the current timeline items, along with a stream of updates of
270    /// timeline items.
271    ///
272    /// The stream produces `Vec<VectorDiff<_>>`, which means multiple updates
273    /// at once. There are no delays, it consumes as many updates as possible
274    /// and batches them.
275    pub async fn subscribe(
276        &self,
277    ) -> (Vector<Arc<TimelineItem>>, impl Stream<Item = Vec<VectorDiff<Arc<TimelineItem>>>>) {
278        let (items, stream) = self.controller.subscribe().await;
279        let stream = TimelineWithDropHandle::new(stream, self.drop_handle.clone());
280        (items, stream)
281    }
282
283    /// Send a message to the room, and add it to the timeline as a local echo.
284    ///
285    /// For simplicity, this method doesn't currently allow custom message
286    /// types.
287    ///
288    /// If the encryption feature is enabled, this method will transparently
289    /// encrypt the room message if the room is encrypted.
290    ///
291    /// If sending the message fails, the local echo item will change its
292    /// `send_state` to [`EventSendState::SendingFailed`].
293    ///
294    /// # Arguments
295    ///
296    /// * `content` - The content of the message event.
297    ///
298    /// [`MessageLikeUnsigned`]: ruma::events::MessageLikeUnsigned
299    /// [`SyncMessageLikeEvent`]: ruma::events::SyncMessageLikeEvent
300    #[instrument(skip(self, content), fields(room_id = ?self.room().room_id()))]
301    pub async fn send(
302        &self,
303        content: AnyMessageLikeEventContent,
304    ) -> Result<SendHandle, RoomSendQueueError> {
305        self.room().send_queue().send(content).await
306    }
307
308    /// Send a reply to the given event.
309    ///
310    /// Currently it only supports events with an event ID and JSON being
311    /// available (which can be removed by local redactions). This is subject to
312    /// change. Please check [`EventTimelineItem::can_be_replied_to`] to decide
313    /// whether to render a reply button.
314    ///
315    /// The sender will be added to the mentions of the reply if
316    /// and only if the event has not been written by the sender.
317    ///
318    /// # Arguments
319    ///
320    /// * `content` - The content of the reply
321    ///
322    /// * `replied_to_info` - A wrapper that contains the event ID, sender,
323    ///   content and timestamp of the event to reply to
324    ///
325    /// * `forward_thread` - Usually `Yes`, unless you explicitly want to the
326    ///   reply to show up in the main timeline even though the `reply_item` is
327    ///   part of a thread
328    #[instrument(skip(self, content, replied_to_info))]
329    pub async fn send_reply(
330        &self,
331        content: RoomMessageEventContentWithoutRelation,
332        replied_to_info: RepliedToInfo,
333        forward_thread: ForwardThread,
334    ) -> Result<(), RoomSendQueueError> {
335        let event_id = replied_to_info.event_id;
336
337        // [The specification](https://spec.matrix.org/v1.10/client-server-api/#user-and-room-mentions) says:
338        //
339        // > Users should not add their own Matrix ID to the `m.mentions` property as
340        // > outgoing messages cannot self-notify.
341        //
342        // If the replied to event has been written by the current user, let's toggle to
343        // `AddMentions::No`.
344        let mention_the_sender = if self.room().own_user_id() == replied_to_info.sender {
345            AddMentions::No
346        } else {
347            AddMentions::Yes
348        };
349
350        let content = match replied_to_info.content {
351            ReplyContent::Message(msg) => {
352                let event = OriginalRoomMessageEvent {
353                    event_id: event_id.to_owned(),
354                    sender: replied_to_info.sender,
355                    origin_server_ts: replied_to_info.timestamp,
356                    room_id: self.room().room_id().to_owned(),
357                    content: msg.to_content(),
358                    unsigned: Default::default(),
359                };
360                content.make_reply_to(&event, forward_thread, mention_the_sender)
361            }
362            ReplyContent::Raw(raw_event) => content.make_reply_to_raw(
363                &raw_event,
364                event_id.to_owned(),
365                self.room().room_id(),
366                forward_thread,
367                mention_the_sender,
368            ),
369        };
370
371        self.send(content.into()).await?;
372
373        Ok(())
374    }
375
376    /// Get the information needed to reply to the event with the given ID.
377    pub async fn replied_to_info_from_event_id(
378        &self,
379        event_id: &EventId,
380    ) -> Result<RepliedToInfo, UnsupportedReplyItem> {
381        if let Some(timeline_item) = self.item_by_event_id(event_id).await {
382            return timeline_item.replied_to_info();
383        }
384
385        let event = self.room().event(event_id, None).await.map_err(|error| {
386            error!("Failed to fetch event with ID {event_id} with error: {error}");
387            UnsupportedReplyItem::MissingEvent
388        })?;
389
390        let raw_sync_event = event.into_raw();
391        let sync_event = raw_sync_event.deserialize().map_err(|error| {
392            error!("Failed to deserialize event with ID {event_id} with error: {error}");
393            UnsupportedReplyItem::FailedToDeserializeEvent
394        })?;
395
396        let reply_content = match &sync_event {
397            AnySyncTimelineEvent::MessageLike(message_like_event) => {
398                if let AnySyncMessageLikeEvent::RoomMessage(SyncMessageLikeEvent::Original(
399                    original_message,
400                )) = message_like_event
401                {
402                    // We don't have access to reactions here.
403                    let reactions = Default::default();
404                    ReplyContent::Message(Message::from_event(
405                        original_message.content.clone(),
406                        extract_room_msg_edit_content(message_like_event.relations()),
407                        &self.items().await,
408                        reactions,
409                    ))
410                } else {
411                    ReplyContent::Raw(raw_sync_event)
412                }
413            }
414            AnySyncTimelineEvent::State(_) => return Err(UnsupportedReplyItem::StateEvent),
415        };
416
417        Ok(RepliedToInfo {
418            event_id: event_id.to_owned(),
419            sender: sync_event.sender().to_owned(),
420            timestamp: sync_event.origin_server_ts(),
421            content: reply_content,
422        })
423    }
424
425    /// Edit an event given its [`TimelineEventItemId`] and some new content.
426    ///
427    /// Only supports events for which [`EventTimelineItem::is_editable()`]
428    /// returns `true`.
429    #[instrument(skip(self, new_content))]
430    pub async fn edit(
431        &self,
432        item_id: &TimelineEventItemId,
433        new_content: EditedContent,
434    ) -> Result<(), Error> {
435        let items = self.items().await;
436        let Some((_pos, item)) = rfind_event_by_item_id(&items, item_id) else {
437            return Err(Error::EventNotInTimeline(item_id.clone()));
438        };
439
440        match item.handle() {
441            TimelineItemHandle::Remote(event_id) => {
442                let content = self
443                    .room()
444                    .make_edit_event(event_id, new_content)
445                    .await
446                    .map_err(EditError::RoomError)?;
447                self.send(content).await?;
448                Ok(())
449            }
450
451            TimelineItemHandle::Local(handle) => {
452                // Relations are filled by the editing code itself.
453                let new_content: AnyMessageLikeEventContent = match new_content {
454                    EditedContent::RoomMessage(message) => {
455                        if matches!(item.content, TimelineItemContent::Message(_)) {
456                            AnyMessageLikeEventContent::RoomMessage(message.into())
457                        } else {
458                            return Err(EditError::ContentMismatch {
459                                original: item.content.debug_string().to_owned(),
460                                new: "a message".to_owned(),
461                            }
462                            .into());
463                        }
464                    }
465
466                    EditedContent::PollStart { new_content, .. } => {
467                        if matches!(item.content, TimelineItemContent::Poll(_)) {
468                            AnyMessageLikeEventContent::UnstablePollStart(
469                                UnstablePollStartEventContent::New(
470                                    NewUnstablePollStartEventContent::new(new_content),
471                                ),
472                            )
473                        } else {
474                            return Err(EditError::ContentMismatch {
475                                original: item.content.debug_string().to_owned(),
476                                new: "a poll".to_owned(),
477                            }
478                            .into());
479                        }
480                    }
481
482                    EditedContent::MediaCaption { caption, formatted_caption, mentions } => {
483                        if handle
484                            .edit_media_caption(caption, formatted_caption, mentions)
485                            .await
486                            .map_err(RoomSendQueueError::StorageError)?
487                        {
488                            return Ok(());
489                        }
490                        return Err(EditError::InvalidLocalEchoState.into());
491                    }
492                };
493
494                if !handle.edit(new_content).await.map_err(RoomSendQueueError::StorageError)? {
495                    return Err(EditError::InvalidLocalEchoState.into());
496                }
497
498                Ok(())
499            }
500        }
501    }
502
503    /// Toggle a reaction on an event.
504    ///
505    /// Adds or redacts a reaction based on the state of the reaction at the
506    /// time it is called.
507    ///
508    /// When redacting a previous reaction, the redaction reason is not set.
509    ///
510    /// Ensures that only one reaction is sent at a time to avoid race
511    /// conditions and spamming the homeserver with requests.
512    pub async fn toggle_reaction(
513        &self,
514        item_id: &TimelineEventItemId,
515        reaction_key: &str,
516    ) -> Result<(), Error> {
517        self.controller.toggle_reaction_local(item_id, reaction_key).await?;
518        Ok(())
519    }
520
521    /// Sends an attachment to the room.
522    ///
523    /// It does not currently support local echoes.
524    ///
525    /// If the encryption feature is enabled, this method will transparently
526    /// encrypt the room message if the room is encrypted.
527    ///
528    /// The attachment and its optional thumbnail are stored in the media cache
529    /// and can be retrieved at any time, by calling
530    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
531    /// in the corresponding `TimelineEventItem`, and using a
532    /// `MediaFormat::File`.
533    ///
534    /// # Arguments
535    ///
536    /// * `source` - The source of the attachment to send.
537    ///
538    /// * `mime_type` - The attachment's mime type.
539    ///
540    /// * `config` - An attachment configuration object containing details about
541    ///   the attachment like a thumbnail, its size, duration etc.
542    ///
543    /// [`Media::get_media_content()`]: matrix_sdk::Media::get_media_content
544    #[instrument(skip_all)]
545    pub fn send_attachment(
546        &self,
547        source: impl Into<AttachmentSource>,
548        mime_type: Mime,
549        config: AttachmentConfig,
550    ) -> SendAttachment<'_> {
551        SendAttachment::new(self, source.into(), mime_type, config)
552    }
553
554    /// Redact an event given its [`TimelineEventItemId`] and an optional
555    /// reason.
556    pub async fn redact(
557        &self,
558        item_id: &TimelineEventItemId,
559        reason: Option<&str>,
560    ) -> Result<(), Error> {
561        let items = self.items().await;
562        let Some((_pos, event)) = rfind_event_by_item_id(&items, item_id) else {
563            return Err(RedactError::ItemNotFound(item_id.clone()).into());
564        };
565
566        match event.handle() {
567            TimelineItemHandle::Remote(event_id) => {
568                self.room().redact(event_id, reason, None).await.map_err(RedactError::HttpError)?;
569            }
570            TimelineItemHandle::Local(handle) => {
571                if !handle.abort().await.map_err(RoomSendQueueError::StorageError)? {
572                    return Err(RedactError::InvalidLocalEchoState.into());
573                }
574            }
575        }
576
577        Ok(())
578    }
579
580    /// Fetch unavailable details about the event with the given ID.
581    ///
582    /// This method only works for IDs of remote [`EventTimelineItem`]s,
583    /// to prevent losing details when a local echo is replaced by its
584    /// remote echo.
585    ///
586    /// This method tries to make all the requests it can. If an error is
587    /// encountered for a given request, it is forwarded with the
588    /// [`TimelineDetails::Error`] variant.
589    ///
590    /// # Arguments
591    ///
592    /// * `event_id` - The event ID of the event to fetch details for.
593    ///
594    /// # Errors
595    ///
596    /// Returns an error if the identifier doesn't match any event with a remote
597    /// echo in the timeline, or if the event is removed from the timeline
598    /// before all requests are handled.
599    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
600    pub async fn fetch_details_for_event(&self, event_id: &EventId) -> Result<(), Error> {
601        self.controller.fetch_in_reply_to_details(event_id).await
602    }
603
604    /// Fetch all member events for the room this timeline is displaying.
605    ///
606    /// If the full member list is not known, sender profiles are currently
607    /// likely not going to be available. This will be fixed in the future.
608    ///
609    /// If fetching the members fails, any affected timeline items will have
610    /// the `sender_profile` set to [`TimelineDetails::Error`].
611    #[instrument(skip_all)]
612    pub async fn fetch_members(&self) {
613        self.controller.set_sender_profiles_pending().await;
614        match self.room().sync_members().await {
615            Ok(_) => {
616                self.controller.update_missing_sender_profiles().await;
617            }
618            Err(e) => {
619                self.controller.set_sender_profiles_error(Arc::new(e)).await;
620            }
621        }
622    }
623
624    /// Get the latest read receipt for the given user.
625    ///
626    /// Contrary to [`Room::load_user_receipt()`] that only keeps track of read
627    /// receipts received from the homeserver, this keeps also track of implicit
628    /// read receipts in this timeline, i.e. when a room member sends an event.
629    #[instrument(skip(self))]
630    pub async fn latest_user_read_receipt(
631        &self,
632        user_id: &UserId,
633    ) -> Option<(OwnedEventId, Receipt)> {
634        self.controller.latest_user_read_receipt(user_id).await
635    }
636
637    /// Get the ID of the timeline event with the latest read receipt for the
638    /// given user.
639    ///
640    /// In contrary to [`Self::latest_user_read_receipt()`], this allows to know
641    /// the position of the read receipt in the timeline even if the event it
642    /// applies to is not visible in the timeline, unless the event is unknown
643    /// by this timeline.
644    #[instrument(skip(self))]
645    pub async fn latest_user_read_receipt_timeline_event_id(
646        &self,
647        user_id: &UserId,
648    ) -> Option<OwnedEventId> {
649        self.controller.latest_user_read_receipt_timeline_event_id(user_id).await
650    }
651
652    /// Subscribe to changes in the read receipts of our own user.
653    pub async fn subscribe_own_user_read_receipts_changed(&self) -> impl Stream<Item = ()> {
654        self.controller.subscribe_own_user_read_receipts_changed().await
655    }
656
657    /// Send the given receipt.
658    ///
659    /// This uses [`Room::send_single_receipt`] internally, but checks
660    /// first if the receipt points to an event in this timeline that is more
661    /// recent than the current ones, to avoid unnecessary requests.
662    ///
663    /// Returns a boolean indicating if it sent the request or not.
664    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
665    pub async fn send_single_receipt(
666        &self,
667        receipt_type: ReceiptType,
668        thread: ReceiptThread,
669        event_id: OwnedEventId,
670    ) -> Result<bool> {
671        if !self.controller.should_send_receipt(&receipt_type, &thread, &event_id).await {
672            trace!(
673                "not sending receipt, because we already cover the event with a previous receipt"
674            );
675            return Ok(false);
676        }
677
678        trace!("sending receipt");
679        self.room().send_single_receipt(receipt_type, thread, event_id).await?;
680        Ok(true)
681    }
682
683    /// Send the given receipts.
684    ///
685    /// This uses [`Room::send_multiple_receipts`] internally, but
686    /// checks first if the receipts point to events in this timeline that
687    /// are more recent than the current ones, to avoid unnecessary
688    /// requests.
689    #[instrument(skip(self))]
690    pub async fn send_multiple_receipts(&self, mut receipts: Receipts) -> Result<()> {
691        if let Some(fully_read) = &receipts.fully_read {
692            if !self
693                .controller
694                .should_send_receipt(
695                    &ReceiptType::FullyRead,
696                    &ReceiptThread::Unthreaded,
697                    fully_read,
698                )
699                .await
700            {
701                receipts.fully_read = None;
702            }
703        }
704
705        if let Some(read_receipt) = &receipts.public_read_receipt {
706            if !self
707                .controller
708                .should_send_receipt(&ReceiptType::Read, &ReceiptThread::Unthreaded, read_receipt)
709                .await
710            {
711                receipts.public_read_receipt = None;
712            }
713        }
714
715        if let Some(private_read_receipt) = &receipts.private_read_receipt {
716            if !self
717                .controller
718                .should_send_receipt(
719                    &ReceiptType::ReadPrivate,
720                    &ReceiptThread::Unthreaded,
721                    private_read_receipt,
722                )
723                .await
724            {
725                receipts.private_read_receipt = None;
726            }
727        }
728
729        self.room().send_multiple_receipts(receipts).await
730    }
731
732    /// Mark the room as read by sending an unthreaded read receipt on the
733    /// latest event, be it visible or not.
734    ///
735    /// This works even if the latest event belongs to a thread, as a threaded
736    /// reply also belongs to the unthreaded timeline. No threaded receipt
737    /// will be sent here (see also #3123).
738    ///
739    /// Returns a boolean indicating if we sent the request or not.
740    #[instrument(skip(self), fields(room_id = ?self.room().room_id()))]
741    pub async fn mark_as_read(&self, receipt_type: ReceiptType) -> Result<bool> {
742        if let Some(event_id) = self.controller.latest_event_id().await {
743            self.send_single_receipt(receipt_type, ReceiptThread::Unthreaded, event_id).await
744        } else {
745            trace!("can't mark room as read because there's no latest event id");
746            Ok(false)
747        }
748    }
749
750    /// Adds a new pinned event by sending an updated `m.room.pinned_events`
751    /// event containing the new event id.
752    ///
753    /// This method will first try to get the pinned events from the current
754    /// room's state and if it fails to do so it'll try to load them from the
755    /// homeserver.
756    ///
757    /// Returns `true` if we pinned the event, `false` if the event was already
758    /// pinned.
759    pub async fn pin_event(&self, event_id: &EventId) -> Result<bool> {
760        let mut pinned_event_ids = if let Some(event_ids) = self.room().pinned_event_ids() {
761            event_ids
762        } else {
763            self.room().load_pinned_events().await?.unwrap_or_default()
764        };
765        let event_id = event_id.to_owned();
766        if pinned_event_ids.contains(&event_id) {
767            Ok(false)
768        } else {
769            pinned_event_ids.push(event_id);
770            let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
771            self.room().send_state_event(content).await?;
772            Ok(true)
773        }
774    }
775
776    /// Removes a pinned event by sending an updated `m.room.pinned_events`
777    /// event without the event id we want to remove.
778    ///
779    /// This method will first try to get the pinned events from the current
780    /// room's state and if it fails to do so it'll try to load them from the
781    /// homeserver.
782    ///
783    /// Returns `true` if we unpinned the event, `false` if the event wasn't
784    /// pinned before.
785    pub async fn unpin_event(&self, event_id: &EventId) -> Result<bool> {
786        let mut pinned_event_ids = if let Some(event_ids) = self.room().pinned_event_ids() {
787            event_ids
788        } else {
789            self.room().load_pinned_events().await?.unwrap_or_default()
790        };
791        let event_id = event_id.to_owned();
792        if let Some(idx) = pinned_event_ids.iter().position(|e| *e == *event_id) {
793            pinned_event_ids.remove(idx);
794            let content = RoomPinnedEventsEventContent::new(pinned_event_ids);
795            self.room().send_state_event(content).await?;
796            Ok(true)
797        } else {
798            Ok(false)
799        }
800    }
801}
802
803/// Test helpers, likely not very useful in production.
804#[doc(hidden)]
805impl Timeline {
806    /// Get the current list of timeline items.
807    pub async fn items(&self) -> Vector<Arc<TimelineItem>> {
808        self.controller.items().await
809    }
810
811    pub async fn subscribe_filter_map<U: Clone>(
812        &self,
813        f: impl Fn(Arc<TimelineItem>) -> Option<U>,
814    ) -> (Vector<U>, impl Stream<Item = VectorDiff<U>>) {
815        let (items, stream) = self.controller.subscribe_filter_map(f).await;
816        let stream = TimelineWithDropHandle::new(stream, self.drop_handle.clone());
817        (items, stream)
818    }
819}
820
821#[derive(Debug)]
822struct TimelineDropHandle {
823    client: Client,
824    event_handler_handles: Vec<EventHandlerHandle>,
825    room_update_join_handle: JoinHandle<()>,
826    pinned_events_join_handle: Option<JoinHandle<()>>,
827    room_key_from_backups_join_handle: JoinHandle<()>,
828    room_keys_received_join_handle: JoinHandle<()>,
829    room_key_backup_enabled_join_handle: JoinHandle<()>,
830    local_echo_listener_handle: JoinHandle<()>,
831    _event_cache_drop_handle: Arc<EventCacheDropHandles>,
832    encryption_changes_handle: JoinHandle<()>,
833}
834
835impl Drop for TimelineDropHandle {
836    fn drop(&mut self) {
837        for handle in self.event_handler_handles.drain(..) {
838            self.client.remove_event_handler(handle);
839        }
840
841        if let Some(handle) = self.pinned_events_join_handle.take() {
842            handle.abort()
843        };
844
845        self.local_echo_listener_handle.abort();
846        self.room_update_join_handle.abort();
847        self.room_key_from_backups_join_handle.abort();
848        self.room_key_backup_enabled_join_handle.abort();
849        self.room_keys_received_join_handle.abort();
850        self.encryption_changes_handle.abort();
851    }
852}
853
854pub type TimelineEventFilterFn =
855    dyn Fn(&AnySyncTimelineEvent, &RoomVersionId) -> bool + Send + Sync;
856
857/// A source for sending an attachment.
858///
859/// The [`AttachmentSource::File`] variant can be constructed from any type that
860/// implements `Into<PathBuf>`.
861#[derive(Debug, Clone)]
862pub enum AttachmentSource {
863    /// The data of the attachment.
864    Data {
865        /// The bytes of the attachment.
866        bytes: Vec<u8>,
867
868        /// The filename of the attachment.
869        filename: String,
870    },
871
872    /// An attachment loaded from a file.
873    ///
874    /// The bytes and the filename will be read from the file at the given path.
875    File(PathBuf),
876}
877
878impl AttachmentSource {
879    /// Try to convert this attachment source into a `(bytes, filename)` tuple.
880    pub(crate) fn try_into_bytes_and_filename(self) -> Result<(Vec<u8>, String), Error> {
881        match self {
882            Self::Data { bytes, filename } => Ok((bytes, filename)),
883            Self::File(path) => {
884                let filename = path
885                    .file_name()
886                    .ok_or(Error::InvalidAttachmentFileName)?
887                    .to_str()
888                    .ok_or(Error::InvalidAttachmentFileName)?
889                    .to_owned();
890                let bytes = fs::read(&path).map_err(|_| Error::InvalidAttachmentData)?;
891                Ok((bytes, filename))
892            }
893        }
894    }
895}
896
897impl<P> From<P> for AttachmentSource
898where
899    P: Into<PathBuf>,
900{
901    fn from(value: P) -> Self {
902        Self::File(value.into())
903    }
904}