matrix_sdk/send_queue/
upload.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Private implementations of the media upload mechanism.
16
17use matrix_sdk_base::{
18    event_cache::store::media::IgnoreMediaRetentionPolicy,
19    media::{MediaFormat, MediaRequestParameters},
20    store::{
21        ChildTransactionId, DependentQueuedRequestKind, FinishUploadThumbnailInfo,
22        QueuedRequestKind, SentMediaInfo, SentRequestKey, SerializableEventContent,
23    },
24    RoomState,
25};
26use mime::Mime;
27use ruma::{
28    events::{
29        room::message::{FormattedBody, MessageType, RoomMessageEventContent},
30        AnyMessageLikeEventContent, Mentions,
31    },
32    MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId,
33};
34use tracing::{debug, error, instrument, trace, warn, Span};
35
36use super::{QueueStorage, RoomSendQueue, RoomSendQueueError};
37use crate::{
38    attachment::AttachmentConfig,
39    room::edit::update_media_caption,
40    send_queue::{
41        LocalEcho, LocalEchoContent, MediaHandles, RoomSendQueueStorageError, RoomSendQueueUpdate,
42        SendHandle,
43    },
44    Client, Media, Room,
45};
46
47/// Replace the source by the final ones in all the media types handled by
48/// [`Room::make_attachment_type()`].
49fn update_media_event_after_upload(echo: &mut RoomMessageEventContent, sent: SentMediaInfo) {
50    // Some variants look really similar below, but the `event` and `info` are all
51    // different types…
52    match &mut echo.msgtype {
53        MessageType::Audio(event) => {
54            event.source = sent.file;
55        }
56        MessageType::File(event) => {
57            event.source = sent.file;
58            if let Some(info) = event.info.as_mut() {
59                info.thumbnail_source = sent.thumbnail;
60            }
61        }
62        MessageType::Image(event) => {
63            event.source = sent.file;
64            if let Some(info) = event.info.as_mut() {
65                info.thumbnail_source = sent.thumbnail;
66            }
67        }
68        MessageType::Video(event) => {
69            event.source = sent.file;
70            if let Some(info) = event.info.as_mut() {
71                info.thumbnail_source = sent.thumbnail;
72            }
73        }
74
75        _ => {
76            // All `MessageType` created by `Room::make_attachment_type` should be
77            // handled here. The only way to end up here is that a message type has
78            // been tampered with in the database.
79            error!("Invalid message type in database: {}", echo.msgtype());
80            // Only crash debug builds.
81            debug_assert!(false, "invalid message type in database");
82        }
83    }
84}
85
86impl RoomSendQueue {
87    /// Queues an attachment to be sent to the room, using the send queue.
88    ///
89    /// This returns quickly (without sending or uploading anything), and will
90    /// push the event to be sent into a queue, handled in the background.
91    ///
92    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
93    /// the [`Self::subscribe()`] method to get updates about the sending of
94    /// that event.
95    ///
96    /// By default, if sending failed on the first attempt, it will be retried a
97    /// few times. If sending failed after those retries, the entire
98    /// client's sending queue will be disabled, and it will need to be
99    /// manually re-enabled by the caller (e.g. after network is back, or when
100    /// something has been done about the faulty requests).
101    ///
102    /// The attachment and its optional thumbnail are stored in the media cache
103    /// and can be retrieved at any time, by calling
104    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
105    /// in the local or remote echo, and using a `MediaFormat::File`.
106    #[instrument(skip_all, fields(event_txn))]
107    pub async fn send_attachment(
108        &self,
109        filename: impl Into<String>,
110        content_type: Mime,
111        data: Vec<u8>,
112        mut config: AttachmentConfig,
113    ) -> Result<SendHandle, RoomSendQueueError> {
114        let Some(room) = self.inner.room.get() else {
115            return Err(RoomSendQueueError::RoomDisappeared);
116        };
117
118        if room.state() != RoomState::Joined {
119            return Err(RoomSendQueueError::RoomNotJoined);
120        }
121
122        let filename = filename.into();
123        let upload_file_txn = TransactionId::new();
124        let send_event_txn = config.txn_id.map_or_else(ChildTransactionId::new, Into::into);
125
126        Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
127        debug!(filename, %content_type, %upload_file_txn, "sending an attachment");
128
129        let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
130
131        let (upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info) = {
132            let client = room.client();
133            let cache_store = client
134                .event_cache_store()
135                .lock()
136                .await
137                .map_err(RoomSendQueueStorageError::LockError)?;
138
139            // Cache the file itself in the cache store.
140            cache_store
141                .add_media_content(
142                    &file_media_request,
143                    data.clone(),
144                    // Make sure that the file is stored until it has been uploaded.
145                    IgnoreMediaRetentionPolicy::Yes,
146                )
147                .await
148                .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
149
150            // Process the thumbnail, if it's been provided.
151            if let Some(thumbnail) = config.thumbnail.take() {
152                let txn = TransactionId::new();
153                trace!(upload_thumbnail_txn = %txn, "attachment has a thumbnail");
154
155                // Create the information required for filling the thumbnail section of the
156                // media event.
157                let (data, content_type, thumbnail_info) = thumbnail.into_parts();
158
159                // Cache thumbnail in the cache store.
160                let thumbnail_media_request = Media::make_local_file_media_request(&txn);
161                cache_store
162                    .add_media_content(
163                        &thumbnail_media_request,
164                        data,
165                        // Make sure that the thumbnail is stored until it has been uploaded.
166                        IgnoreMediaRetentionPolicy::Yes,
167                    )
168                    .await
169                    .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
170
171                (
172                    Some(txn.clone()),
173                    Some((thumbnail_media_request.source.clone(), thumbnail_info)),
174                    Some((
175                        FinishUploadThumbnailInfo { txn, width: None, height: None },
176                        thumbnail_media_request,
177                        content_type,
178                    )),
179                )
180            } else {
181                Default::default()
182            }
183        };
184
185        // Create the content for the media event.
186        let event_content = Room::make_attachment_event(
187            room.make_attachment_type(
188                &content_type,
189                filename,
190                file_media_request.source.clone(),
191                config.caption,
192                config.formatted_caption,
193                config.info,
194                event_thumbnail_info,
195            ),
196            config.mentions,
197        );
198
199        let created_at = MilliSecondsSinceUnixEpoch::now();
200
201        // Save requests in the queue storage.
202        self.inner
203            .queue
204            .push_media(
205                event_content.clone(),
206                content_type,
207                send_event_txn.clone().into(),
208                created_at,
209                upload_file_txn.clone(),
210                file_media_request,
211                queue_thumbnail_info,
212            )
213            .await?;
214
215        trace!("manager sends a media to the background task");
216
217        self.inner.notifier.notify_one();
218
219        let send_handle = SendHandle {
220            room: self.clone(),
221            transaction_id: send_event_txn.clone().into(),
222            media_handles: Some(MediaHandles { upload_thumbnail_txn, upload_file_txn }),
223            created_at,
224        };
225
226        let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
227            transaction_id: send_event_txn.clone().into(),
228            content: LocalEchoContent::Event {
229                serialized_event: SerializableEventContent::new(&event_content.into())
230                    .map_err(RoomSendQueueStorageError::JsonSerialization)?,
231                send_handle: send_handle.clone(),
232                send_error: None,
233            },
234        }));
235
236        Ok(send_handle)
237    }
238}
239
240impl QueueStorage {
241    /// Consumes a finished upload and queues sending of the final media event.
242    #[allow(clippy::too_many_arguments)]
243    pub(super) async fn handle_dependent_finish_upload(
244        &self,
245        client: &Client,
246        event_txn: OwnedTransactionId,
247        parent_key: SentRequestKey,
248        mut local_echo: RoomMessageEventContent,
249        file_upload_txn: OwnedTransactionId,
250        thumbnail_info: Option<FinishUploadThumbnailInfo>,
251        new_updates: &mut Vec<RoomSendQueueUpdate>,
252    ) -> Result<(), RoomSendQueueError> {
253        // Both uploads are ready: enqueue the event with its final data.
254        let sent_media = parent_key
255            .into_media()
256            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
257
258        // Update cache keys in the cache store.
259        {
260            // Do it for the file itself.
261            let from_req = Media::make_local_file_media_request(&file_upload_txn);
262
263            trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store");
264            let cache_store = client
265                .event_cache_store()
266                .lock()
267                .await
268                .map_err(RoomSendQueueStorageError::LockError)?;
269
270            // The media can now be removed during cleanups.
271            cache_store
272                .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
273                .await
274                .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
275
276            cache_store
277                .replace_media_key(
278                    &from_req,
279                    &MediaRequestParameters {
280                        source: sent_media.file.clone(),
281                        format: MediaFormat::File,
282                    },
283                )
284                .await
285                .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
286
287            // Rename the thumbnail too, if needs be.
288            if let Some((info, new_source)) =
289                thumbnail_info.as_ref().zip(sent_media.thumbnail.clone())
290            {
291                // Previously the media request used `MediaFormat::Thumbnail`. Handle this case
292                // for send queue requests that were in the state store before the change.
293                let from_req = if let Some((height, width)) = info.height.zip(info.width) {
294                    Media::make_local_thumbnail_media_request(&info.txn, height, width)
295                } else {
296                    Media::make_local_file_media_request(&info.txn)
297                };
298
299                trace!(from = ?from_req.source, to = ?new_source, "renaming thumbnail file key in cache store");
300
301                // The media can now be removed during cleanups.
302                cache_store
303                    .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
304                    .await
305                    .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
306
307                cache_store
308                    .replace_media_key(
309                        &from_req,
310                        &MediaRequestParameters { source: new_source, format: MediaFormat::File },
311                    )
312                    .await
313                    .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
314            }
315        }
316
317        update_media_event_after_upload(&mut local_echo, sent_media);
318
319        let new_content = SerializableEventContent::new(&local_echo.into())
320            .map_err(RoomSendQueueStorageError::JsonSerialization)?;
321
322        // Indicates observers that the upload finished, by editing the local echo for
323        // the event into its final form before sending.
324        new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
325            transaction_id: event_txn.clone(),
326            new_content: new_content.clone(),
327        });
328
329        trace!(%event_txn, "queueing media event after successfully uploading media(s)");
330
331        client
332            .store()
333            .save_send_queue_request(
334                &self.room_id,
335                event_txn,
336                MilliSecondsSinceUnixEpoch::now(),
337                new_content.into(),
338                Self::HIGH_PRIORITY,
339            )
340            .await
341            .map_err(RoomSendQueueStorageError::StateStoreError)?;
342
343        Ok(())
344    }
345
346    /// Consumes a finished upload of a thumbnail and queues the file upload.
347    pub(super) async fn handle_dependent_file_upload_with_thumbnail(
348        &self,
349        client: &Client,
350        next_upload_txn: OwnedTransactionId,
351        parent_key: SentRequestKey,
352        content_type: String,
353        cache_key: MediaRequestParameters,
354        event_txn: OwnedTransactionId,
355    ) -> Result<(), RoomSendQueueError> {
356        // The thumbnail has been sent, now transform the dependent file upload request
357        // into a ready one.
358        let sent_media = parent_key
359            .into_media()
360            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
361
362        // The media we just uploaded was a thumbnail, so the thumbnail shouldn't have
363        // a thumbnail itself.
364        debug_assert!(sent_media.thumbnail.is_none());
365        if sent_media.thumbnail.is_some() {
366            warn!("unexpected thumbnail for a thumbnail!");
367        }
368
369        trace!(related_to = %event_txn, "done uploading thumbnail, now queuing a request to send the media file itself");
370
371        let request = QueuedRequestKind::MediaUpload {
372            content_type,
373            cache_key,
374            // The thumbnail for the next upload is the file we just uploaded here.
375            thumbnail_source: Some(sent_media.file),
376            related_to: event_txn,
377        };
378
379        client
380            .store()
381            .save_send_queue_request(
382                &self.room_id,
383                next_upload_txn,
384                MilliSecondsSinceUnixEpoch::now(),
385                request,
386                Self::HIGH_PRIORITY,
387            )
388            .await
389            .map_err(RoomSendQueueStorageError::StateStoreError)?;
390
391        Ok(())
392    }
393
394    /// Try to abort an upload that would be ongoing.
395    ///
396    /// Return true if any media (media itself or its thumbnail) was being
397    /// uploaded. In this case, the media event has also been removed from
398    /// the send queue. If it returns false, then the uploads already
399    /// happened, and the event sending *may* have started.
400    #[instrument(skip(self, handles))]
401    pub(super) async fn abort_upload(
402        &self,
403        event_txn: &TransactionId,
404        handles: &MediaHandles,
405    ) -> Result<bool, RoomSendQueueStorageError> {
406        let mut guard = self.store.lock().await;
407        let client = guard.client()?;
408
409        // Keep the lock until we're done touching the storage.
410        debug!("trying to abort an upload");
411
412        let store = client.store();
413
414        let upload_file_as_dependent = ChildTransactionId::from(handles.upload_file_txn.clone());
415        let event_as_dependent = ChildTransactionId::from(event_txn.to_owned());
416
417        let mut removed_dependent_upload = false;
418        let mut removed_dependent_event = false;
419
420        if let Some(thumbnail_txn) = &handles.upload_thumbnail_txn {
421            if store.remove_send_queue_request(&self.room_id, thumbnail_txn).await? {
422                // The thumbnail upload existed as a request: either it was pending (something
423                // else was being sent), or it was actively being sent.
424                trace!("could remove thumbnail request, removing 2 dependent requests now");
425
426                // 1. Try to abort sending using the being_sent info, in case it was active.
427                if let Some(info) = guard.being_sent.as_ref() {
428                    if info.transaction_id == *thumbnail_txn {
429                        // SAFETY: we knew it was Some(), two lines above.
430                        let info = guard.being_sent.take().unwrap();
431                        if info.cancel_upload() {
432                            trace!("aborted ongoing thumbnail upload");
433                        }
434                    }
435                }
436
437                // 2. Remove the dependent requests.
438                removed_dependent_upload = store
439                    .remove_dependent_queued_request(&self.room_id, &upload_file_as_dependent)
440                    .await?;
441
442                if !removed_dependent_upload {
443                    warn!("unable to find the dependent file upload request");
444                }
445
446                removed_dependent_event = store
447                    .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
448                    .await?;
449
450                if !removed_dependent_event {
451                    warn!("unable to find the dependent media event upload request");
452                }
453            }
454        }
455
456        // If we're here:
457        // - either there was no thumbnail to upload,
458        // - or the thumbnail request has terminated already.
459        //
460        // So the next target is the upload request itself, in both cases.
461
462        if !removed_dependent_upload {
463            if store.remove_send_queue_request(&self.room_id, &handles.upload_file_txn).await? {
464                // The upload existed as a request: either it was pending (something else was
465                // being sent), or it was actively being sent.
466                trace!("could remove file upload request, removing 1 dependent request");
467
468                // 1. Try to abort sending using the being_sent info, in case it was active.
469                if let Some(info) = guard.being_sent.as_ref() {
470                    if info.transaction_id == handles.upload_file_txn {
471                        // SAFETY: we knew it was Some(), two lines above.
472                        let info = guard.being_sent.take().unwrap();
473                        if info.cancel_upload() {
474                            trace!("aborted ongoing file upload");
475                        }
476                    }
477                }
478
479                // 2. Remove the dependent request.
480                if !store
481                    .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
482                    .await?
483                {
484                    warn!("unable to find the dependent media event upload request");
485                }
486            } else {
487                // The upload was not in the send queue, so it's completed.
488                //
489                // It means the event sending is either still queued as a dependent request, or
490                // it's graduated into a request.
491                if !removed_dependent_event
492                    && !store
493                        .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
494                        .await?
495                {
496                    // The media event has been promoted into a request, or the promoted request
497                    // has been sent already: we couldn't abort, let the caller decide what to do.
498                    debug!("uploads already happened => deferring to aborting an event sending");
499                    return Ok(false);
500                }
501            }
502        }
503
504        // At this point, all the requests and dependent requests have been cleaned up.
505        // Perform the final step: empty the cache from the local items.
506        {
507            let event_cache = client.event_cache_store().lock().await?;
508            event_cache
509                .remove_media_content_for_uri(&Media::make_local_uri(&handles.upload_file_txn))
510                .await?;
511            if let Some(txn) = &handles.upload_thumbnail_txn {
512                event_cache.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?;
513            }
514        }
515
516        debug!("successfully aborted!");
517        Ok(true)
518    }
519
520    #[instrument(skip(self, caption, formatted_caption))]
521    pub(super) async fn edit_media_caption(
522        &self,
523        txn: &TransactionId,
524        caption: Option<String>,
525        formatted_caption: Option<FormattedBody>,
526        mentions: Option<Mentions>,
527    ) -> Result<Option<AnyMessageLikeEventContent>, RoomSendQueueStorageError> {
528        // This error will be popular here.
529        use RoomSendQueueStorageError::InvalidMediaCaptionEdit;
530
531        let guard = self.store.lock().await;
532        let client = guard.client()?;
533        let store = client.store();
534
535        // The media event can be in one of three states:
536        // - still stored as a dependent request,
537        // - stored as a queued request, active (aka it's being sent).
538        // - stored as a queued request, not active yet (aka it's not being sent yet),
539        //
540        // We'll handle each of these cases one by one.
541
542        {
543            // If the event can be found as a dependent event, update the captions, save it
544            // back into the database, and return early.
545            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
546
547            if let Some(found) =
548                dependent_requests.into_iter().find(|req| *req.own_transaction_id == *txn)
549            {
550                trace!("found the caption to edit in a dependent request");
551
552                let DependentQueuedRequestKind::FinishUpload {
553                    mut local_echo,
554                    file_upload,
555                    thumbnail_info,
556                } = found.kind
557                else {
558                    return Err(InvalidMediaCaptionEdit);
559                };
560
561                if !update_media_caption(&mut local_echo, caption, formatted_caption, mentions) {
562                    return Err(InvalidMediaCaptionEdit);
563                }
564
565                let new_dependent_request = DependentQueuedRequestKind::FinishUpload {
566                    local_echo: local_echo.clone(),
567                    file_upload,
568                    thumbnail_info,
569                };
570                store
571                    .update_dependent_queued_request(
572                        &self.room_id,
573                        &found.own_transaction_id,
574                        new_dependent_request,
575                    )
576                    .await?;
577
578                trace!("caption successfully updated");
579                return Ok(Some(local_echo.into()));
580            }
581        }
582
583        let requests = store.load_send_queue_requests(&self.room_id).await?;
584        let Some(found) = requests.into_iter().find(|req| req.transaction_id == *txn) else {
585            // Couldn't be found anymore, it's not possible to update captions.
586            return Ok(None);
587        };
588
589        trace!("found the caption to edit as a request");
590
591        let QueuedRequestKind::Event { content: serialized_content } = found.kind else {
592            return Err(InvalidMediaCaptionEdit);
593        };
594
595        let deserialized = serialized_content.deserialize()?;
596        let AnyMessageLikeEventContent::RoomMessage(mut content) = deserialized else {
597            return Err(InvalidMediaCaptionEdit);
598        };
599
600        if !update_media_caption(&mut content, caption, formatted_caption, mentions) {
601            return Err(InvalidMediaCaptionEdit);
602        }
603
604        let any_content: AnyMessageLikeEventContent = content.into();
605        let new_serialized = SerializableEventContent::new(&any_content.clone())?;
606
607        // If the request is active (being sent), send a dependent request.
608        if let Some(being_sent) = guard.being_sent.as_ref() {
609            if being_sent.transaction_id == *txn {
610                // Record a dependent request to edit, and exit.
611                store
612                    .save_dependent_queued_request(
613                        &self.room_id,
614                        txn,
615                        ChildTransactionId::new(),
616                        MilliSecondsSinceUnixEpoch::now(),
617                        DependentQueuedRequestKind::EditEvent { new_content: new_serialized },
618                    )
619                    .await?;
620
621                trace!("media event was being sent, pushed a dependent edit");
622                return Ok(Some(any_content));
623            }
624        }
625
626        // The request is not active: edit the local echo.
627        store
628            .update_send_queue_request(
629                &self.room_id,
630                txn,
631                QueuedRequestKind::Event { content: new_serialized },
632            )
633            .await?;
634
635        trace!("media event was not being sent, updated local echo");
636        Ok(Some(any_content))
637    }
638}