matrix_sdk/send_queue/
upload.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Private implementations of the media upload mechanism.
16
17#[cfg(feature = "unstable-msc4274")]
18use std::{collections::HashMap, iter::zip};
19
20use matrix_sdk_base::{
21    event_cache::store::media::IgnoreMediaRetentionPolicy,
22    media::{MediaFormat, MediaRequestParameters},
23    store::{
24        ChildTransactionId, DependentQueuedRequestKind, FinishUploadThumbnailInfo,
25        QueuedRequestKind, SentMediaInfo, SentRequestKey, SerializableEventContent,
26    },
27    RoomState,
28};
29#[cfg(feature = "unstable-msc4274")]
30use matrix_sdk_base::{
31    media::UniqueKey,
32    store::{AccumulatedSentMediaInfo, FinishGalleryItemInfo},
33};
34use mime::Mime;
35#[cfg(feature = "unstable-msc4274")]
36use ruma::events::room::message::{GalleryItemType, GalleryMessageEventContent};
37use ruma::{
38    events::{
39        room::{
40            message::{FormattedBody, MessageType, RoomMessageEventContent},
41            MediaSource, ThumbnailInfo,
42        },
43        AnyMessageLikeEventContent, Mentions,
44    },
45    MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId,
46};
47use tracing::{debug, error, instrument, trace, warn, Span};
48
49use super::{QueueStorage, QueueThumbnailInfo, RoomSendQueue, RoomSendQueueError};
50use crate::{
51    attachment::{AttachmentConfig, Thumbnail},
52    room::edit::update_media_caption,
53    send_queue::{
54        LocalEcho, LocalEchoContent, MediaHandles, RoomSendQueueStorageError, RoomSendQueueUpdate,
55        SendHandle,
56    },
57    Client, Media, Room,
58};
59#[cfg(feature = "unstable-msc4274")]
60use crate::{
61    attachment::{GalleryConfig, GalleryItemInfo},
62    send_queue::GalleryItemQueueInfo,
63};
64
65/// Replace the source by the final ones in all the media types handled by
66/// [`Room::make_attachment_type()`].
67fn update_media_event_after_upload(echo: &mut RoomMessageEventContent, sent: SentMediaInfo) {
68    // Some variants look really similar below, but the `event` and `info` are all
69    // different types…
70    match &mut echo.msgtype {
71        MessageType::Audio(event) => {
72            event.source = sent.file;
73        }
74        MessageType::File(event) => {
75            event.source = sent.file;
76            if let Some(info) = event.info.as_mut() {
77                info.thumbnail_source = sent.thumbnail;
78            }
79        }
80        MessageType::Image(event) => {
81            event.source = sent.file;
82            if let Some(info) = event.info.as_mut() {
83                info.thumbnail_source = sent.thumbnail;
84            }
85        }
86        MessageType::Video(event) => {
87            event.source = sent.file;
88            if let Some(info) = event.info.as_mut() {
89                info.thumbnail_source = sent.thumbnail;
90            }
91        }
92
93        _ => {
94            // All `MessageType` created by `Room::make_attachment_type` should be
95            // handled here. The only way to end up here is that a message type has
96            // been tampered with in the database.
97            error!("Invalid message type in database: {}", echo.msgtype());
98            // Only crash debug builds.
99            debug_assert!(false, "invalid message type in database");
100        }
101    }
102}
103
/// Swap the local media sources of every gallery item for the uploaded ones.
///
/// Covers every item type produced by [`Room::make_gallery_item_type()`].
/// `sent` maps the unique key of an item's current (local) source to the
/// media information obtained after upload; items whose key is missing are
/// left untouched and an error is logged.
#[cfg(feature = "unstable-msc4274")]
fn update_gallery_event_after_upload(
    echo: &mut RoomMessageEventContent,
    sent: HashMap<String, AccumulatedSentMediaInfo>,
) {
    let MessageType::Gallery(gallery) = &mut echo.msgtype else {
        // Only gallery events are expected here, so anything else means the
        // stored event was tampered with in the database.
        error!("Invalid gallery item types in database");
        // Crash in debug builds only.
        debug_assert!(false, "invalid item type in database {:?}", echo.msgtype());
        return;
    };

    // The arms below are near-duplicates, but each content/info pair is a
    // distinct type, so they cannot be merged.
    for itemtype in &mut gallery.itemtypes {
        match itemtype {
            GalleryItemType::Audio(content) => {
                let key = content.source.unique_key();
                if let Some(uploaded) = sent.get(&key) {
                    content.source = uploaded.file.clone();
                } else {
                    error!("key for item {:?} does not exist on gallery event", content.source);
                }
            }

            GalleryItemType::File(content) => {
                let key = content.source.unique_key();
                if let Some(uploaded) = sent.get(&key) {
                    content.source = uploaded.file.clone();
                    if let Some(info) = &mut content.info {
                        info.thumbnail_source = uploaded.thumbnail.clone();
                    }
                } else {
                    error!("key for item {:?} does not exist on gallery event", content.source);
                }
            }

            GalleryItemType::Image(content) => {
                let key = content.source.unique_key();
                if let Some(uploaded) = sent.get(&key) {
                    content.source = uploaded.file.clone();
                    if let Some(info) = &mut content.info {
                        info.thumbnail_source = uploaded.thumbnail.clone();
                    }
                } else {
                    error!("key for item {:?} does not exist on gallery event", content.source);
                }
            }

            GalleryItemType::Video(content) => {
                let key = content.source.unique_key();
                if let Some(uploaded) = sent.get(&key) {
                    content.source = uploaded.file.clone();
                    if let Some(info) = &mut content.info {
                        info.thumbnail_source = uploaded.thumbnail.clone();
                    }
                } else {
                    error!("key for item {:?} does not exist on gallery event", content.source);
                }
            }

            _ => {
                // Every `GalleryItemType` built by `Room::make_gallery_item_type`
                // is covered above, so reaching this arm means an item type
                // was tampered with in the database.
                error!("Invalid gallery item types in database");
                // Crash in debug builds only.
                debug_assert!(false, "invalid gallery item type in database {itemtype:?}");
            }
        }
    }
}
168
/// Result of caching a media file and its optional thumbnail with
/// `RoomSendQueue::cache_media`.
///
/// All fields are `None` (the `Default` value) when no thumbnail was provided.
#[derive(Default)]
struct MediaCacheResult {
    /// Transaction id generated for the thumbnail upload.
    upload_thumbnail_txn: Option<OwnedTransactionId>,
    /// Local media source of the cached thumbnail, paired with the thumbnail
    /// metadata; used to fill the thumbnail section of the event content.
    event_thumbnail_info: Option<(MediaSource, Box<ThumbnailInfo>)>,
    /// Thumbnail information handed to the queue storage when the upload
    /// requests are saved.
    queue_thumbnail_info: Option<QueueThumbnailInfo>,
}
175
176impl RoomSendQueue {
177    /// Queues an attachment to be sent to the room, using the send queue.
178    ///
179    /// This returns quickly (without sending or uploading anything), and will
180    /// push the event to be sent into a queue, handled in the background.
181    ///
182    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
183    /// the [`Self::subscribe()`] method to get updates about the sending of
184    /// that event.
185    ///
186    /// By default, if sending failed on the first attempt, it will be retried a
187    /// few times. If sending failed after those retries, the entire
188    /// client's sending queue will be disabled, and it will need to be
189    /// manually re-enabled by the caller (e.g. after network is back, or when
190    /// something has been done about the faulty requests).
191    ///
192    /// The attachment and its optional thumbnail are stored in the media cache
193    /// and can be retrieved at any time, by calling
194    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
195    /// in the local or remote echo, and using a `MediaFormat::File`.
196    #[instrument(skip_all, fields(event_txn))]
197    pub async fn send_attachment(
198        &self,
199        filename: impl Into<String>,
200        content_type: Mime,
201        data: Vec<u8>,
202        mut config: AttachmentConfig,
203    ) -> Result<SendHandle, RoomSendQueueError> {
204        let Some(room) = self.inner.room.get() else {
205            return Err(RoomSendQueueError::RoomDisappeared);
206        };
207
208        if room.state() != RoomState::Joined {
209            return Err(RoomSendQueueError::RoomNotJoined);
210        }
211
212        let filename = filename.into();
213        let upload_file_txn = TransactionId::new();
214        let send_event_txn = config.txn_id.map_or_else(ChildTransactionId::new, Into::into);
215
216        Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
217        debug!(filename, %content_type, %upload_file_txn, "sending an attachment");
218
219        let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
220
221        let MediaCacheResult { upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info } =
222            RoomSendQueue::cache_media(&room, data, config.thumbnail.take(), &file_media_request)
223                .await?;
224
225        // Create the content for the media event.
226        let event_content = room
227            .make_media_event(
228                Room::make_attachment_type(
229                    &content_type,
230                    filename,
231                    file_media_request.source.clone(),
232                    config.caption,
233                    config.formatted_caption,
234                    config.info,
235                    event_thumbnail_info,
236                ),
237                config.mentions,
238                config.reply,
239            )
240            .await
241            .map_err(|_| RoomSendQueueError::FailedToCreateAttachment)?;
242
243        let created_at = MilliSecondsSinceUnixEpoch::now();
244
245        // Save requests in the queue storage.
246        self.inner
247            .queue
248            .push_media(
249                event_content.clone(),
250                content_type,
251                send_event_txn.clone().into(),
252                created_at,
253                upload_file_txn.clone(),
254                file_media_request,
255                queue_thumbnail_info,
256            )
257            .await?;
258
259        trace!("manager sends a media to the background task");
260
261        self.inner.notifier.notify_one();
262
263        let send_handle = SendHandle {
264            room: self.clone(),
265            transaction_id: send_event_txn.clone().into(),
266            media_handles: vec![MediaHandles { upload_thumbnail_txn, upload_file_txn }],
267            created_at,
268        };
269
270        self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
271            transaction_id: send_event_txn.clone().into(),
272            content: LocalEchoContent::Event {
273                serialized_event: SerializableEventContent::new(&event_content.into())
274                    .map_err(RoomSendQueueStorageError::JsonSerialization)?,
275                send_handle: send_handle.clone(),
276                send_error: None,
277            },
278        }));
279
280        Ok(send_handle)
281    }
282
283    /// Queues a gallery to be sent to the room, using the send queue.
284    ///
285    /// This returns quickly (without sending or uploading anything), and will
286    /// push the event to be sent into a queue, handled in the background.
287    ///
288    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
289    /// the [`Self::subscribe()`] method to get updates about the sending of
290    /// that event.
291    ///
292    /// By default, if sending failed on the first attempt, it will be retried a
293    /// few times. If sending failed after those retries, the entire
294    /// client's sending queue will be disabled, and it will need to be
295    /// manually re-enabled by the caller (e.g. after network is back, or when
296    /// something has been done about the faulty requests).
297    ///
298    /// The attachments and their optional thumbnails are stored in the media
299    /// cache and can be retrieved at any time, by calling
300    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
301    /// in the local or remote echo, and using a `MediaFormat::File`.
302    #[cfg(feature = "unstable-msc4274")]
303    #[instrument(skip_all, fields(event_txn))]
304    pub async fn send_gallery(
305        &self,
306        gallery: GalleryConfig,
307    ) -> Result<SendHandle, RoomSendQueueError> {
308        let Some(room) = self.inner.room.get() else {
309            return Err(RoomSendQueueError::RoomDisappeared);
310        };
311
312        if room.state() != RoomState::Joined {
313            return Err(RoomSendQueueError::RoomNotJoined);
314        }
315
316        if gallery.is_empty() {
317            return Err(RoomSendQueueError::EmptyGallery);
318        }
319
320        let send_event_txn =
321            gallery.txn_id.clone().map_or_else(ChildTransactionId::new, Into::into);
322
323        Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
324
325        let mut item_types = Vec::with_capacity(gallery.len());
326        let mut item_queue_infos = Vec::with_capacity(gallery.len());
327        let mut media_handles = Vec::with_capacity(gallery.len());
328
329        for item_info in gallery.items {
330            let GalleryItemInfo { filename, content_type, data, .. } = item_info;
331
332            let upload_file_txn = TransactionId::new();
333
334            debug!(filename, %content_type, %upload_file_txn, "uploading a gallery attachment");
335
336            let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
337
338            let MediaCacheResult {
339                upload_thumbnail_txn,
340                event_thumbnail_info,
341                queue_thumbnail_info,
342            } = RoomSendQueue::cache_media(&room, data, item_info.thumbnail, &file_media_request)
343                .await?;
344
345            item_types.push(Room::make_gallery_item_type(
346                &content_type,
347                filename,
348                file_media_request.source.clone(),
349                item_info.caption,
350                item_info.formatted_caption,
351                Some(item_info.attachment_info),
352                event_thumbnail_info,
353            ));
354
355            item_queue_infos.push(GalleryItemQueueInfo {
356                content_type,
357                upload_file_txn: upload_file_txn.clone(),
358                file_media_request,
359                thumbnail: queue_thumbnail_info,
360            });
361
362            media_handles.push(MediaHandles { upload_file_txn, upload_thumbnail_txn });
363        }
364
365        // Create the content for the gallery event.
366        let event_content = room
367            .make_media_event(
368                MessageType::Gallery(GalleryMessageEventContent::new(
369                    gallery.caption.unwrap_or_default(),
370                    gallery.formatted_caption,
371                    item_types,
372                )),
373                gallery.mentions,
374                gallery.reply,
375            )
376            .await
377            .map_err(|_| RoomSendQueueError::FailedToCreateGallery)?;
378
379        let created_at = MilliSecondsSinceUnixEpoch::now();
380
381        // Save requests in the queue storage.
382        self.inner
383            .queue
384            .push_gallery(
385                event_content.clone(),
386                send_event_txn.clone().into(),
387                created_at,
388                item_queue_infos,
389            )
390            .await?;
391
392        trace!("manager sends a gallery to the background task");
393
394        self.inner.notifier.notify_one();
395
396        let send_handle = SendHandle {
397            room: self.clone(),
398            transaction_id: send_event_txn.clone().into(),
399            media_handles,
400            created_at,
401        };
402
403        self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
404            transaction_id: send_event_txn.clone().into(),
405            content: LocalEchoContent::Event {
406                serialized_event: SerializableEventContent::new(&event_content.into())
407                    .map_err(RoomSendQueueStorageError::JsonSerialization)?,
408                send_handle: send_handle.clone(),
409                send_error: None,
410            },
411        }));
412
413        Ok(send_handle)
414    }
415
416    async fn cache_media(
417        room: &Room,
418        data: Vec<u8>,
419        thumbnail: Option<Thumbnail>,
420        file_media_request: &MediaRequestParameters,
421    ) -> Result<MediaCacheResult, RoomSendQueueError> {
422        let client = room.client();
423        let cache_store = client
424            .event_cache_store()
425            .lock()
426            .await
427            .map_err(RoomSendQueueStorageError::LockError)?;
428
429        // Cache the file itself in the cache store.
430        cache_store
431            .add_media_content(
432                file_media_request,
433                data,
434                // Make sure that the file is stored until it has been uploaded.
435                IgnoreMediaRetentionPolicy::Yes,
436            )
437            .await
438            .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
439
440        // Process the thumbnail, if it's been provided.
441        if let Some(thumbnail) = thumbnail {
442            let txn = TransactionId::new();
443            trace!(upload_thumbnail_txn = %txn, "media has a thumbnail");
444
445            // Create the information required for filling the thumbnail section of the
446            // event.
447            let (data, content_type, thumbnail_info) = thumbnail.into_parts();
448            let file_size = data.len();
449
450            // Cache thumbnail in the cache store.
451            let thumbnail_media_request = Media::make_local_file_media_request(&txn);
452            cache_store
453                .add_media_content(
454                    &thumbnail_media_request,
455                    data,
456                    // Make sure that the thumbnail is stored until it has been uploaded.
457                    IgnoreMediaRetentionPolicy::Yes,
458                )
459                .await
460                .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
461
462            Ok(MediaCacheResult {
463                upload_thumbnail_txn: Some(txn.clone()),
464                event_thumbnail_info: Some((
465                    thumbnail_media_request.source.clone(),
466                    thumbnail_info,
467                )),
468                queue_thumbnail_info: Some(QueueThumbnailInfo {
469                    finish_upload_thumbnail_info: FinishUploadThumbnailInfo {
470                        txn,
471                        width: None,
472                        height: None,
473                    },
474                    media_request_parameters: thumbnail_media_request,
475                    content_type,
476                    file_size,
477                }),
478            })
479        } else {
480            Ok(Default::default())
481        }
482    }
483}
484
485impl QueueStorage {
486    /// Consumes a finished upload and queues sending of the final media event.
487    #[allow(clippy::too_many_arguments)]
488    pub(super) async fn handle_dependent_finish_upload(
489        &self,
490        client: &Client,
491        event_txn: OwnedTransactionId,
492        parent_key: SentRequestKey,
493        mut local_echo: RoomMessageEventContent,
494        file_upload_txn: OwnedTransactionId,
495        thumbnail_info: Option<FinishUploadThumbnailInfo>,
496        new_updates: &mut Vec<RoomSendQueueUpdate>,
497    ) -> Result<(), RoomSendQueueError> {
498        // Both uploads are ready: enqueue the event with its final data.
499        let sent_media = parent_key
500            .into_media()
501            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
502
503        update_media_cache_keys_after_upload(client, &file_upload_txn, thumbnail_info, &sent_media)
504            .await?;
505        update_media_event_after_upload(&mut local_echo, sent_media);
506
507        let new_content = SerializableEventContent::new(&local_echo.into())
508            .map_err(RoomSendQueueStorageError::JsonSerialization)?;
509
510        // Indicates observers that the upload finished, by editing the local echo for
511        // the event into its final form before sending.
512        new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
513            transaction_id: event_txn.clone(),
514            new_content: new_content.clone(),
515        });
516
517        trace!(%event_txn, "queueing media event after successfully uploading media(s)");
518
519        client
520            .state_store()
521            .save_send_queue_request(
522                &self.room_id,
523                event_txn,
524                MilliSecondsSinceUnixEpoch::now(),
525                new_content.into(),
526                Self::HIGH_PRIORITY,
527            )
528            .await
529            .map_err(RoomSendQueueStorageError::StateStoreError)?;
530
531        Ok(())
532    }
533
534    /// Consumes a finished gallery upload and queues sending of the final
535    /// gallery event.
536    #[cfg(feature = "unstable-msc4274")]
537    #[allow(clippy::too_many_arguments)]
538    pub(super) async fn handle_dependent_finish_gallery_upload(
539        &self,
540        client: &Client,
541        event_txn: OwnedTransactionId,
542        parent_key: SentRequestKey,
543        mut local_echo: RoomMessageEventContent,
544        item_infos: Vec<FinishGalleryItemInfo>,
545        new_updates: &mut Vec<RoomSendQueueUpdate>,
546    ) -> Result<(), RoomSendQueueError> {
547        // All uploads are ready: enqueue the event with its final data.
548        let sent_gallery = parent_key
549            .into_media()
550            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
551
552        let mut sent_media_vec = sent_gallery.accumulated;
553        sent_media_vec.push(AccumulatedSentMediaInfo {
554            file: sent_gallery.file,
555            thumbnail: sent_gallery.thumbnail,
556        });
557
558        let mut sent_infos = HashMap::new();
559
560        for (item_info, sent_media) in zip(item_infos, sent_media_vec) {
561            let FinishGalleryItemInfo { file_upload: file_upload_txn, thumbnail_info } = item_info;
562
563            // Store the sent media under the original cache key for later insertion into
564            // the local echo.
565            let from_req = Media::make_local_file_media_request(&file_upload_txn);
566            sent_infos.insert(from_req.source.unique_key(), sent_media.clone());
567
568            update_media_cache_keys_after_upload(
569                client,
570                &file_upload_txn,
571                thumbnail_info,
572                &sent_media.into(),
573            )
574            .await?;
575        }
576
577        update_gallery_event_after_upload(&mut local_echo, sent_infos);
578
579        let new_content = SerializableEventContent::new(&local_echo.into())
580            .map_err(RoomSendQueueStorageError::JsonSerialization)?;
581
582        // Indicates observers that the upload finished, by editing the local echo for
583        // the event into its final form before sending.
584        new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
585            transaction_id: event_txn.clone(),
586            new_content: new_content.clone(),
587        });
588
589        trace!(%event_txn, "queueing media event after successfully uploading media(s)");
590
591        client
592            .state_store()
593            .save_send_queue_request(
594                &self.room_id,
595                event_txn,
596                MilliSecondsSinceUnixEpoch::now(),
597                new_content.into(),
598                Self::HIGH_PRIORITY,
599            )
600            .await
601            .map_err(RoomSendQueueStorageError::StateStoreError)?;
602
603        Ok(())
604    }
605
606    /// Consumes a finished file or thumbnail upload and queues the dependent
607    /// file or thumbnail upload.
608    #[allow(clippy::too_many_arguments)]
609    pub(super) async fn handle_dependent_file_or_thumbnail_upload(
610        &self,
611        client: &Client,
612        next_upload_txn: OwnedTransactionId,
613        parent_key: SentRequestKey,
614        content_type: String,
615        cache_key: MediaRequestParameters,
616        event_txn: OwnedTransactionId,
617        parent_is_thumbnail_upload: bool,
618    ) -> Result<(), RoomSendQueueError> {
619        // The previous file or thumbnail has been sent, now transform the dependent
620        // file or thumbnail upload request into a ready one.
621        let sent_media = parent_key
622            .into_media()
623            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
624
625        // If the previous upload was a thumbnail, it shouldn't have
626        // a thumbnail itself.
627        if parent_is_thumbnail_upload {
628            debug_assert!(sent_media.thumbnail.is_none());
629            if sent_media.thumbnail.is_some() {
630                warn!("unexpected thumbnail for a thumbnail!");
631            }
632        }
633
634        trace!(
635            related_to = %event_txn,
636            "done uploading file or thumbnail, now queuing the dependent file \
637             or thumbnail upload request",
638        );
639
640        // If the parent request was a thumbnail upload, don't add it to the list of
641        // accumulated medias yet because its dependent file upload is still
642        // pending. If the parent request was a file upload, we know that both
643        // the file and its thumbnail (if any) have finished uploading and we
644        // can add them to the accumulated sent media.
645        #[cfg(feature = "unstable-msc4274")]
646        let accumulated = if parent_is_thumbnail_upload {
647            sent_media.accumulated
648        } else {
649            let mut accumulated = sent_media.accumulated;
650            accumulated.push(AccumulatedSentMediaInfo {
651                file: sent_media.file.clone(),
652                thumbnail: sent_media.thumbnail,
653            });
654            accumulated
655        };
656
657        let request = QueuedRequestKind::MediaUpload {
658            content_type,
659            cache_key,
660            // If the previous upload was a thumbnail, it becomes the thumbnail source for the next
661            // upload.
662            thumbnail_source: parent_is_thumbnail_upload.then_some(sent_media.file),
663            related_to: event_txn,
664            #[cfg(feature = "unstable-msc4274")]
665            accumulated,
666        };
667
668        client
669            .state_store()
670            .save_send_queue_request(
671                &self.room_id,
672                next_upload_txn,
673                MilliSecondsSinceUnixEpoch::now(),
674                request,
675                Self::HIGH_PRIORITY,
676            )
677            .await
678            .map_err(RoomSendQueueStorageError::StateStoreError)?;
679
680        Ok(())
681    }
682
683    /// Try to abort an upload that would be ongoing.
684    ///
685    /// Return true if any media (media itself or its thumbnail) was being
686    /// uploaded. In this case, the media event has also been removed from
687    /// the send queue. If it returns false, then the uploads already
688    /// happened, and the event sending *may* have started.
689    #[instrument(skip(self, handles))]
690    pub(super) async fn abort_upload(
691        &self,
692        event_txn: &TransactionId,
693        handles: &MediaHandles,
694    ) -> Result<bool, RoomSendQueueStorageError> {
695        let mut guard = self.store.lock().await;
696        let client = guard.client()?;
697
698        // Keep the lock until we're done touching the storage.
699        debug!("trying to abort an upload");
700
701        let store = client.state_store();
702
703        let upload_file_as_dependent = ChildTransactionId::from(handles.upload_file_txn.clone());
704        let event_as_dependent = ChildTransactionId::from(event_txn.to_owned());
705
706        let mut removed_dependent_upload = false;
707        let mut removed_dependent_event = false;
708
709        if let Some(thumbnail_txn) = &handles.upload_thumbnail_txn {
710            if store.remove_send_queue_request(&self.room_id, thumbnail_txn).await? {
711                // The thumbnail upload existed as a request: either it was pending (something
712                // else was being sent), or it was actively being sent.
713                trace!("could remove thumbnail request, removing 2 dependent requests now");
714
715                // 1. Try to abort sending using the being_sent info, in case it was active.
716                if let Some(info) = guard.being_sent.as_ref() {
717                    if info.transaction_id == *thumbnail_txn {
718                        // SAFETY: we knew it was Some(), two lines above.
719                        let info = guard.being_sent.take().unwrap();
720                        if info.cancel_upload() {
721                            trace!("aborted ongoing thumbnail upload");
722                        }
723                    }
724                }
725
726                // 2. Remove the dependent requests.
727                removed_dependent_upload = store
728                    .remove_dependent_queued_request(&self.room_id, &upload_file_as_dependent)
729                    .await?;
730
731                if !removed_dependent_upload {
732                    warn!("unable to find the dependent file upload request");
733                }
734
735                removed_dependent_event = store
736                    .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
737                    .await?;
738
739                if !removed_dependent_event {
740                    warn!("unable to find the dependent media event upload request");
741                }
742            }
743        }
744
745        // If we're here:
746        // - either there was no thumbnail to upload,
747        // - or the thumbnail request has terminated already.
748        //
749        // So the next target is the upload request itself, in both cases.
750
751        if !removed_dependent_upload {
752            if store.remove_send_queue_request(&self.room_id, &handles.upload_file_txn).await? {
753                // The upload existed as a request: either it was pending (something else was
754                // being sent), or it was actively being sent.
755                trace!("could remove file upload request, removing 1 dependent request");
756
757                // 1. Try to abort sending using the being_sent info, in case it was active.
758                if let Some(info) = guard.being_sent.as_ref() {
759                    if info.transaction_id == handles.upload_file_txn {
760                        // SAFETY: we knew it was Some(), two lines above.
761                        let info = guard.being_sent.take().unwrap();
762                        if info.cancel_upload() {
763                            trace!("aborted ongoing file upload");
764                        }
765                    }
766                }
767
768                // 2. Remove the dependent request.
769                if !store
770                    .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
771                    .await?
772                {
773                    warn!("unable to find the dependent media event upload request");
774                }
775            } else {
776                // The upload was not in the send queue, so it's completed.
777                //
778                // It means the event sending is either still queued as a dependent request, or
779                // it's graduated into a request.
780                if !removed_dependent_event
781                    && !store
782                        .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
783                        .await?
784                {
785                    // The media event has been promoted into a request, or the promoted request
786                    // has been sent already: we couldn't abort, let the caller decide what to do.
787                    debug!("uploads already happened => deferring to aborting an event sending");
788                    return Ok(false);
789                }
790            }
791        }
792
793        // At this point, all the requests and dependent requests have been cleaned up.
794        // Perform the final step: empty the cache from the local items.
795        {
796            let event_cache = client.event_cache_store().lock().await?;
797            event_cache
798                .remove_media_content_for_uri(&Media::make_local_uri(&handles.upload_file_txn))
799                .await?;
800            if let Some(txn) = &handles.upload_thumbnail_txn {
801                event_cache.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?;
802            }
803        }
804
805        debug!("successfully aborted!");
806        Ok(true)
807    }
808
809    #[instrument(skip(self, caption, formatted_caption))]
810    pub(super) async fn edit_media_caption(
811        &self,
812        txn: &TransactionId,
813        caption: Option<String>,
814        formatted_caption: Option<FormattedBody>,
815        mentions: Option<Mentions>,
816    ) -> Result<Option<AnyMessageLikeEventContent>, RoomSendQueueStorageError> {
817        // This error will be popular here.
818        use RoomSendQueueStorageError::InvalidMediaCaptionEdit;
819
820        let guard = self.store.lock().await;
821        let client = guard.client()?;
822        let store = client.state_store();
823
824        // The media event can be in one of three states:
825        // - still stored as a dependent request,
826        // - stored as a queued request, active (aka it's being sent).
827        // - stored as a queued request, not active yet (aka it's not being sent yet),
828        //
829        // We'll handle each of these cases one by one.
830
831        {
832            // If the event can be found as a dependent event, update the captions, save it
833            // back into the database, and return early.
834            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
835
836            if let Some(found) =
837                dependent_requests.into_iter().find(|req| *req.own_transaction_id == *txn)
838            {
839                trace!("found the caption to edit in a dependent request");
840
841                let DependentQueuedRequestKind::FinishUpload {
842                    mut local_echo,
843                    file_upload,
844                    thumbnail_info,
845                } = found.kind
846                else {
847                    return Err(InvalidMediaCaptionEdit);
848                };
849
850                if !update_media_caption(&mut local_echo, caption, formatted_caption, mentions) {
851                    return Err(InvalidMediaCaptionEdit);
852                }
853
854                let new_dependent_request = DependentQueuedRequestKind::FinishUpload {
855                    local_echo: local_echo.clone(),
856                    file_upload,
857                    thumbnail_info,
858                };
859                store
860                    .update_dependent_queued_request(
861                        &self.room_id,
862                        &found.own_transaction_id,
863                        new_dependent_request,
864                    )
865                    .await?;
866
867                trace!("caption successfully updated");
868                return Ok(Some((*local_echo).into()));
869            }
870        }
871
872        let requests = store.load_send_queue_requests(&self.room_id).await?;
873        let Some(found) = requests.into_iter().find(|req| req.transaction_id == *txn) else {
874            // Couldn't be found anymore, it's not possible to update captions.
875            return Ok(None);
876        };
877
878        trace!("found the caption to edit as a request");
879
880        let QueuedRequestKind::Event { content: serialized_content } = found.kind else {
881            return Err(InvalidMediaCaptionEdit);
882        };
883
884        let deserialized = serialized_content.deserialize()?;
885        let AnyMessageLikeEventContent::RoomMessage(mut content) = deserialized else {
886            return Err(InvalidMediaCaptionEdit);
887        };
888
889        if !update_media_caption(&mut content, caption, formatted_caption, mentions) {
890            return Err(InvalidMediaCaptionEdit);
891        }
892
893        let any_content: AnyMessageLikeEventContent = content.into();
894        let new_serialized = SerializableEventContent::new(&any_content.clone())?;
895
896        // If the request is active (being sent), send a dependent request.
897        if let Some(being_sent) = guard.being_sent.as_ref() {
898            if being_sent.transaction_id == *txn {
899                // Record a dependent request to edit, and exit.
900                store
901                    .save_dependent_queued_request(
902                        &self.room_id,
903                        txn,
904                        ChildTransactionId::new(),
905                        MilliSecondsSinceUnixEpoch::now(),
906                        DependentQueuedRequestKind::EditEvent { new_content: new_serialized },
907                    )
908                    .await?;
909
910                trace!("media event was being sent, pushed a dependent edit");
911                return Ok(Some(any_content));
912            }
913        }
914
915        // The request is not active: edit the local echo.
916        store
917            .update_send_queue_request(
918                &self.room_id,
919                txn,
920                QueuedRequestKind::Event { content: new_serialized },
921            )
922            .await?;
923
924        trace!("media event was not being sent, updated local echo");
925        Ok(Some(any_content))
926    }
927}
928
929/// Update cache keys in the cache store after uploading a media file /
930/// thumbnail.
931async fn update_media_cache_keys_after_upload(
932    client: &Client,
933    file_upload_txn: &OwnedTransactionId,
934    thumbnail_info: Option<FinishUploadThumbnailInfo>,
935    sent_media: &SentMediaInfo,
936) -> Result<(), RoomSendQueueError> {
937    // Do it for the file itself.
938    let from_req = Media::make_local_file_media_request(file_upload_txn);
939
940    trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store");
941    let cache_store =
942        client.event_cache_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
943
944    // The media can now be removed during cleanups.
945    cache_store
946        .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
947        .await
948        .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
949
950    cache_store
951        .replace_media_key(
952            &from_req,
953            &MediaRequestParameters { source: sent_media.file.clone(), format: MediaFormat::File },
954        )
955        .await
956        .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
957
958    // Rename the thumbnail too, if needs be.
959    if let Some((info, new_source)) = thumbnail_info.as_ref().zip(sent_media.thumbnail.clone()) {
960        // Previously the media request used `MediaFormat::Thumbnail`. Handle this case
961        // for send queue requests that were in the state store before the change.
962        let from_req = if let Some((height, width)) = info.height.zip(info.width) {
963            Media::make_local_thumbnail_media_request(&info.txn, height, width)
964        } else {
965            Media::make_local_file_media_request(&info.txn)
966        };
967
968        trace!(from = ?from_req.source, to = ?new_source, "renaming thumbnail file key in cache store");
969
970        // The media can now be removed during cleanups.
971        cache_store
972            .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
973            .await
974            .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
975
976        cache_store
977            .replace_media_key(
978                &from_req,
979                &MediaRequestParameters { source: new_source, format: MediaFormat::File },
980            )
981            .await
982            .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
983    }
984
985    Ok(())
986}