matrix_sdk/send_queue/progress.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Progress facilities for the media upload system.
16
17use std::ops::Add;
18
19use eyeball::SharedObservable;
20#[cfg(feature = "unstable-msc4274")]
21use matrix_sdk_base::store::AccumulatedSentMediaInfo;
22use matrix_sdk_base::{media::MediaRequestParameters, store::DependentQueuedRequestKind};
23use matrix_sdk_common::executor::spawn;
24use ruma::{TransactionId, events::room::MediaSource};
25use tokio::sync::broadcast;
26use tracing::warn;
27
28use crate::{
29 Room, TransmissionProgress,
30 send_queue::{QueueStorage, RoomSendQueue, RoomSendQueueStorageError, RoomSendQueueUpdate},
31};
32
33/// Progress of an operation in abstract units.
34///
35/// Contrary to [`TransmissionProgress`], this allows tracking the progress
36/// of sending or receiving a payload in estimated pseudo units representing a
37/// percentage. This is helpful in cases where the exact progress in bytes isn't
38/// known, for instance, because encryption (which changes the size) happens on
39/// the fly.
40#[derive(Clone, Copy, Debug, Default)]
41pub struct AbstractProgress {
42 /// How many units were already transferred.
43 pub current: usize,
44 /// How many units there are in total.
45 pub total: usize,
46}
47
48// AbstractProgress can be added together, which adds their components.
49impl Add for AbstractProgress {
50 type Output = Self;
51
52 fn add(self, other: Self) -> Self::Output {
53 Self { current: self.current + other.current, total: self.total + other.total }
54 }
55}
56
57/// Information needed to compute the progress of uploading a media and its
58/// associated thumbnail.
59#[derive(Clone, Copy, Debug)]
60pub(super) struct MediaUploadProgressInfo {
61 /// The index of the uploaded item if this is a gallery upload. Otherwise,
62 /// zero.
63 pub index: u64,
64 /// The total number of bytes in the file currently being uploaded.
65 pub bytes: usize,
66 /// Some offset for the [`AbstractProgress`] computations.
67 ///
68 /// For a file upload, the offsets include the size of the thumbnail as part
69 /// of the already uploaded data and total. For a thumbnail upload, this
70 /// includes the size of the file to be uploaded in the total.
71 pub offsets: AbstractProgress,
72}
73
74impl RoomSendQueue {
75 /// Create metadata required to compute the progress of a media upload.
76 pub(super) async fn create_media_upload_progress_info(
77 own_txn_id: &TransactionId,
78 related_to: &TransactionId,
79 cache_key: &MediaRequestParameters,
80 thumbnail_source: Option<&MediaSource>,
81 #[cfg(feature = "unstable-msc4274")] accumulated: &[AccumulatedSentMediaInfo],
82 room: &Room,
83 queue: &QueueStorage,
84 ) -> MediaUploadProgressInfo {
85 // Determine the item's index, if this is a gallery upload.
86 let index = {
87 cfg_if::cfg_if! {
88 if #[cfg(feature = "unstable-msc4274")] {
89 accumulated.len()
90 } else {
91 0 // Before MSC4274 only a single file (and thumbnail) could be sent per event.
92 }
93 }
94 };
95
96 // Get the size of the file being uploaded from the event cache.
97 let bytes = match room.client().media_store().lock().await {
98 Ok(cache) => match cache.get_media_content(cache_key).await {
99 Ok(Some(content)) => content.len(),
100 Ok(None) => 0,
101 Err(err) => {
102 warn!("error when reading media content from media store: {err}");
103 0
104 }
105 },
106 Err(err) => {
107 warn!("couldn't acquire media store lock: {err}");
108 0
109 }
110 };
111
112 let offsets = {
113 // If we're uploading a file, we may have already uploaded a thumbnail; get its
114 // size from the in-memory thumbnail sizes cache. This will account in the
115 // current and total size, for the overall progress of
116 // thumbnail+file.
117 let already_uploaded_thumbnail_bytes = if thumbnail_source.is_some() {
118 queue
119 .thumbnail_file_sizes
120 .lock()
121 .get(related_to)
122 .and_then(|sizes| sizes.get(index))
123 .copied()
124 .flatten()
125 } else {
126 None
127 };
128
129 let already_uploaded_thumbnail_bytes = already_uploaded_thumbnail_bytes.unwrap_or(0);
130
131 // If we're uploading a thumbnail, get the size of the file to be uploaded after
132 // it, from the database. This will account in the total progress of the
133 // file+thumbnail upload (we're currently uploading the thumbnail,
134 // in the first step).
135 let pending_file_bytes = match RoomSendQueue::get_dependent_pending_file_upload_size(
136 own_txn_id, room,
137 )
138 .await
139 {
140 Ok(maybe_size) => maybe_size.unwrap_or(0),
141 Err(err) => {
142 warn!(
143 "error when getting pending file upload size: {err}; using 0 as fallback"
144 );
145 0
146 }
147 };
148
149 // In nominal cases where the send queue is used correctly, only one of these
150 // two values will be non-zero.
151 AbstractProgress {
152 current: already_uploaded_thumbnail_bytes,
153 total: already_uploaded_thumbnail_bytes + pending_file_bytes,
154 }
155 };
156
157 MediaUploadProgressInfo { index: index as u64, bytes, offsets }
158 }
159
160 /// Determine the size of a pending file upload, if this is a thumbnail
161 /// upload or return 0 otherwise.
162 async fn get_dependent_pending_file_upload_size(
163 txn_id: &TransactionId,
164 room: &Room,
165 ) -> Result<Option<usize>, RoomSendQueueStorageError> {
166 let client = room.client();
167 let dependent_requests =
168 client.state_store().load_dependent_queued_requests(room.room_id()).await?;
169
170 // Try to find a depending request which depends on the target one, and that's a
171 // media upload.
172 let Some((cache_key, parent_is_thumbnail_upload)) =
173 dependent_requests.into_iter().find_map(|r| {
174 if r.parent_transaction_id != txn_id {
175 return None;
176 }
177
178 if let DependentQueuedRequestKind::UploadFileOrThumbnail {
179 cache_key,
180 parent_is_thumbnail_upload,
181 ..
182 } = r.kind
183 {
184 Some((cache_key, parent_is_thumbnail_upload))
185 } else {
186 None
187 }
188 })
189 else {
190 // If there's none, we're done here.
191 return Ok(None);
192 };
193
194 // If this is not a thumbnail upload, we're uploading a gallery and the
195 // dependent request is for the next gallery item.
196 if !parent_is_thumbnail_upload {
197 return Ok(None);
198 }
199
200 let media_store_guard = client.media_store().lock().await?;
201
202 let maybe_content = media_store_guard.get_media_content(&cache_key).await?;
203
204 Ok(maybe_content.map(|c| c.len()))
205 }
206
207 /// Create an observable to watch a media's upload progress.
208 pub(super) fn create_media_upload_progress_observable(
209 media_upload_info: &MediaUploadProgressInfo,
210 related_txn_id: &TransactionId,
211 update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
212 ) -> SharedObservable<TransmissionProgress> {
213 let progress: SharedObservable<TransmissionProgress> = Default::default();
214 let mut subscriber = progress.subscribe();
215
216 let related_txn_id = related_txn_id.to_owned();
217 let update_sender = update_sender.clone();
218 let media_upload_info = *media_upload_info;
219
220 // Watch and communicate the progress on a detached background task. Once
221 // the progress observable is dropped, next() will return None and the
222 // task will end.
223 spawn(async move {
224 while let Some(progress) = subscriber.next().await {
225 // Purposefully don't use `send_update` here, because we don't want to notify
226 // the global listeners about an upload progress update.
227 let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
228 related_to: related_txn_id.clone(),
229 file: None,
230 index: media_upload_info.index,
231 progress: estimate_media_upload_progress(progress, media_upload_info.bytes)
232 + media_upload_info.offsets,
233 });
234 }
235 });
236
237 progress
238 }
239}
240
241/// Estimates the upload progress for a single media file (either a thumbnail or
242/// a file).
243///
244/// This proportionally maps the upload progress given in actual bytes sent
245/// (possibly after encryption) into units of the unencrypted file sizes.
246///
247/// # Arguments
248///
249/// * `progress` - The [`TransmissionProgress`] of uploading the file (possibly
250/// after encryption).
251///
252/// * `bytes` - The total number of bytes in the file before encryption.
253fn estimate_media_upload_progress(
254 progress: TransmissionProgress,
255 bytes: usize,
256) -> AbstractProgress {
257 if progress.total == 0 {
258 return AbstractProgress { current: 0, total: 0 };
259 }
260
261 // Did the file finish uploading?
262 if progress.current == progress.total {
263 return AbstractProgress { current: bytes, total: bytes };
264 }
265
266 // The file is still uploading. Use the rule of 3 to proportionally map the
267 // progress into units of the original file size.
268 AbstractProgress {
269 current: (progress.current as f64 / progress.total as f64 * bytes as f64).round() as usize,
270 total: bytes,
271 }
272}