matrix_sdk/send_queue/
upload.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Private implementations of the media upload mechanism.
16
17#[cfg(feature = "unstable-msc4274")]
18use std::{collections::HashMap, iter::zip};
19
20use matrix_sdk_base::{
21    RoomState,
22    media::{MediaFormat, MediaRequestParameters, store::IgnoreMediaRetentionPolicy},
23    store::{
24        ChildTransactionId, DependentQueuedRequestKind, FinishUploadThumbnailInfo,
25        QueuedRequestKind, SentMediaInfo, SentRequestKey, SerializableEventContent,
26    },
27};
28#[cfg(feature = "unstable-msc4274")]
29use matrix_sdk_base::{
30    media::UniqueKey,
31    store::{AccumulatedSentMediaInfo, FinishGalleryItemInfo},
32};
33use mime::Mime;
34#[cfg(feature = "unstable-msc4274")]
35use ruma::events::room::message::{GalleryItemType, GalleryMessageEventContent};
36use ruma::{
37    MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId,
38    events::{
39        AnyMessageLikeEventContent, Mentions,
40        room::{
41            MediaSource, ThumbnailInfo,
42            message::{FormattedBody, MessageType, RoomMessageEventContent},
43        },
44    },
45};
46use tracing::{Span, debug, error, instrument, trace, warn};
47
48use super::{QueueStorage, QueueThumbnailInfo, RoomSendQueue, RoomSendQueueError};
49use crate::{
50    Client, Media, Room,
51    attachment::{AttachmentConfig, Thumbnail},
52    room::edit::update_media_caption,
53    send_queue::{
54        LocalEcho, LocalEchoContent, MediaHandles, RoomSendQueueStorageError, RoomSendQueueUpdate,
55        SendHandle,
56    },
57};
58#[cfg(feature = "unstable-msc4274")]
59use crate::{
60    attachment::{GalleryConfig, GalleryItemInfo},
61    send_queue::GalleryItemQueueInfo,
62};
63
64/// Replace the source by the final ones in all the media types handled by
65/// [`Room::make_attachment_type()`].
66fn update_media_event_after_upload(echo: &mut RoomMessageEventContent, sent: SentMediaInfo) {
67    // Some variants look really similar below, but the `event` and `info` are all
68    // different types…
69    match &mut echo.msgtype {
70        MessageType::Audio(event) => {
71            event.source = sent.file;
72        }
73        MessageType::File(event) => {
74            event.source = sent.file;
75            if let Some(info) = event.info.as_mut() {
76                info.thumbnail_source = sent.thumbnail;
77            }
78        }
79        MessageType::Image(event) => {
80            event.source = sent.file;
81            if let Some(info) = event.info.as_mut() {
82                info.thumbnail_source = sent.thumbnail;
83            }
84        }
85        MessageType::Video(event) => {
86            event.source = sent.file;
87            if let Some(info) = event.info.as_mut() {
88                info.thumbnail_source = sent.thumbnail;
89            }
90        }
91
92        _ => {
93            // All `MessageType` created by `Room::make_attachment_type` should be
94            // handled here. The only way to end up here is that a message type has
95            // been tampered with in the database.
96            error!("Invalid message type in database: {}", echo.msgtype());
97            // Only crash debug builds.
98            debug_assert!(false, "invalid message type in database");
99        }
100    }
101}
102
/// Swap the local (pre-upload) media sources of every gallery item for the
/// final ones, for all the item types handled by
/// [`Room::make_gallery_item_type()`].
///
/// `sent` maps the unique key of an item's current (local) source to the
/// media information obtained after uploading it. Items whose key is missing
/// from the map are left untouched and reported with an error log.
#[cfg(feature = "unstable-msc4274")]
fn update_gallery_event_after_upload(
    echo: &mut RoomMessageEventContent,
    sent: HashMap<String, AccumulatedSentMediaInfo>,
) {
    let MessageType::Gallery(gallery) = &mut echo.msgtype else {
        // The echo is not a gallery message at all. The only way to end up here
        // is that the message type has been tampered with in the database.
        error!("Invalid gallery item types in database");
        // Only crash debug builds.
        debug_assert!(false, "invalid item type in database {:?}", echo.msgtype());
        return;
    };

    // The arms below look alike but cannot be merged: every variant wraps a
    // different content type, with its own `info` type.
    for itemtype in &mut gallery.itemtypes {
        match itemtype {
            GalleryItemType::Audio(audio) => {
                if let Some(uploaded) = sent.get(&audio.source.unique_key()) {
                    audio.source = uploaded.file.clone();
                } else {
                    error!("key for item {:?} does not exist on gallery event", audio.source);
                }
            }

            GalleryItemType::File(content) => {
                if let Some(uploaded) = sent.get(&content.source.unique_key()) {
                    content.source = uploaded.file.clone();
                    if let Some(info) = &mut content.info {
                        info.thumbnail_source = uploaded.thumbnail.clone();
                    }
                } else {
                    error!("key for item {:?} does not exist on gallery event", content.source);
                }
            }

            GalleryItemType::Image(content) => {
                if let Some(uploaded) = sent.get(&content.source.unique_key()) {
                    content.source = uploaded.file.clone();
                    if let Some(info) = &mut content.info {
                        info.thumbnail_source = uploaded.thumbnail.clone();
                    }
                } else {
                    error!("key for item {:?} does not exist on gallery event", content.source);
                }
            }

            GalleryItemType::Video(content) => {
                if let Some(uploaded) = sent.get(&content.source.unique_key()) {
                    content.source = uploaded.file.clone();
                    if let Some(info) = &mut content.info {
                        info.thumbnail_source = uploaded.thumbnail.clone();
                    }
                } else {
                    error!("key for item {:?} does not exist on gallery event", content.source);
                }
            }

            _ => {
                // Every `GalleryItemType` built by `Room::make_gallery_item_type` is
                // covered above, so reaching this arm means an item type was
                // tampered with in the database.
                error!("Invalid gallery item types in database");
                // Only crash debug builds.
                debug_assert!(false, "invalid gallery item type in database {itemtype:?}");
            }
        }
    }
}
167
/// Outcome of caching a media file, and its optional thumbnail, locally
/// before the upload is queued.
///
/// All fields are `None` when no thumbnail was provided.
#[derive(Default)]
struct MediaCacheResult {
    /// Transaction id generated for the thumbnail upload.
    upload_thumbnail_txn: Option<OwnedTransactionId>,
    /// Local media source of the cached thumbnail along with its metadata, used
    /// to fill the thumbnail section of the event content.
    event_thumbnail_info: Option<(MediaSource, Box<ThumbnailInfo>)>,
    /// Thumbnail details handed over to the queue storage when the upload is
    /// pushed.
    queue_thumbnail_info: Option<QueueThumbnailInfo>,
}
174
175impl RoomSendQueue {
176    /// Queues an attachment to be sent to the room, using the send queue.
177    ///
178    /// This returns quickly (without sending or uploading anything), and will
179    /// push the event to be sent into a queue, handled in the background.
180    ///
181    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
182    /// the [`Self::subscribe()`] method to get updates about the sending of
183    /// that event.
184    ///
185    /// By default, if sending failed on the first attempt, it will be retried a
186    /// few times. If sending failed after those retries, the entire
187    /// client's sending queue will be disabled, and it will need to be
188    /// manually re-enabled by the caller (e.g. after network is back, or when
189    /// something has been done about the faulty requests).
190    ///
191    /// The attachment and its optional thumbnail are stored in the media cache
192    /// and can be retrieved at any time, by calling
193    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
194    /// in the local or remote echo, and using a `MediaFormat::File`.
195    #[instrument(skip_all, fields(event_txn))]
196    pub async fn send_attachment(
197        &self,
198        filename: impl Into<String>,
199        content_type: Mime,
200        data: Vec<u8>,
201        mut config: AttachmentConfig,
202    ) -> Result<SendHandle, RoomSendQueueError> {
203        let Some(room) = self.inner.room.get() else {
204            return Err(RoomSendQueueError::RoomDisappeared);
205        };
206
207        if room.state() != RoomState::Joined {
208            return Err(RoomSendQueueError::RoomNotJoined);
209        }
210
211        let filename = filename.into();
212        let upload_file_txn = TransactionId::new();
213        let send_event_txn = config.txn_id.map_or_else(ChildTransactionId::new, Into::into);
214
215        Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
216        debug!(filename, %content_type, %upload_file_txn, "sending an attachment");
217
218        let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
219
220        let MediaCacheResult { upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info } =
221            RoomSendQueue::cache_media(&room, data, config.thumbnail.take(), &file_media_request)
222                .await?;
223
224        // Create the content for the media event.
225        let event_content = room
226            .make_media_event(
227                Room::make_attachment_type(
228                    &content_type,
229                    filename,
230                    file_media_request.source.clone(),
231                    config.caption,
232                    config.info,
233                    event_thumbnail_info,
234                ),
235                config.mentions,
236                config.reply,
237            )
238            .await
239            .map_err(|_| RoomSendQueueError::FailedToCreateAttachment)?;
240
241        let created_at = MilliSecondsSinceUnixEpoch::now();
242
243        // Save requests in the queue storage.
244        self.inner
245            .queue
246            .push_media(
247                event_content.clone(),
248                content_type,
249                send_event_txn.clone().into(),
250                created_at,
251                upload_file_txn.clone(),
252                file_media_request,
253                queue_thumbnail_info,
254            )
255            .await?;
256
257        trace!("manager sends a media to the background task");
258
259        self.inner.notifier.notify_one();
260
261        let send_handle = SendHandle {
262            room: self.clone(),
263            transaction_id: send_event_txn.clone().into(),
264            media_handles: vec![MediaHandles { upload_thumbnail_txn, upload_file_txn }],
265            created_at,
266        };
267
268        self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
269            transaction_id: send_event_txn.clone().into(),
270            content: LocalEchoContent::Event {
271                serialized_event: SerializableEventContent::new(&event_content.into())
272                    .map_err(RoomSendQueueStorageError::JsonSerialization)?,
273                send_handle: send_handle.clone(),
274                send_error: None,
275            },
276        }));
277
278        Ok(send_handle)
279    }
280
281    /// Queues a gallery to be sent to the room, using the send queue.
282    ///
283    /// This returns quickly (without sending or uploading anything), and will
284    /// push the event to be sent into a queue, handled in the background.
285    ///
286    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
287    /// the [`Self::subscribe()`] method to get updates about the sending of
288    /// that event.
289    ///
290    /// By default, if sending failed on the first attempt, it will be retried a
291    /// few times. If sending failed after those retries, the entire
292    /// client's sending queue will be disabled, and it will need to be
293    /// manually re-enabled by the caller (e.g. after network is back, or when
294    /// something has been done about the faulty requests).
295    ///
296    /// The attachments and their optional thumbnails are stored in the media
297    /// cache and can be retrieved at any time, by calling
298    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
299    /// in the local or remote echo, and using a `MediaFormat::File`.
300    #[cfg(feature = "unstable-msc4274")]
301    #[instrument(skip_all, fields(event_txn))]
302    pub async fn send_gallery(
303        &self,
304        gallery: GalleryConfig,
305    ) -> Result<SendHandle, RoomSendQueueError> {
306        let Some(room) = self.inner.room.get() else {
307            return Err(RoomSendQueueError::RoomDisappeared);
308        };
309
310        if room.state() != RoomState::Joined {
311            return Err(RoomSendQueueError::RoomNotJoined);
312        }
313
314        if gallery.is_empty() {
315            return Err(RoomSendQueueError::EmptyGallery);
316        }
317
318        let send_event_txn =
319            gallery.txn_id.clone().map_or_else(ChildTransactionId::new, Into::into);
320
321        Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
322
323        let mut item_types = Vec::with_capacity(gallery.len());
324        let mut item_queue_infos = Vec::with_capacity(gallery.len());
325        let mut media_handles = Vec::with_capacity(gallery.len());
326
327        for item_info in gallery.items {
328            let GalleryItemInfo { filename, content_type, data, .. } = item_info;
329
330            let upload_file_txn = TransactionId::new();
331
332            debug!(filename, %content_type, %upload_file_txn, "uploading a gallery attachment");
333
334            let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
335
336            let MediaCacheResult {
337                upload_thumbnail_txn,
338                event_thumbnail_info,
339                queue_thumbnail_info,
340            } = RoomSendQueue::cache_media(&room, data, item_info.thumbnail, &file_media_request)
341                .await?;
342
343            item_types.push(Room::make_gallery_item_type(
344                &content_type,
345                filename,
346                file_media_request.source.clone(),
347                item_info.caption,
348                Some(item_info.attachment_info),
349                event_thumbnail_info,
350            ));
351
352            item_queue_infos.push(GalleryItemQueueInfo {
353                content_type,
354                upload_file_txn: upload_file_txn.clone(),
355                file_media_request,
356                thumbnail: queue_thumbnail_info,
357            });
358
359            media_handles.push(MediaHandles { upload_file_txn, upload_thumbnail_txn });
360        }
361
362        // Create the content for the gallery event.
363        let (body, formatted) =
364            gallery.caption.map(|caption| (caption.body, caption.formatted)).unwrap_or_default();
365        let event_content = room
366            .make_media_event(
367                MessageType::Gallery(GalleryMessageEventContent::new(body, formatted, item_types)),
368                gallery.mentions,
369                gallery.reply,
370            )
371            .await
372            .map_err(|_| RoomSendQueueError::FailedToCreateGallery)?;
373
374        let created_at = MilliSecondsSinceUnixEpoch::now();
375
376        // Save requests in the queue storage.
377        self.inner
378            .queue
379            .push_gallery(
380                event_content.clone(),
381                send_event_txn.clone().into(),
382                created_at,
383                item_queue_infos,
384            )
385            .await?;
386
387        trace!("manager sends a gallery to the background task");
388
389        self.inner.notifier.notify_one();
390
391        let send_handle = SendHandle {
392            room: self.clone(),
393            transaction_id: send_event_txn.clone().into(),
394            media_handles,
395            created_at,
396        };
397
398        self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
399            transaction_id: send_event_txn.clone().into(),
400            content: LocalEchoContent::Event {
401                serialized_event: SerializableEventContent::new(&event_content.into())
402                    .map_err(RoomSendQueueStorageError::JsonSerialization)?,
403                send_handle: send_handle.clone(),
404                send_error: None,
405            },
406        }));
407
408        Ok(send_handle)
409    }
410
411    async fn cache_media(
412        room: &Room,
413        data: Vec<u8>,
414        thumbnail: Option<Thumbnail>,
415        file_media_request: &MediaRequestParameters,
416    ) -> Result<MediaCacheResult, RoomSendQueueError> {
417        let client = room.client();
418        let media_store =
419            client.media_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
420
421        // Cache the file itself in the cache store.
422        media_store
423            .add_media_content(
424                file_media_request,
425                data,
426                // Make sure that the file is stored until it has been uploaded.
427                IgnoreMediaRetentionPolicy::Yes,
428            )
429            .await
430            .map_err(RoomSendQueueStorageError::MediaStoreError)?;
431
432        // Process the thumbnail, if it's been provided.
433        if let Some(thumbnail) = thumbnail {
434            let txn = TransactionId::new();
435            trace!(upload_thumbnail_txn = %txn, "media has a thumbnail");
436
437            // Create the information required for filling the thumbnail section of the
438            // event.
439            let (data, content_type, thumbnail_info) = thumbnail.into_parts();
440            let file_size = data.len();
441
442            // Cache thumbnail in the cache store.
443            let thumbnail_media_request = Media::make_local_file_media_request(&txn);
444            media_store
445                .add_media_content(
446                    &thumbnail_media_request,
447                    data,
448                    // Make sure that the thumbnail is stored until it has been uploaded.
449                    IgnoreMediaRetentionPolicy::Yes,
450                )
451                .await
452                .map_err(RoomSendQueueStorageError::MediaStoreError)?;
453
454            Ok(MediaCacheResult {
455                upload_thumbnail_txn: Some(txn.clone()),
456                event_thumbnail_info: Some((
457                    thumbnail_media_request.source.clone(),
458                    thumbnail_info,
459                )),
460                queue_thumbnail_info: Some(QueueThumbnailInfo {
461                    finish_upload_thumbnail_info: FinishUploadThumbnailInfo {
462                        txn,
463                        width: None,
464                        height: None,
465                    },
466                    media_request_parameters: thumbnail_media_request,
467                    content_type,
468                    file_size,
469                }),
470            })
471        } else {
472            Ok(Default::default())
473        }
474    }
475}
476
477impl QueueStorage {
478    /// Consumes a finished upload and queues sending of the final media event.
479    #[allow(clippy::too_many_arguments)]
480    pub(super) async fn handle_dependent_finish_upload(
481        &self,
482        client: &Client,
483        event_txn: OwnedTransactionId,
484        parent_key: SentRequestKey,
485        mut local_echo: RoomMessageEventContent,
486        file_upload_txn: OwnedTransactionId,
487        thumbnail_info: Option<FinishUploadThumbnailInfo>,
488        new_updates: &mut Vec<RoomSendQueueUpdate>,
489    ) -> Result<(), RoomSendQueueError> {
490        // Both uploads are ready: enqueue the event with its final data.
491        let sent_media = parent_key
492            .into_media()
493            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
494
495        update_media_cache_keys_after_upload(client, &file_upload_txn, thumbnail_info, &sent_media)
496            .await?;
497        update_media_event_after_upload(&mut local_echo, sent_media);
498
499        let new_content = SerializableEventContent::new(&local_echo.into())
500            .map_err(RoomSendQueueStorageError::JsonSerialization)?;
501
502        // Indicates observers that the upload finished, by editing the local echo for
503        // the event into its final form before sending.
504        new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
505            transaction_id: event_txn.clone(),
506            new_content: new_content.clone(),
507        });
508
509        trace!(%event_txn, "queueing media event after successfully uploading media(s)");
510
511        client
512            .state_store()
513            .save_send_queue_request(
514                &self.room_id,
515                event_txn,
516                MilliSecondsSinceUnixEpoch::now(),
517                new_content.into(),
518                Self::HIGH_PRIORITY,
519            )
520            .await
521            .map_err(RoomSendQueueStorageError::StateStoreError)?;
522
523        Ok(())
524    }
525
526    /// Consumes a finished gallery upload and queues sending of the final
527    /// gallery event.
528    #[cfg(feature = "unstable-msc4274")]
529    #[allow(clippy::too_many_arguments)]
530    pub(super) async fn handle_dependent_finish_gallery_upload(
531        &self,
532        client: &Client,
533        event_txn: OwnedTransactionId,
534        parent_key: SentRequestKey,
535        mut local_echo: RoomMessageEventContent,
536        item_infos: Vec<FinishGalleryItemInfo>,
537        new_updates: &mut Vec<RoomSendQueueUpdate>,
538    ) -> Result<(), RoomSendQueueError> {
539        // All uploads are ready: enqueue the event with its final data.
540        let sent_gallery = parent_key
541            .into_media()
542            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
543
544        let mut sent_media_vec = sent_gallery.accumulated;
545        sent_media_vec.push(AccumulatedSentMediaInfo {
546            file: sent_gallery.file,
547            thumbnail: sent_gallery.thumbnail,
548        });
549
550        let mut sent_infos = HashMap::new();
551
552        for (item_info, sent_media) in zip(item_infos, sent_media_vec) {
553            let FinishGalleryItemInfo { file_upload: file_upload_txn, thumbnail_info } = item_info;
554
555            // Store the sent media under the original cache key for later insertion into
556            // the local echo.
557            let from_req = Media::make_local_file_media_request(&file_upload_txn);
558            sent_infos.insert(from_req.source.unique_key(), sent_media.clone());
559
560            update_media_cache_keys_after_upload(
561                client,
562                &file_upload_txn,
563                thumbnail_info,
564                &sent_media.into(),
565            )
566            .await?;
567        }
568
569        update_gallery_event_after_upload(&mut local_echo, sent_infos);
570
571        let new_content = SerializableEventContent::new(&local_echo.into())
572            .map_err(RoomSendQueueStorageError::JsonSerialization)?;
573
574        // Indicates observers that the upload finished, by editing the local echo for
575        // the event into its final form before sending.
576        new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
577            transaction_id: event_txn.clone(),
578            new_content: new_content.clone(),
579        });
580
581        trace!(%event_txn, "queueing media event after successfully uploading media(s)");
582
583        client
584            .state_store()
585            .save_send_queue_request(
586                &self.room_id,
587                event_txn,
588                MilliSecondsSinceUnixEpoch::now(),
589                new_content.into(),
590                Self::HIGH_PRIORITY,
591            )
592            .await
593            .map_err(RoomSendQueueStorageError::StateStoreError)?;
594
595        Ok(())
596    }
597
598    /// Consumes a finished file or thumbnail upload and queues the dependent
599    /// file or thumbnail upload.
600    #[allow(clippy::too_many_arguments)]
601    pub(super) async fn handle_dependent_file_or_thumbnail_upload(
602        &self,
603        client: &Client,
604        next_upload_txn: OwnedTransactionId,
605        parent_key: SentRequestKey,
606        content_type: String,
607        cache_key: MediaRequestParameters,
608        event_txn: OwnedTransactionId,
609        parent_is_thumbnail_upload: bool,
610    ) -> Result<(), RoomSendQueueError> {
611        // The previous file or thumbnail has been sent, now transform the dependent
612        // file or thumbnail upload request into a ready one.
613        let sent_media = parent_key
614            .into_media()
615            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
616
617        // If the previous upload was a thumbnail, it shouldn't have
618        // a thumbnail itself.
619        if parent_is_thumbnail_upload {
620            debug_assert!(sent_media.thumbnail.is_none());
621            if sent_media.thumbnail.is_some() {
622                warn!("unexpected thumbnail for a thumbnail!");
623            }
624        }
625
626        trace!(
627            related_to = %event_txn,
628            "done uploading file or thumbnail, now queuing the dependent file \
629             or thumbnail upload request",
630        );
631
632        // If the parent request was a thumbnail upload, don't add it to the list of
633        // accumulated medias yet because its dependent file upload is still
634        // pending. If the parent request was a file upload, we know that both
635        // the file and its thumbnail (if any) have finished uploading and we
636        // can add them to the accumulated sent media.
637        #[cfg(feature = "unstable-msc4274")]
638        let accumulated = if parent_is_thumbnail_upload {
639            sent_media.accumulated
640        } else {
641            let mut accumulated = sent_media.accumulated;
642            accumulated.push(AccumulatedSentMediaInfo {
643                file: sent_media.file.clone(),
644                thumbnail: sent_media.thumbnail,
645            });
646            accumulated
647        };
648
649        let request = QueuedRequestKind::MediaUpload {
650            content_type,
651            cache_key,
652            // If the previous upload was a thumbnail, it becomes the thumbnail source for the next
653            // upload.
654            thumbnail_source: parent_is_thumbnail_upload.then_some(sent_media.file),
655            related_to: event_txn,
656            #[cfg(feature = "unstable-msc4274")]
657            accumulated,
658        };
659
660        client
661            .state_store()
662            .save_send_queue_request(
663                &self.room_id,
664                next_upload_txn,
665                MilliSecondsSinceUnixEpoch::now(),
666                request,
667                Self::HIGH_PRIORITY,
668            )
669            .await
670            .map_err(RoomSendQueueStorageError::StateStoreError)?;
671
672        Ok(())
673    }
674
675    /// Try to abort an upload that would be ongoing.
676    ///
677    /// Return true if any media (media itself or its thumbnail) was being
678    /// uploaded. In this case, the media event has also been removed from
679    /// the send queue. If it returns false, then the uploads already
680    /// happened, and the event sending *may* have started.
681    #[instrument(skip(self, handles))]
682    pub(super) async fn abort_upload(
683        &self,
684        event_txn: &TransactionId,
685        handles: &MediaHandles,
686    ) -> Result<bool, RoomSendQueueStorageError> {
687        let mut guard = self.store.lock().await;
688        let client = guard.client()?;
689
690        // Keep the lock until we're done touching the storage.
691        debug!("trying to abort an upload");
692
693        let store = client.state_store();
694
695        let upload_file_as_dependent = ChildTransactionId::from(handles.upload_file_txn.clone());
696        let event_as_dependent = ChildTransactionId::from(event_txn.to_owned());
697
698        let mut removed_dependent_upload = false;
699        let mut removed_dependent_event = false;
700
701        if let Some(thumbnail_txn) = &handles.upload_thumbnail_txn
702            && store.remove_send_queue_request(&self.room_id, thumbnail_txn).await?
703        {
704            // The thumbnail upload existed as a request: either it was pending (something
705            // else was being sent), or it was actively being sent.
706            trace!("could remove thumbnail request, removing 2 dependent requests now");
707
708            // 1. Try to abort sending using the being_sent info, in case it was active.
709            if let Some(info) = guard.being_sent.as_ref()
710                && info.transaction_id == *thumbnail_txn
711            {
712                // SAFETY: we knew it was Some(), two lines above.
713                let info = guard.being_sent.take().unwrap();
714                if info.cancel_upload() {
715                    trace!("aborted ongoing thumbnail upload");
716                }
717            }
718
719            // 2. Remove the dependent requests.
720            removed_dependent_upload = store
721                .remove_dependent_queued_request(&self.room_id, &upload_file_as_dependent)
722                .await?;
723
724            if !removed_dependent_upload {
725                warn!("unable to find the dependent file upload request");
726            }
727
728            removed_dependent_event =
729                store.remove_dependent_queued_request(&self.room_id, &event_as_dependent).await?;
730
731            if !removed_dependent_event {
732                warn!("unable to find the dependent media event upload request");
733            }
734        }
735
736        // If we're here:
737        // - either there was no thumbnail to upload,
738        // - or the thumbnail request has terminated already.
739        //
740        // So the next target is the upload request itself, in both cases.
741
742        if !removed_dependent_upload {
743            if store.remove_send_queue_request(&self.room_id, &handles.upload_file_txn).await? {
744                // The upload existed as a request: either it was pending (something else was
745                // being sent), or it was actively being sent.
746                trace!("could remove file upload request, removing 1 dependent request");
747
748                // 1. Try to abort sending using the being_sent info, in case it was active.
749                if let Some(info) = guard.being_sent.as_ref()
750                    && info.transaction_id == handles.upload_file_txn
751                {
752                    // SAFETY: we knew it was Some(), two lines above.
753                    let info = guard.being_sent.take().unwrap();
754                    if info.cancel_upload() {
755                        trace!("aborted ongoing file upload");
756                    }
757                }
758
759                // 2. Remove the dependent request.
760                if !store
761                    .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
762                    .await?
763                {
764                    warn!("unable to find the dependent media event upload request");
765                }
766            } else {
767                // The upload was not in the send queue, so it's completed.
768                //
769                // It means the event sending is either still queued as a dependent request, or
770                // it's graduated into a request.
771                if !removed_dependent_event
772                    && !store
773                        .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
774                        .await?
775                {
776                    // The media event has been promoted into a request, or the promoted request
777                    // has been sent already: we couldn't abort, let the caller decide what to do.
778                    debug!("uploads already happened => deferring to aborting an event sending");
779                    return Ok(false);
780                }
781            }
782        }
783
784        // At this point, all the requests and dependent requests have been cleaned up.
785        // Perform the final step: empty the cache from the local items.
786        {
787            let media_store = client.media_store().lock().await?;
788            media_store
789                .remove_media_content_for_uri(&Media::make_local_uri(&handles.upload_file_txn))
790                .await?;
791            if let Some(txn) = &handles.upload_thumbnail_txn {
792                media_store.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?;
793            }
794        }
795
796        debug!("successfully aborted!");
797        Ok(true)
798    }
799
800    #[instrument(skip(self, caption, formatted_caption))]
801    pub(super) async fn edit_media_caption(
802        &self,
803        txn: &TransactionId,
804        caption: Option<String>,
805        formatted_caption: Option<FormattedBody>,
806        mentions: Option<Mentions>,
807    ) -> Result<Option<AnyMessageLikeEventContent>, RoomSendQueueStorageError> {
808        // This error will be popular here.
809        use RoomSendQueueStorageError::InvalidMediaCaptionEdit;
810
811        let guard = self.store.lock().await;
812        let client = guard.client()?;
813        let store = client.state_store();
814
815        // The media event can be in one of three states:
816        // - still stored as a dependent request,
817        // - stored as a queued request, active (aka it's being sent).
818        // - stored as a queued request, not active yet (aka it's not being sent yet),
819        //
820        // We'll handle each of these cases one by one.
821
822        {
823            // If the event can be found as a dependent event, update the captions, save it
824            // back into the database, and return early.
825            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
826
827            if let Some(found) =
828                dependent_requests.into_iter().find(|req| *req.own_transaction_id == *txn)
829            {
830                trace!("found the caption to edit in a dependent request");
831
832                let DependentQueuedRequestKind::FinishUpload {
833                    mut local_echo,
834                    file_upload,
835                    thumbnail_info,
836                } = found.kind
837                else {
838                    return Err(InvalidMediaCaptionEdit);
839                };
840
841                if !update_media_caption(&mut local_echo, caption, formatted_caption, mentions) {
842                    return Err(InvalidMediaCaptionEdit);
843                }
844
845                let new_dependent_request = DependentQueuedRequestKind::FinishUpload {
846                    local_echo: local_echo.clone(),
847                    file_upload,
848                    thumbnail_info,
849                };
850                store
851                    .update_dependent_queued_request(
852                        &self.room_id,
853                        &found.own_transaction_id,
854                        new_dependent_request,
855                    )
856                    .await?;
857
858                trace!("caption successfully updated");
859                return Ok(Some((*local_echo).into()));
860            }
861        }
862
863        let requests = store.load_send_queue_requests(&self.room_id).await?;
864        let Some(found) = requests.into_iter().find(|req| req.transaction_id == *txn) else {
865            // Couldn't be found anymore, it's not possible to update captions.
866            return Ok(None);
867        };
868
869        trace!("found the caption to edit as a request");
870
871        let QueuedRequestKind::Event { content: serialized_content } = found.kind else {
872            return Err(InvalidMediaCaptionEdit);
873        };
874
875        let deserialized = serialized_content.deserialize()?;
876        let AnyMessageLikeEventContent::RoomMessage(mut content) = deserialized else {
877            return Err(InvalidMediaCaptionEdit);
878        };
879
880        if !update_media_caption(&mut content, caption, formatted_caption, mentions) {
881            return Err(InvalidMediaCaptionEdit);
882        }
883
884        let any_content: AnyMessageLikeEventContent = content.into();
885        let new_serialized = SerializableEventContent::new(&any_content.clone())?;
886
887        // If the request is active (being sent), send a dependent request.
888        if let Some(being_sent) = guard.being_sent.as_ref()
889            && being_sent.transaction_id == *txn
890        {
891            // Record a dependent request to edit, and exit.
892            store
893                .save_dependent_queued_request(
894                    &self.room_id,
895                    txn,
896                    ChildTransactionId::new(),
897                    MilliSecondsSinceUnixEpoch::now(),
898                    DependentQueuedRequestKind::EditEvent { new_content: new_serialized },
899                )
900                .await?;
901
902            trace!("media event was being sent, pushed a dependent edit");
903            return Ok(Some(any_content));
904        }
905
906        // The request is not active: edit the local echo.
907        store
908            .update_send_queue_request(
909                &self.room_id,
910                txn,
911                QueuedRequestKind::Event { content: new_serialized },
912            )
913            .await?;
914
915        trace!("media event was not being sent, updated local echo");
916        Ok(Some(any_content))
917    }
918}
919
920/// Update cache keys in the cache store after uploading a media file /
921/// thumbnail.
922async fn update_media_cache_keys_after_upload(
923    client: &Client,
924    file_upload_txn: &OwnedTransactionId,
925    thumbnail_info: Option<FinishUploadThumbnailInfo>,
926    sent_media: &SentMediaInfo,
927) -> Result<(), RoomSendQueueError> {
928    // Do it for the file itself.
929    let from_req = Media::make_local_file_media_request(file_upload_txn);
930
931    trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store");
932    let media_store =
933        client.media_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
934
935    // The media can now be removed during cleanups.
936    media_store
937        .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
938        .await
939        .map_err(RoomSendQueueStorageError::MediaStoreError)?;
940
941    media_store
942        .replace_media_key(
943            &from_req,
944            &MediaRequestParameters { source: sent_media.file.clone(), format: MediaFormat::File },
945        )
946        .await
947        .map_err(RoomSendQueueStorageError::MediaStoreError)?;
948
949    // Rename the thumbnail too, if needs be.
950    if let Some((info, new_source)) = thumbnail_info.as_ref().zip(sent_media.thumbnail.clone()) {
951        // Previously the media request used `MediaFormat::Thumbnail`. Handle this case
952        // for send queue requests that were in the state store before the change.
953        let from_req = if let Some((height, width)) = info.height.zip(info.width) {
954            Media::make_local_thumbnail_media_request(&info.txn, height, width)
955        } else {
956            Media::make_local_file_media_request(&info.txn)
957        };
958
959        trace!(from = ?from_req.source, to = ?new_source, "renaming thumbnail file key in cache store");
960
961        // The media can now be removed during cleanups.
962        media_store
963            .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
964            .await
965            .map_err(RoomSendQueueStorageError::MediaStoreError)?;
966
967        media_store
968            .replace_media_key(
969                &from_req,
970                &MediaRequestParameters { source: new_source, format: MediaFormat::File },
971            )
972            .await
973            .map_err(RoomSendQueueStorageError::MediaStoreError)?;
974    }
975
976    Ok(())
977}