matrix_sdk/send_queue/progress.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Progress facilities for the media upload system.
16
17use std::ops::Add;
18
19use eyeball::SharedObservable;
20#[cfg(feature = "unstable-msc4274")]
21use matrix_sdk_base::store::AccumulatedSentMediaInfo;
22use matrix_sdk_base::{media::MediaRequestParameters, store::DependentQueuedRequestKind};
23use matrix_sdk_common::executor::spawn;
24use ruma::{events::room::MediaSource, TransactionId};
25use tokio::sync::broadcast;
26use tracing::warn;
27
28use crate::{
29 send_queue::{QueueStorage, RoomSendQueue, RoomSendQueueStorageError, RoomSendQueueUpdate},
30 Room, TransmissionProgress,
31};
32
/// Progress of an operation in abstract units.
///
/// Contrary to [`TransmissionProgress`], this allows tracking the progress
/// of sending or receiving a payload in estimated pseudo units representing a
/// percentage. This is helpful in cases where the exact progress in bytes isn't
/// known, for instance, because encryption (which changes the size) happens on
/// the fly.
#[derive(Clone, Copy, Debug, Default)]
pub struct AbstractProgress {
    /// How many units were already transferred.
    pub current: usize,
    /// How many units there are in total.
    pub total: usize,
}

/// Two [`AbstractProgress`] values are added component-wise: `current` with
/// `current` and `total` with `total`.
///
/// Each component saturates at [`usize::MAX`] instead of overflowing, so that
/// a progress report can neither wrap around to a small value nor panic.
impl Add for AbstractProgress {
    type Output = Self;

    fn add(self, other: Self) -> Self::Output {
        Self {
            current: self.current.saturating_add(other.current),
            total: self.total.saturating_add(other.total),
        }
    }
}
56
57/// Information needed to compute the progress of uploading a media and its
58/// associated thumbnail.
59#[derive(Clone, Copy, Debug)]
60pub(super) struct MediaUploadProgressInfo {
61 /// The index of the uploaded item if this is a gallery upload. Otherwise,
62 /// zero.
63 pub index: u64,
64 /// The total number of bytes in the file currently being uploaded.
65 pub bytes: usize,
66 /// Some offset for the [`AbstractProgress`] computations.
67 ///
68 /// For a file upload, the offsets include the size of the thumbnail as part
69 /// of the already uploaded data and total. For a thumbnail upload, this
70 /// includes the size of the file to be uploaded in the total.
71 pub offsets: AbstractProgress,
72}
73
74impl RoomSendQueue {
75 /// Create metadata required to compute the progress of a media upload.
76 pub(super) async fn create_media_upload_progress_info(
77 own_txn_id: &TransactionId,
78 related_to: &TransactionId,
79 cache_key: &MediaRequestParameters,
80 thumbnail_source: Option<&MediaSource>,
81 #[cfg(feature = "unstable-msc4274")] accumulated: &[AccumulatedSentMediaInfo],
82 room: &Room,
83 queue: &QueueStorage,
84 ) -> MediaUploadProgressInfo {
85 // Determine the item's index, if this is a gallery upload.
86 let index = {
87 cfg_if::cfg_if! {
88 if #[cfg(feature = "unstable-msc4274")] {
89 accumulated.len()
90 } else {
91 0 // Before MSC4274 only a single file (and thumbnail) could be sent per event.
92 }
93 }
94 };
95
96 // Get the size of the file being uploaded from the event cache.
97 let bytes = match room.client().event_cache_store().lock().await {
98 Ok(cache) => match cache.get_media_content(cache_key).await {
99 Ok(Some(content)) => content.len(),
100 Ok(None) => 0,
101 Err(err) => {
102 warn!("error when reading media content from cache store: {err}");
103 0
104 }
105 },
106 Err(err) => {
107 warn!("couldn't acquire cache store lock: {err}");
108 0
109 }
110 };
111
112 let offsets = {
113 // If we're uploading a file, we may have already uploaded a thumbnail; get its
114 // size from the in-memory thumbnail sizes cache. This will account in the
115 // current and total size, for the overall progress of
116 // thumbnail+file.
117 let already_uploaded_thumbnail_bytes = if thumbnail_source.is_some() {
118 queue
119 .thumbnail_file_sizes
120 .lock()
121 .get(related_to)
122 .and_then(|sizes| sizes.get(index))
123 .copied()
124 .flatten()
125 } else {
126 None
127 };
128
129 let already_uploaded_thumbnail_bytes = already_uploaded_thumbnail_bytes.unwrap_or(0);
130
131 // If we're uploading a thumbnail, get the size of the file to be uploaded after
132 // it, from the database. This will account in the total progress of the
133 // file+thumbnail upload (we're currently uploading the thumbnail,
134 // in the first step).
135 let pending_file_bytes =
136 match RoomSendQueue::get_dependent_pending_file_upload_size(own_txn_id, room).await
137 {
138 Ok(maybe_size) => maybe_size.unwrap_or(0),
139 Err(err) => {
140 warn!(
141 "error when getting pending file upload size: {err}; using 0 as fallback"
142 );
143 0
144 }
145 };
146
147 // In nominal cases where the send queue is used correctly, only one of these
148 // two values will be non-zero.
149 AbstractProgress {
150 current: already_uploaded_thumbnail_bytes,
151 total: already_uploaded_thumbnail_bytes + pending_file_bytes,
152 }
153 };
154
155 MediaUploadProgressInfo { index: index as u64, bytes, offsets }
156 }
157
158 /// Determine the size of a pending file upload, if this is a thumbnail
159 /// upload or return 0 otherwise.
160 async fn get_dependent_pending_file_upload_size(
161 txn_id: &TransactionId,
162 room: &Room,
163 ) -> Result<Option<usize>, RoomSendQueueStorageError> {
164 let client = room.client();
165 let dependent_requests =
166 client.state_store().load_dependent_queued_requests(room.room_id()).await?;
167
168 // Try to find a depending request which depends on the target one, and that's a
169 // media upload.
170 let Some((cache_key, parent_is_thumbnail_upload)) =
171 dependent_requests.into_iter().find_map(|r| {
172 if r.parent_transaction_id != txn_id {
173 return None;
174 }
175
176 if let DependentQueuedRequestKind::UploadFileOrThumbnail {
177 cache_key,
178 parent_is_thumbnail_upload,
179 ..
180 } = r.kind
181 {
182 Some((cache_key, parent_is_thumbnail_upload))
183 } else {
184 None
185 }
186 })
187 else {
188 // If there's none, we're done here.
189 return Ok(None);
190 };
191
192 // If this is not a thumbnail upload, we're uploading a gallery and the
193 // dependent request is for the next gallery item.
194 if !parent_is_thumbnail_upload {
195 return Ok(None);
196 }
197
198 let cache_store_guard = client.event_cache_store().lock().await?;
199
200 let maybe_content = cache_store_guard.get_media_content(&cache_key).await?;
201
202 Ok(maybe_content.map(|c| c.len()))
203 }
204
205 /// Create an observable to watch a media's upload progress.
206 pub(super) fn create_media_upload_progress_observable(
207 media_upload_info: &MediaUploadProgressInfo,
208 related_txn_id: &TransactionId,
209 update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
210 ) -> SharedObservable<TransmissionProgress> {
211 let progress: SharedObservable<TransmissionProgress> = Default::default();
212 let mut subscriber = progress.subscribe();
213
214 let related_txn_id = related_txn_id.to_owned();
215 let update_sender = update_sender.clone();
216 let media_upload_info = *media_upload_info;
217
218 // Watch and communicate the progress on a detached background task. Once
219 // the progress observable is dropped, next() will return None and the
220 // task will end.
221 spawn(async move {
222 while let Some(progress) = subscriber.next().await {
223 // Purposefully don't use `send_update` here, because we don't want to notify
224 // the global listeners about an upload progress update.
225 let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
226 related_to: related_txn_id.clone(),
227 file: None,
228 index: media_upload_info.index,
229 progress: estimate_media_upload_progress(progress, media_upload_info.bytes)
230 + media_upload_info.offsets,
231 });
232 }
233 });
234
235 progress
236 }
237}
238
239/// Estimates the upload progress for a single media file (either a thumbnail or
240/// a file).
241///
242/// This proportionally maps the upload progress given in actual bytes sent
243/// (possibly after encryption) into units of the unencrypted file sizes.
244///
245/// # Arguments
246///
247/// * `progress` - The [`TransmissionProgress`] of uploading the file (possibly
248/// after encryption).
249///
250/// * `bytes` - The total number of bytes in the file before encryption.
251fn estimate_media_upload_progress(
252 progress: TransmissionProgress,
253 bytes: usize,
254) -> AbstractProgress {
255 if progress.total == 0 {
256 return AbstractProgress { current: 0, total: 0 };
257 }
258
259 // Did the file finish uploading?
260 if progress.current == progress.total {
261 return AbstractProgress { current: bytes, total: bytes };
262 }
263
264 // The file is still uploading. Use the rule of 3 to proportionally map the
265 // progress into units of the original file size.
266 AbstractProgress {
267 current: (progress.current as f64 / progress.total as f64 * bytes as f64).round() as usize,
268 total: bytes,
269 }
270}