matrix_sdk_ui/timeline/
pinned_events_loader.rs1use std::{fmt::Formatter, sync::Arc};
16
17use futures_util::{StreamExt, stream};
18use matrix_sdk::{
19 BoxFuture, Room, SendOutsideWasm, SyncOutsideWasm,
20 config::RequestConfig,
21 room::{IncludeRelations, RelationsOptions},
22};
23use matrix_sdk_base::deserialized_responses::TimelineEvent;
24use ruma::{
25 EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, events::relation::RelationType, uint,
26};
27use thiserror::Error;
28use tokio::sync::Mutex;
29use tracing::{debug, warn};
30
31pub struct PinnedEventsLoader {
33 room: Arc<dyn PinnedEventsRoom>,
35
36 previous_pinned_event_ids: Mutex<Vec<OwnedEventId>>,
41
42 max_events_to_load: usize,
45
46 max_concurrent_requests: usize,
50}
51
52impl PinnedEventsLoader {
53 pub fn new(
55 room: Arc<dyn PinnedEventsRoom>,
56 max_events_to_load: usize,
57 max_concurrent_requests: usize,
58 ) -> Self {
59 Self {
60 room,
61 max_events_to_load,
62 max_concurrent_requests,
63 previous_pinned_event_ids: Mutex::new(Vec::new()),
64 }
65 }
66
67 pub async fn load_events(&self) -> Result<Option<Vec<TimelineEvent>>, PinnedEventsLoaderError> {
76 let pinned_event_ids: Vec<OwnedEventId> = self
77 .room
78 .pinned_event_ids()
79 .unwrap_or_default()
80 .into_iter()
81 .rev()
82 .take(self.max_events_to_load)
83 .rev()
84 .collect();
85
86 if pinned_event_ids == *self.previous_pinned_event_ids.lock().await {
88 return Ok(None);
89 }
90
91 if pinned_event_ids.is_empty() {
92 *self.previous_pinned_event_ids.lock().await = Vec::new();
93 return Ok(Some(Vec::new()));
94 }
95
96 let request_config = Some(RequestConfig::default().retry_limit(3));
97
98 let mut loaded_events: Vec<TimelineEvent> =
99 stream::iter(pinned_event_ids.clone().into_iter().map(|event_id| {
100 let provider = self.room.clone();
101 let relations_filter =
102 Some(vec![RelationType::Annotation, RelationType::Replacement]);
103 async move {
104 match provider
105 .load_event_with_relations(&event_id, request_config, relations_filter)
106 .await
107 {
108 Ok((event, related_events)) => {
109 let mut events = vec![event];
110 events.extend(related_events);
111 Some(events)
112 }
113 Err(err) => {
114 warn!("error when loading pinned event: {err}");
115 None
116 }
117 }
118 }
119 }))
120 .buffer_unordered(self.max_concurrent_requests)
121 .flat_map(stream::iter)
123 .flat_map(stream::iter)
125 .collect()
126 .await;
127
128 if loaded_events.is_empty() {
129 return Err(PinnedEventsLoaderError::TimelineReloadFailed);
130 }
131
132 loaded_events.sort_by_key(|item| {
134 item.raw()
135 .deserialize()
136 .map(|e| e.origin_server_ts())
137 .unwrap_or_else(|_| MilliSecondsSinceUnixEpoch::now())
138 });
139
140 *self.previous_pinned_event_ids.lock().await = pinned_event_ids;
143
144 Ok(Some(loaded_events))
145 }
146}
147
148pub trait PinnedEventsRoom: SendOutsideWasm + SyncOutsideWasm {
149 fn load_event_with_relations<'a>(
156 &'a self,
157 event_id: &'a EventId,
158 request_config: Option<RequestConfig>,
159 related_event_filters: Option<Vec<RelationType>>,
160 ) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), matrix_sdk::Error>>;
161
162 fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>>;
164
165 fn is_pinned_event(&self, event_id: &EventId) -> bool;
170}
171
172impl PinnedEventsRoom for Room {
173 fn load_event_with_relations<'a>(
174 &'a self,
175 event_id: &'a EventId,
176 request_config: Option<RequestConfig>,
177 related_event_filters: Option<Vec<RelationType>>,
178 ) -> BoxFuture<'a, Result<(TimelineEvent, Vec<TimelineEvent>), matrix_sdk::Error>> {
179 Box::pin(async move {
180 if let Ok((cache, _handles)) = self.event_cache().await
182 && let Some(ret) =
183 cache.find_event_with_relations(event_id, related_event_filters.clone()).await?
184 {
185 debug!("Loaded pinned event {event_id} and related events from cache");
186 return Ok(ret);
187 }
188
189 debug!("Loading pinned event {event_id} from HS");
191 let event = self.event(event_id, request_config).await?;
192
193 if let Ok((cache, _handles)) = self.event_cache().await {
195 let related_events =
196 cache.find_event_relations(event_id, related_event_filters).await?;
197 if !related_events.is_empty() {
198 debug!("Loaded relations for pinned event {event_id} from cache");
199 return Ok((event, related_events));
200 }
201 }
202
203 debug!("Loading relations of pinned event {event_id} from HS");
205 let mut related_events = Vec::new();
206 let mut opts = RelationsOptions {
207 include_relations: IncludeRelations::AllRelations,
208 recurse: true,
209 limit: Some(uint!(256)),
210 ..Default::default()
211 };
212
213 loop {
214 let relations = self.relations(event_id.to_owned(), opts.clone()).await?;
215 related_events.extend(relations.chunk);
216 if let Some(next_from) = relations.next_batch_token {
217 opts.from = Some(next_from);
218 } else {
219 break;
220 }
221 }
222
223 Ok((event, related_events))
224 })
225 }
226
227 fn pinned_event_ids(&self) -> Option<Vec<OwnedEventId>> {
228 self.clone_info().pinned_event_ids()
229 }
230
231 fn is_pinned_event(&self, event_id: &EventId) -> bool {
232 self.clone_info().is_pinned_event(event_id)
233 }
234}
235
236#[cfg(not(tarpaulin_include))]
237impl std::fmt::Debug for PinnedEventsLoader {
238 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
239 f.debug_struct("PinnedEventsLoader")
240 .field("max_events_to_load", &self.max_events_to_load)
241 .finish()
242 }
243}
244
245#[derive(Error, Debug)]
247pub enum PinnedEventsLoaderError {
248 #[error("No event found for the given event id.")]
249 EventNotFound(OwnedEventId),
250
251 #[error("Timeline focus is not pinned events.")]
252 TimelineFocusNotPinnedEvents,
253
254 #[error("Could not load pinned events.")]
255 TimelineReloadFailed,
256}