matrix_sdk_ui/timeline/
pinned_events_loader.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt::Formatter, sync::Arc};
16
17use futures_util::{stream, StreamExt};
18use matrix_sdk::{
19    config::RequestConfig, event_cache::paginator::PaginatorError, BoxFuture, Room,
20    SendOutsideWasm, SyncOutsideWasm,
21};
22use matrix_sdk_base::deserialized_responses::TimelineEvent;
23use ruma::{events::relation::RelationType, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId};
24use thiserror::Error;
25use tracing::{debug, warn};
26
/// Utility to load the pinned events in a room.
///
/// The loader asks a [`PinnedEventsRoom`] backend for the pinned event ids and
/// then loads the matching events, bounded by the two limits stored below.
pub struct PinnedEventsLoader {
    /// Backend to load pinned events.
    room: Arc<dyn PinnedEventsRoom>,

    /// Maximum number of pinned events to load (either from network or the
    /// cache). When the backend reports more pinned event ids than this, only
    /// the last `max_events_to_load` ids of that list are loaded.
    max_events_to_load: usize,

    /// Number of requests to load pinned events that can run concurrently. This
    /// is used to avoid overwhelming a home server with dozens or hundreds
    /// of concurrent requests.
    max_concurrent_requests: usize,
}
41
42impl PinnedEventsLoader {
43    /// Creates a new `PinnedEventsLoader` instance.
44    pub fn new(
45        room: Arc<dyn PinnedEventsRoom>,
46        max_events_to_load: usize,
47        max_concurrent_requests: usize,
48    ) -> Self {
49        Self { room, max_events_to_load, max_concurrent_requests }
50    }
51
52    /// Loads the pinned events in this room, using the cache first and then
53    /// requesting the event from the homeserver if it couldn't be found.
54    /// This method will perform as many concurrent requests for events as
55    /// `max_concurrent_requests` allows, to avoid overwhelming the server.
56    ///
57    /// It returns a `Result` with either a
58    /// chronologically sorted list of retrieved [`TimelineEvent`]s
59    /// or a [`PinnedEventsLoaderError`].
60    pub async fn load_events(&self) -> Result<Vec<TimelineEvent>, PinnedEventsLoaderError> {
61        let pinned_event_ids: Vec<OwnedEventId> = self
62            .room
63            .pinned_event_ids()
64            .unwrap_or_default()
65            .into_iter()
66            .rev()
67            .take(self.max_events_to_load)
68            .rev()
69            .collect();
70
71        if pinned_event_ids.is_empty() {
72            return Ok(Vec::new());
73        }
74
75        let request_config = Some(RequestConfig::default().retry_limit(3));
76
77        let mut loaded_events: Vec<TimelineEvent> =
78            stream::iter(pinned_event_ids.into_iter().map(|event_id| {
79                let provider = self.room.clone();
80                let relations_filter =
81                    Some(vec![RelationType::Annotation, RelationType::Replacement]);
82                async move {
83                    match provider
84                        .load_event_with_relations(&event_id, request_config, relations_filter)
85                        .await
86                    {
87                        Ok((event, related_events)) => {
88                            let mut events = vec![event];
89                            events.extend(related_events);
90                            Some(events)
91                        }
92                        Err(err) => {
93                            warn!("error when loading pinned event: {err}");
94                            None
95                        }
96                    }
97                }
98            }))
99            .buffer_unordered(self.max_concurrent_requests)
100            // Get only the `Some<Vec<_>>` results
101            .flat_map(stream::iter)
102            // Flatten the `Vec`s into a single one containing all their items
103            .flat_map(stream::iter)
104            .collect()
105            .await;
106
107        if loaded_events.is_empty() {
108            return Err(PinnedEventsLoaderError::TimelineReloadFailed);
109        }
110
111        // Sort using chronological ordering (oldest -> newest)
112        loaded_events.sort_by_key(|item| {
113            item.raw()
114                .deserialize()
115                .map(|e| e.origin_server_ts())
116                .unwrap_or_else(|_| MilliSecondsSinceUnixEpoch::now())
117        });
118
119        Ok(loaded_events)
120    }
121}
122
/// Backend abstraction used by [`PinnedEventsLoader`] to list the pinned
/// event ids of a room and to load the corresponding events.
pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm {
    /// Load a single room event using the cache or network and any events
    /// related to it, if they are cached.
    ///
    /// You can control which types of related events are retrieved using
    /// `related_event_filters`. A `None` value will retrieve any type of
    /// related event.
    ///
    /// `request_config` is the optional request configuration handed to the
    /// implementation for the load.
    ///
    /// On success, the returned tuple holds the requested event first and the
    /// related events second; failures are reported as a [`PaginatorError`].
    fn load_event_with_relations<'a>(
        &'a self,
        event_id: &'a EventId,
        request_config: Option<RequestConfig>,
        related_event_filters: Option<Vec<RelationType>>,
    ) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), PaginatorError>>;

    /// Get the pinned event ids for a room.
    ///
    /// Callers in this file treat a `None` value like an empty list.
    fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>>;

    /// Checks whether an event id is pinned in this room.
    ///
    /// It avoids having to clone the whole list of event ids to check a single
    /// value.
    fn is_pinned_event(&self, event_id: &EventId) -> bool;
}
146
147impl PinnedEventsRoom for Room {
148    fn load_event_with_relations<'a>(
149        &'a self,
150        event_id: &'a EventId,
151        request_config: Option<RequestConfig>,
152        related_event_filters: Option<Vec<RelationType>>,
153    ) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), PaginatorError>> {
154        Box::pin(async move {
155            if let Ok((cache, _handles)) = self.event_cache().await {
156                if let Some(ret) = cache.event_with_relations(event_id, related_event_filters).await
157                {
158                    debug!("Loaded pinned event {event_id} and related events from cache");
159                    return Ok(ret);
160                }
161            }
162
163            debug!("Loading pinned event {event_id} from HS");
164            self.event(event_id, request_config)
165                .await
166                .map(|e| (e, Vec::new()))
167                .map_err(|err| PaginatorError::SdkError(Box::new(err)))
168        })
169    }
170
171    fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>> {
172        self.clone_info().pinned_event_ids()
173    }
174
175    fn is_pinned_event(&self, event_id: &EventId) -> bool {
176        self.clone_info().is_pinned_event(event_id)
177    }
178}
179
180#[cfg(not(tarpaulin_include))]
181impl std::fmt::Debug for PinnedEventsLoader {
182    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
183        f.debug_struct("PinnedEventsLoader")
184            .field("max_events_to_load", &self.max_events_to_load)
185            .finish()
186    }
187}
188
/// Errors related to `PinnedEventsLoader` usage.
#[derive(Error, Debug)]
pub enum PinnedEventsLoaderError {
    /// No event could be found for the contained event id.
    ///
    /// NOTE(review): not constructed in this file — presumably raised by
    /// callers elsewhere; confirm where.
    #[error("No event found for the given event id.")]
    EventNotFound(OwnedEventId),

    /// The timeline focus is not the pinned events one.
    ///
    /// NOTE(review): not constructed in this file — presumably raised by the
    /// timeline code; confirm where.
    #[error("Timeline focus is not pinned events.")]
    TimelineFocusNotPinnedEvents,

    /// The pinned events could not be loaded.
    ///
    /// `PinnedEventsLoader::load_events` returns this when the room has pinned
    /// event ids but none of the events could be loaded.
    #[error("Could not load pinned events.")]
    TimelineReloadFailed,
}