matrix_sdk/send_queue/
progress.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Progress facilities for the media upload system.
16
17use std::ops::Add;
18
19use eyeball::SharedObservable;
20#[cfg(feature = "unstable-msc4274")]
21use matrix_sdk_base::store::AccumulatedSentMediaInfo;
22use matrix_sdk_base::{media::MediaRequestParameters, store::DependentQueuedRequestKind};
23use matrix_sdk_common::executor::spawn;
24use ruma::{TransactionId, events::room::MediaSource};
25use tokio::sync::broadcast;
26use tracing::warn;
27
28use crate::{
29    Room, TransmissionProgress,
30    send_queue::{QueueStorage, RoomSendQueue, RoomSendQueueStorageError, RoomSendQueueUpdate},
31};
32
33/// Progress of an operation in abstract units.
34///
35/// Contrary to [`TransmissionProgress`], this allows tracking the progress
36/// of sending or receiving a payload in estimated pseudo units representing a
37/// percentage. This is helpful in cases where the exact progress in bytes isn't
38/// known, for instance, because encryption (which changes the size) happens on
39/// the fly.
40#[derive(Clone, Copy, Debug, Default)]
41pub struct AbstractProgress {
42    /// How many units were already transferred.
43    pub current: usize,
44    /// How many units there are in total.
45    pub total: usize,
46}
47
48// AbstractProgress can be added together, which adds their components.
49impl Add for AbstractProgress {
50    type Output = Self;
51
52    fn add(self, other: Self) -> Self::Output {
53        Self { current: self.current + other.current, total: self.total + other.total }
54    }
55}
56
57/// Information needed to compute the progress of uploading a media and its
58/// associated thumbnail.
59#[derive(Clone, Copy, Debug)]
60pub(super) struct MediaUploadProgressInfo {
61    /// The index of the uploaded item if this is a gallery upload. Otherwise,
62    /// zero.
63    pub index: u64,
64    /// The total number of bytes in the file currently being uploaded.
65    pub bytes: usize,
66    /// Some offset for the [`AbstractProgress`] computations.
67    ///
68    /// For a file upload, the offsets include the size of the thumbnail as part
69    /// of the already uploaded data and total. For a thumbnail upload, this
70    /// includes the size of the file to be uploaded in the total.
71    pub offsets: AbstractProgress,
72}
73
74impl RoomSendQueue {
75    /// Create metadata required to compute the progress of a media upload.
76    pub(super) async fn create_media_upload_progress_info(
77        own_txn_id: &TransactionId,
78        related_to: &TransactionId,
79        cache_key: &MediaRequestParameters,
80        thumbnail_source: Option<&MediaSource>,
81        #[cfg(feature = "unstable-msc4274")] accumulated: &[AccumulatedSentMediaInfo],
82        room: &Room,
83        queue: &QueueStorage,
84    ) -> MediaUploadProgressInfo {
85        // Determine the item's index, if this is a gallery upload.
86        let index = {
87            cfg_if::cfg_if! {
88                if #[cfg(feature = "unstable-msc4274")] {
89                    accumulated.len()
90                } else {
91                    0 // Before MSC4274 only a single file (and thumbnail) could be sent per event.
92                }
93            }
94        };
95
96        // Get the size of the file being uploaded from the event cache.
97        let bytes = match room.client().media_store().lock().await {
98            Ok(cache) => match cache.get_media_content(cache_key).await {
99                Ok(Some(content)) => content.len(),
100                Ok(None) => 0,
101                Err(err) => {
102                    warn!("error when reading media content from media store: {err}");
103                    0
104                }
105            },
106            Err(err) => {
107                warn!("couldn't acquire media store lock: {err}");
108                0
109            }
110        };
111
112        let offsets = {
113            // If we're uploading a file, we may have already uploaded a thumbnail; get its
114            // size from the in-memory thumbnail sizes cache. This will account in the
115            // current and total size, for the overall progress of
116            // thumbnail+file.
117            let already_uploaded_thumbnail_bytes = if thumbnail_source.is_some() {
118                queue
119                    .thumbnail_file_sizes
120                    .lock()
121                    .get(related_to)
122                    .and_then(|sizes| sizes.get(index))
123                    .copied()
124                    .flatten()
125            } else {
126                None
127            };
128
129            let already_uploaded_thumbnail_bytes = already_uploaded_thumbnail_bytes.unwrap_or(0);
130
131            // If we're uploading a thumbnail, get the size of the file to be uploaded after
132            // it, from the database. This will account in the total progress of the
133            // file+thumbnail upload (we're currently uploading the thumbnail,
134            // in the first step).
135            let pending_file_bytes = match RoomSendQueue::get_dependent_pending_file_upload_size(
136                own_txn_id, room,
137            )
138            .await
139            {
140                Ok(maybe_size) => maybe_size.unwrap_or(0),
141                Err(err) => {
142                    warn!(
143                        "error when getting pending file upload size: {err}; using 0 as fallback"
144                    );
145                    0
146                }
147            };
148
149            // In nominal cases where the send queue is used correctly, only one of these
150            // two values will be non-zero.
151            AbstractProgress {
152                current: already_uploaded_thumbnail_bytes,
153                total: already_uploaded_thumbnail_bytes + pending_file_bytes,
154            }
155        };
156
157        MediaUploadProgressInfo { index: index as u64, bytes, offsets }
158    }
159
160    /// Determine the size of a pending file upload, if this is a thumbnail
161    /// upload or return 0 otherwise.
162    async fn get_dependent_pending_file_upload_size(
163        txn_id: &TransactionId,
164        room: &Room,
165    ) -> Result<Option<usize>, RoomSendQueueStorageError> {
166        let client = room.client();
167        let dependent_requests =
168            client.state_store().load_dependent_queued_requests(room.room_id()).await?;
169
170        // Try to find a depending request which depends on the target one, and that's a
171        // media upload.
172        let Some((cache_key, parent_is_thumbnail_upload)) =
173            dependent_requests.into_iter().find_map(|r| {
174                if r.parent_transaction_id != txn_id {
175                    return None;
176                }
177
178                if let DependentQueuedRequestKind::UploadFileOrThumbnail {
179                    cache_key,
180                    parent_is_thumbnail_upload,
181                    ..
182                } = r.kind
183                {
184                    Some((cache_key, parent_is_thumbnail_upload))
185                } else {
186                    None
187                }
188            })
189        else {
190            // If there's none, we're done here.
191            return Ok(None);
192        };
193
194        // If this is not a thumbnail upload, we're uploading a gallery and the
195        // dependent request is for the next gallery item.
196        if !parent_is_thumbnail_upload {
197            return Ok(None);
198        }
199
200        let media_store_guard = client.media_store().lock().await?;
201
202        let maybe_content = media_store_guard.get_media_content(&cache_key).await?;
203
204        Ok(maybe_content.map(|c| c.len()))
205    }
206
207    /// Create an observable to watch a media's upload progress.
208    pub(super) fn create_media_upload_progress_observable(
209        media_upload_info: &MediaUploadProgressInfo,
210        related_txn_id: &TransactionId,
211        update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
212    ) -> SharedObservable<TransmissionProgress> {
213        let progress: SharedObservable<TransmissionProgress> = Default::default();
214        let mut subscriber = progress.subscribe();
215
216        let related_txn_id = related_txn_id.to_owned();
217        let update_sender = update_sender.clone();
218        let media_upload_info = *media_upload_info;
219
220        // Watch and communicate the progress on a detached background task. Once
221        // the progress observable is dropped, next() will return None and the
222        // task will end.
223        spawn(async move {
224            while let Some(progress) = subscriber.next().await {
225                // Purposefully don't use `send_update` here, because we don't want to notify
226                // the global listeners about an upload progress update.
227                let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
228                    related_to: related_txn_id.clone(),
229                    file: None,
230                    index: media_upload_info.index,
231                    progress: estimate_media_upload_progress(progress, media_upload_info.bytes)
232                        + media_upload_info.offsets,
233                });
234            }
235        });
236
237        progress
238    }
239}
240
241/// Estimates the upload progress for a single media file (either a thumbnail or
242/// a file).
243///
244/// This proportionally maps the upload progress given in actual bytes sent
245/// (possibly after encryption) into units of the unencrypted file sizes.
246///
247/// # Arguments
248///
249/// * `progress` - The [`TransmissionProgress`] of uploading the file (possibly
250///   after encryption).
251///
252/// * `bytes` - The total number of bytes in the file before encryption.
253fn estimate_media_upload_progress(
254    progress: TransmissionProgress,
255    bytes: usize,
256) -> AbstractProgress {
257    if progress.total == 0 {
258        return AbstractProgress { current: 0, total: 0 };
259    }
260
261    // Did the file finish uploading?
262    if progress.current == progress.total {
263        return AbstractProgress { current: bytes, total: bytes };
264    }
265
266    // The file is still uploading. Use the rule of 3 to proportionally map the
267    // progress into units of the original file size.
268    AbstractProgress {
269        current: (progress.current as f64 / progress.total as f64 * bytes as f64).round() as usize,
270        total: bytes,
271    }
272}