matrix_sdk/send_queue/
progress.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Progress facilities for the media upload system.
16
17use std::ops::Add;
18
19use eyeball::SharedObservable;
20#[cfg(feature = "unstable-msc4274")]
21use matrix_sdk_base::store::AccumulatedSentMediaInfo;
22use matrix_sdk_base::{media::MediaRequestParameters, store::DependentQueuedRequestKind};
23use matrix_sdk_common::executor::spawn;
24use ruma::{events::room::MediaSource, TransactionId};
25use tokio::sync::broadcast;
26use tracing::warn;
27
28use crate::{
29    send_queue::{QueueStorage, RoomSendQueue, RoomSendQueueStorageError, RoomSendQueueUpdate},
30    Room, TransmissionProgress,
31};
32
/// Progress of an operation in abstract units.
///
/// Contrary to [`TransmissionProgress`], this allows tracking the progress
/// of sending or receiving a payload in estimated pseudo units representing a
/// percentage. This is helpful in cases where the exact progress in bytes isn't
/// known, for instance, because encryption (which changes the size) happens on
/// the fly.
#[derive(Clone, Copy, Debug, Default)]
pub struct AbstractProgress {
    /// How many units were already transferred.
    pub current: usize,
    /// How many units there are in total.
    pub total: usize,
}

/// Two [`AbstractProgress`] values can be added together; the sum is computed
/// field by field (`current` with `current`, `total` with `total`).
impl Add for AbstractProgress {
    type Output = Self;

    /// Adds both components of `other` to the matching components of `self`.
    ///
    /// Each component saturates at [`usize::MAX`] instead of overflowing: a
    /// progress value is purely informational, so neither a panic (debug
    /// builds) nor a silent wrap-around to a tiny number (release builds) is
    /// acceptable when the sums get out of range, e.g. on 32-bit targets.
    fn add(self, other: Self) -> Self::Output {
        Self {
            current: self.current.saturating_add(other.current),
            total: self.total.saturating_add(other.total),
        }
    }
}
56
/// Information needed to compute the progress of uploading a media and its
/// associated thumbnail.
///
/// An instance is a small `Copy` snapshot taken before an upload starts; it is
/// later combined with the raw [`TransmissionProgress`] of the upload to
/// produce an [`AbstractProgress`].
#[derive(Clone, Copy, Debug)]
pub(super) struct MediaUploadProgressInfo {
    /// The index of the uploaded item if this is a gallery upload. Otherwise,
    /// zero.
    pub index: u64,
    /// The total number of bytes in the file currently being uploaded.
    ///
    /// This is the size of the content as stored in the media cache, or zero
    /// if that content could not be read.
    pub bytes: usize,
    /// Some offset for the [`AbstractProgress`] computations.
    ///
    /// For a file upload, the offsets include the size of the thumbnail as part
    /// of the already uploaded data and total. For a thumbnail upload, this
    /// includes the size of the file to be uploaded in the total.
    ///
    /// The offsets are added to the per-file progress estimate so that the
    /// reported progress covers the thumbnail and the file together.
    pub offsets: AbstractProgress,
}
73
74impl RoomSendQueue {
75    /// Create metadata required to compute the progress of a media upload.
76    pub(super) async fn create_media_upload_progress_info(
77        own_txn_id: &TransactionId,
78        related_to: &TransactionId,
79        cache_key: &MediaRequestParameters,
80        thumbnail_source: Option<&MediaSource>,
81        #[cfg(feature = "unstable-msc4274")] accumulated: &[AccumulatedSentMediaInfo],
82        room: &Room,
83        queue: &QueueStorage,
84    ) -> MediaUploadProgressInfo {
85        // Determine the item's index, if this is a gallery upload.
86        let index = {
87            cfg_if::cfg_if! {
88                if #[cfg(feature = "unstable-msc4274")] {
89                    accumulated.len()
90                } else {
91                    0 // Before MSC4274 only a single file (and thumbnail) could be sent per event.
92                }
93            }
94        };
95
96        // Get the size of the file being uploaded from the event cache.
97        let bytes = match room.client().event_cache_store().lock().await {
98            Ok(cache) => match cache.get_media_content(cache_key).await {
99                Ok(Some(content)) => content.len(),
100                Ok(None) => 0,
101                Err(err) => {
102                    warn!("error when reading media content from cache store: {err}");
103                    0
104                }
105            },
106            Err(err) => {
107                warn!("couldn't acquire cache store lock: {err}");
108                0
109            }
110        };
111
112        let offsets = {
113            // If we're uploading a file, we may have already uploaded a thumbnail; get its
114            // size from the in-memory thumbnail sizes cache. This will account in the
115            // current and total size, for the overall progress of
116            // thumbnail+file.
117            let already_uploaded_thumbnail_bytes = if thumbnail_source.is_some() {
118                queue
119                    .thumbnail_file_sizes
120                    .lock()
121                    .get(related_to)
122                    .and_then(|sizes| sizes.get(index))
123                    .copied()
124                    .flatten()
125            } else {
126                None
127            };
128
129            let already_uploaded_thumbnail_bytes = already_uploaded_thumbnail_bytes.unwrap_or(0);
130
131            // If we're uploading a thumbnail, get the size of the file to be uploaded after
132            // it, from the database. This will account in the total progress of the
133            // file+thumbnail upload (we're currently uploading the thumbnail,
134            // in the first step).
135            let pending_file_bytes =
136                match RoomSendQueue::get_dependent_pending_file_upload_size(own_txn_id, room).await
137                {
138                    Ok(maybe_size) => maybe_size.unwrap_or(0),
139                    Err(err) => {
140                        warn!(
141                        "error when getting pending file upload size: {err}; using 0 as fallback"
142                    );
143                        0
144                    }
145                };
146
147            // In nominal cases where the send queue is used correctly, only one of these
148            // two values will be non-zero.
149            AbstractProgress {
150                current: already_uploaded_thumbnail_bytes,
151                total: already_uploaded_thumbnail_bytes + pending_file_bytes,
152            }
153        };
154
155        MediaUploadProgressInfo { index: index as u64, bytes, offsets }
156    }
157
158    /// Determine the size of a pending file upload, if this is a thumbnail
159    /// upload or return 0 otherwise.
160    async fn get_dependent_pending_file_upload_size(
161        txn_id: &TransactionId,
162        room: &Room,
163    ) -> Result<Option<usize>, RoomSendQueueStorageError> {
164        let client = room.client();
165        let dependent_requests =
166            client.state_store().load_dependent_queued_requests(room.room_id()).await?;
167
168        // Try to find a depending request which depends on the target one, and that's a
169        // media upload.
170        let Some((cache_key, parent_is_thumbnail_upload)) =
171            dependent_requests.into_iter().find_map(|r| {
172                if r.parent_transaction_id != txn_id {
173                    return None;
174                }
175
176                if let DependentQueuedRequestKind::UploadFileOrThumbnail {
177                    cache_key,
178                    parent_is_thumbnail_upload,
179                    ..
180                } = r.kind
181                {
182                    Some((cache_key, parent_is_thumbnail_upload))
183                } else {
184                    None
185                }
186            })
187        else {
188            // If there's none, we're done here.
189            return Ok(None);
190        };
191
192        // If this is not a thumbnail upload, we're uploading a gallery and the
193        // dependent request is for the next gallery item.
194        if !parent_is_thumbnail_upload {
195            return Ok(None);
196        }
197
198        let cache_store_guard = client.event_cache_store().lock().await?;
199
200        let maybe_content = cache_store_guard.get_media_content(&cache_key).await?;
201
202        Ok(maybe_content.map(|c| c.len()))
203    }
204
205    /// Create an observable to watch a media's upload progress.
206    pub(super) fn create_media_upload_progress_observable(
207        media_upload_info: &MediaUploadProgressInfo,
208        related_txn_id: &TransactionId,
209        update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
210    ) -> SharedObservable<TransmissionProgress> {
211        let progress: SharedObservable<TransmissionProgress> = Default::default();
212        let mut subscriber = progress.subscribe();
213
214        let related_txn_id = related_txn_id.to_owned();
215        let update_sender = update_sender.clone();
216        let media_upload_info = *media_upload_info;
217
218        // Watch and communicate the progress on a detached background task. Once
219        // the progress observable is dropped, next() will return None and the
220        // task will end.
221        spawn(async move {
222            while let Some(progress) = subscriber.next().await {
223                // Purposefully don't use `send_update` here, because we don't want to notify
224                // the global listeners about an upload progress update.
225                let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
226                    related_to: related_txn_id.clone(),
227                    file: None,
228                    index: media_upload_info.index,
229                    progress: estimate_media_upload_progress(progress, media_upload_info.bytes)
230                        + media_upload_info.offsets,
231                });
232            }
233        });
234
235        progress
236    }
237}
238
239/// Estimates the upload progress for a single media file (either a thumbnail or
240/// a file).
241///
242/// This proportionally maps the upload progress given in actual bytes sent
243/// (possibly after encryption) into units of the unencrypted file sizes.
244///
245/// # Arguments
246///
247/// * `progress` - The [`TransmissionProgress`] of uploading the file (possibly
248///   after encryption).
249///
250/// * `bytes` - The total number of bytes in the file before encryption.
251fn estimate_media_upload_progress(
252    progress: TransmissionProgress,
253    bytes: usize,
254) -> AbstractProgress {
255    if progress.total == 0 {
256        return AbstractProgress { current: 0, total: 0 };
257    }
258
259    // Did the file finish uploading?
260    if progress.current == progress.total {
261        return AbstractProgress { current: bytes, total: bytes };
262    }
263
264    // The file is still uploading. Use the rule of 3 to proportionally map the
265    // progress into units of the original file size.
266    AbstractProgress {
267        current: (progress.current as f64 / progress.total as f64 * bytes as f64).round() as usize,
268        total: bytes,
269    }
270}