matrix_sdk_ui/timeline/
pinned_events_loader.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fmt::Formatter, sync::Arc};
16
17use futures_util::{StreamExt, stream};
18use matrix_sdk::{
19    BoxFuture, Room, SendOutsideWasm, SyncOutsideWasm,
20    config::RequestConfig,
21    room::{IncludeRelations, RelationsOptions},
22};
23use matrix_sdk_base::deserialized_responses::TimelineEvent;
24use ruma::{
25    EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, events::relation::RelationType, uint,
26};
27use thiserror::Error;
28use tokio::sync::Mutex;
29use tracing::{debug, warn};
30
31/// Utility to load the pinned events in a room.
32pub struct PinnedEventsLoader {
33    /// Backend to load pinned events.
34    room: Arc<dyn PinnedEventsRoom>,
35
36    /// A list of pinned event ids we've observed previously.
37    ///
38    /// Starts as an empty vector and is updated when the list of pinned events
39    /// is updated.
40    previous_pinned_event_ids: Mutex<Vec<OwnedEventId>>,
41
42    /// Maximum number of pinned events to load (either from network or the
43    /// cache).
44    max_events_to_load: usize,
45
46    /// Number of requests to load pinned events that can run concurrently. This
47    /// is used to avoid overwhelming a home server with dozens or hundreds
48    /// of concurrent requests.
49    max_concurrent_requests: usize,
50}
51
52impl PinnedEventsLoader {
53    /// Creates a new `PinnedEventsLoader` instance.
54    pub fn new(
55        room: Arc<dyn PinnedEventsRoom>,
56        max_events_to_load: usize,
57        max_concurrent_requests: usize,
58    ) -> Self {
59        Self {
60            room,
61            max_events_to_load,
62            max_concurrent_requests,
63            previous_pinned_event_ids: Mutex::new(Vec::new()),
64        }
65    }
66
67    /// Loads the pinned events in this room, using the cache first and then
68    /// requesting the event from the homeserver if it couldn't be found.
69    /// This method will perform as many concurrent requests for events as
70    /// `max_concurrent_requests` allows, to avoid overwhelming the server.
71    ///
72    /// Returns `None` if the list of pinned events hasn't changed since the
73    /// previous time we loaded them. May return an error if there was an
74    /// issue fetching the full events.
75    pub async fn load_events(&self) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
76        let pinned_event_ids: Vec<OwnedEventId> = self
77            .room
78            .pinned_event_ids()
79            .unwrap_or_default()
80            .into_iter()
81            .rev()
82            .take(self.max_events_to_load)
83            .rev()
84            .collect();
85
86        // Check if the list of pinned events has changed since the last time.
87        if pinned_event_ids == *self.previous_pinned_event_ids.lock().await {
88            return Ok(None);
89        }
90
91        if pinned_event_ids.is_empty() {
92            *self.previous_pinned_event_ids.lock().await = Vec::new();
93            return Ok(Some(Vec::new()));
94        }
95
96        let request_config = Some(RequestConfig::default().retry_limit(3));
97
98        let mut loaded_events: Vec<TimelineEvent> =
99            stream::iter(pinned_event_ids.clone().into_iter().map(|event_id| {
100                let provider = self.room.clone();
101                let relations_filter =
102                    Some(vec![RelationType::Annotation, RelationType::Replacement]);
103                async move {
104                    match provider
105                        .load_event_with_relations(&event_id, request_config, relations_filter)
106                        .await
107                    {
108                        Ok((event, related_events)) => {
109                            let mut events = vec![event];
110                            events.extend(related_events);
111                            Some(events)
112                        }
113                        Err(err) => {
114                            warn!("error when loading pinned event: {err}");
115                            None
116                        }
117                    }
118                }
119            }))
120            .buffer_unordered(self.max_concurrent_requests)
121            // Get only the `Some<Vec<_>>` results
122            .flat_map(stream::iter)
123            // Flatten the `Vec`s into a single one containing all their items
124            .flat_map(stream::iter)
125            .collect()
126            .await;
127
128        if loaded_events.is_empty() {
129            return Err(PinnedEventsLoaderError::TimelineReloadFailed);
130        }
131
132        // Sort using chronological ordering (oldest -> newest)
133        loaded_events.sort_by_key(|item| {
134            item.raw()
135                .deserialize()
136                .map(|e| e.origin_server_ts())
137                .unwrap_or_else(|_| MilliSecondsSinceUnixEpoch::now())
138        });
139
140        // We've successfully loaded *some* pinned events, so we can update the list of
141        // previously seen pinned events.
142        *self.previous_pinned_event_ids.lock().await = pinned_event_ids;
143
144        Ok(Some(loaded_events))
145    }
146}
147
148pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm {
149    /// Load a single room event using the cache or network and any events
150    /// related to it, if they are cached.
151    ///
152    /// You can control which types of related events are retrieved using
153    /// `related_event_filters`. A `None` value will retrieve any type of
154    /// related event.
155    fn load_event_with_relations<'a>(
156        &'a self,
157        event_id: &'a EventId,
158        request_config: Option<RequestConfig>,
159        related_event_filters: Option<Vec<RelationType>>,
160    ) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), matrix_sdk::Error>>;
161
162    /// Get the pinned event ids for a room.
163    fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>>;
164
165    /// Checks whether an event id is pinned in this room.
166    ///
167    /// It avoids having to clone the whole list of event ids to check a single
168    /// value.
169    fn is_pinned_event(&self, event_id: &EventId) -> bool;
170}
171
172impl PinnedEventsRoom for Room {
173    fn load_event_with_relations<'a>(
174        &'a self,
175        event_id: &'a EventId,
176        request_config: Option<RequestConfig>,
177        related_event_filters: Option<Vec<RelationType>>,
178    ) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), matrix_sdk::Error>> {
179        Box::pin(async move {
180            // First try to load both the event and its relations from the cache.
181            if let Ok((cache, _handles)) = self.event_cache().await
182                && let Some(ret) =
183                    cache.find_event_with_relations(event_id, related_event_filters.clone()).await?
184            {
185                debug!("Loaded pinned event {event_id} and related events from cache");
186                return Ok(ret);
187            }
188
189            // The event was not in the cache. Try loading it from the homeserver.
190            debug!("Loading pinned event {event_id} from HS");
191            let event = self.event(event_id, request_config).await?;
192
193            // Maybe at least some relations are in the cache?
194            if let Ok((cache, _handles)) = self.event_cache().await {
195                let related_events =
196                    cache.find_event_relations(event_id, related_event_filters).await?;
197                if !related_events.is_empty() {
198                    debug!("Loaded relations for pinned event {event_id} from cache");
199                    return Ok((event, related_events));
200                }
201            }
202
203            // No relations were found in the cache. Try loading them from the homeserver.
204            debug!("Loading relations of pinned event {event_id} from HS");
205            let mut related_events = Vec::new();
206            let mut opts = RelationsOptions {
207                include_relations: IncludeRelations::AllRelations,
208                recurse: true,
209                limit: Some(uint!(256)),
210                ..Default::default()
211            };
212
213            loop {
214                let relations = self.relations(event_id.to_owned(), opts.clone()).await?;
215                related_events.extend(relations.chunk);
216                if let Some(next_from) = relations.next_batch_token {
217                    opts.from = Some(next_from);
218                } else {
219                    break;
220                }
221            }
222
223            Ok((event, related_events))
224        })
225    }
226
227    fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>> {
228        self.clone_info().pinned_event_ids()
229    }
230
231    fn is_pinned_event(&self, event_id: &EventId) -> bool {
232        self.clone_info().is_pinned_event(event_id)
233    }
234}
235
236#[cfg(not(tarpaulin_include))]
237impl std::fmt::Debug for PinnedEventsLoader {
238    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
239        f.debug_struct("PinnedEventsLoader")
240            .field("max_events_to_load", &self.max_events_to_load)
241            .finish()
242    }
243}
244
245/// Errors related to `PinnedEventsLoader` usage.
246#[derive(Error, Debug)]
247pub enum PinnedEventsLoaderError {
248    #[error("No event found for the given event id.")]
249    EventNotFound(OwnedEventId),
250
251    #[error("Timeline focus is not pinned events.")]
252    TimelineFocusNotPinnedEvents,
253
254    #[error("Could not load pinned events.")]
255    TimelineReloadFailed,
256}