matrix_sdk/send_queue/
upload.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Private implementations of the media upload mechanism.
16
17#[cfg(feature = "unstable-msc4274")]
18use std::{collections::HashMap, iter::zip};
19
20use matrix_sdk_base::{
21    RoomState,
22    media::{MediaFormat, MediaRequestParameters, store::IgnoreMediaRetentionPolicy},
23    store::{
24        ChildTransactionId, DependentQueuedRequestKind, FinishUploadThumbnailInfo,
25        QueuedRequestKind, SentMediaInfo, SentRequestKey, SerializableEventContent,
26    },
27};
28#[cfg(feature = "unstable-msc4274")]
29use matrix_sdk_base::{
30    media::UniqueKey,
31    store::{AccumulatedSentMediaInfo, FinishGalleryItemInfo},
32};
33use mime::Mime;
34#[cfg(feature = "unstable-msc4274")]
35use ruma::events::room::message::{GalleryItemType, GalleryMessageEventContent};
36use ruma::{
37    MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId,
38    events::{
39        AnyMessageLikeEventContent, Mentions,
40        room::{
41            MediaSource, ThumbnailInfo,
42            message::{FormattedBody, MessageType, RoomMessageEventContent},
43        },
44    },
45};
46use tracing::{Span, debug, error, instrument, trace, warn};
47
48use super::{QueueStorage, QueueThumbnailInfo, RoomSendQueue, RoomSendQueueError};
49use crate::{
50    Client, Media, Room,
51    attachment::{AttachmentConfig, Thumbnail},
52    room::edit::update_media_caption,
53    send_queue::{
54        LocalEcho, LocalEchoContent, MediaHandles, RoomSendQueueStorageError, RoomSendQueueUpdate,
55        SendHandle,
56    },
57};
58#[cfg(feature = "unstable-msc4274")]
59use crate::{
60    attachment::{GalleryConfig, GalleryItemInfo},
61    send_queue::GalleryItemQueueInfo,
62};
63
64/// Replace the source by the final ones in all the media types handled by
65/// [`Room::make_attachment_type()`].
66fn update_media_event_after_upload(echo: &mut RoomMessageEventContent, sent: SentMediaInfo) {
67    // Some variants look really similar below, but the `event` and `info` are all
68    // different types…
69    match &mut echo.msgtype {
70        MessageType::Audio(event) => {
71            event.source = sent.file;
72        }
73        MessageType::File(event) => {
74            event.source = sent.file;
75            if let Some(info) = event.info.as_mut() {
76                info.thumbnail_source = sent.thumbnail;
77            }
78        }
79        MessageType::Image(event) => {
80            event.source = sent.file;
81            if let Some(info) = event.info.as_mut() {
82                info.thumbnail_source = sent.thumbnail;
83            }
84        }
85        MessageType::Video(event) => {
86            event.source = sent.file;
87            if let Some(info) = event.info.as_mut() {
88                info.thumbnail_source = sent.thumbnail;
89            }
90        }
91
92        _ => {
93            // All `MessageType` created by `Room::make_attachment_type` should be
94            // handled here. The only way to end up here is that a message type has
95            // been tampered with in the database.
96            error!("Invalid message type in database: {}", echo.msgtype());
97            // Only crash debug builds.
98            debug_assert!(false, "invalid message type in database");
99        }
100    }
101}
102
/// Swaps the local media sources of every gallery item for the sent ones, for
/// all the item types that [`Room::make_gallery_item_type()`] can produce.
///
/// `sent` maps the unique key of an item's current (local) source to the
/// media info that must replace it. Items whose key is missing from `sent`
/// are left untouched and reported with an error log.
#[cfg(feature = "unstable-msc4274")]
fn update_gallery_event_after_upload(
    echo: &mut RoomMessageEventContent,
    sent: HashMap<String, AccumulatedSentMediaInfo>,
) {
    let MessageType::Gallery(gallery) = &mut echo.msgtype else {
        // This function only deals with gallery messages: any other message type
        // means that the event stored in the database has been tampered with.
        error!("Invalid gallery item types in database");
        // Panic in debug builds only.
        debug_assert!(false, "invalid item type in database {:?}", echo.msgtype());
        return;
    };

    // The arms below cannot be merged: every variant wraps its own content type,
    // with its own `info` type.
    for itemtype in &mut gallery.itemtypes {
        match itemtype {
            GalleryItemType::Audio(item) => {
                let Some(uploaded) = sent.get(&item.source.unique_key()) else {
                    error!("key for item {:?} does not exist on gallery event", item.source);
                    continue;
                };
                item.source = uploaded.file.clone();
            }

            GalleryItemType::File(item) => {
                let Some(uploaded) = sent.get(&item.source.unique_key()) else {
                    error!("key for item {:?} does not exist on gallery event", item.source);
                    continue;
                };
                item.source = uploaded.file.clone();
                if let Some(info) = &mut item.info {
                    info.thumbnail_source = uploaded.thumbnail.clone();
                }
            }

            GalleryItemType::Image(item) => {
                let Some(uploaded) = sent.get(&item.source.unique_key()) else {
                    error!("key for item {:?} does not exist on gallery event", item.source);
                    continue;
                };
                item.source = uploaded.file.clone();
                if let Some(info) = &mut item.info {
                    info.thumbnail_source = uploaded.thumbnail.clone();
                }
            }

            GalleryItemType::Video(item) => {
                let Some(uploaded) = sent.get(&item.source.unique_key()) else {
                    error!("key for item {:?} does not exist on gallery event", item.source);
                    continue;
                };
                item.source = uploaded.file.clone();
                if let Some(info) = &mut item.info {
                    info.thumbnail_source = uploaded.thumbnail.clone();
                }
            }

            _ => {
                // Every `GalleryItemType` built by `Room::make_gallery_item_type` is
                // covered above, so reaching this arm means an item type stored in
                // the database has been tampered with.
                error!("Invalid gallery item types in database");
                // Panic in debug builds only.
                debug_assert!(false, "invalid gallery item type in database {itemtype:?}");
            }
        }
    }
}
167
168#[derive(Default)]
169struct MediaCacheResult {
170    upload_thumbnail_txn: Option<OwnedTransactionId>,
171    event_thumbnail_info: Option<(MediaSource, Box<ThumbnailInfo>)>,
172    queue_thumbnail_info: Option<QueueThumbnailInfo>,
173}
174
175impl RoomSendQueue {
176    /// Queues an attachment to be sent to the room, using the send queue.
177    ///
178    /// This returns quickly (without sending or uploading anything), and will
179    /// push the event to be sent into a queue, handled in the background.
180    ///
181    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
182    /// the [`Self::subscribe()`] method to get updates about the sending of
183    /// that event.
184    ///
185    /// By default, if sending failed on the first attempt, it will be retried a
186    /// few times. If sending failed after those retries, the entire
187    /// client's sending queue will be disabled, and it will need to be
188    /// manually re-enabled by the caller (e.g. after network is back, or when
189    /// something has been done about the faulty requests).
190    ///
191    /// The attachment and its optional thumbnail are stored in the media cache
192    /// and can be retrieved at any time, by calling
193    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
194    /// in the local or remote echo, and using a `MediaFormat::File`.
195    #[instrument(skip_all, fields(event_txn))]
196    pub async fn send_attachment(
197        &self,
198        filename: impl Into<String>,
199        content_type: Mime,
200        data: Vec<u8>,
201        mut config: AttachmentConfig,
202    ) -> Result<SendHandle, RoomSendQueueError> {
203        let Some(room) = self.inner.room.get() else {
204            return Err(RoomSendQueueError::RoomDisappeared);
205        };
206
207        if room.state() != RoomState::Joined {
208            return Err(RoomSendQueueError::RoomNotJoined);
209        }
210
211        let filename = filename.into();
212        let upload_file_txn = TransactionId::new();
213        let send_event_txn = config.txn_id.map_or_else(ChildTransactionId::new, Into::into);
214
215        Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
216        debug!(filename, %content_type, %upload_file_txn, "sending an attachment");
217
218        let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
219
220        let MediaCacheResult { upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info } =
221            RoomSendQueue::cache_media(&room, data, config.thumbnail.take(), &file_media_request)
222                .await?;
223
224        // Create the content for the media event.
225        let event_content = room
226            .make_media_event(
227                Room::make_attachment_type(
228                    &content_type,
229                    filename,
230                    file_media_request.source.clone(),
231                    config.caption,
232                    config.formatted_caption,
233                    config.info,
234                    event_thumbnail_info,
235                ),
236                config.mentions,
237                config.reply,
238            )
239            .await
240            .map_err(|_| RoomSendQueueError::FailedToCreateAttachment)?;
241
242        let created_at = MilliSecondsSinceUnixEpoch::now();
243
244        // Save requests in the queue storage.
245        self.inner
246            .queue
247            .push_media(
248                event_content.clone(),
249                content_type,
250                send_event_txn.clone().into(),
251                created_at,
252                upload_file_txn.clone(),
253                file_media_request,
254                queue_thumbnail_info,
255            )
256            .await?;
257
258        trace!("manager sends a media to the background task");
259
260        self.inner.notifier.notify_one();
261
262        let send_handle = SendHandle {
263            room: self.clone(),
264            transaction_id: send_event_txn.clone().into(),
265            media_handles: vec![MediaHandles { upload_thumbnail_txn, upload_file_txn }],
266            created_at,
267        };
268
269        self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
270            transaction_id: send_event_txn.clone().into(),
271            content: LocalEchoContent::Event {
272                serialized_event: SerializableEventContent::new(&event_content.into())
273                    .map_err(RoomSendQueueStorageError::JsonSerialization)?,
274                send_handle: send_handle.clone(),
275                send_error: None,
276            },
277        }));
278
279        Ok(send_handle)
280    }
281
282    /// Queues a gallery to be sent to the room, using the send queue.
283    ///
284    /// This returns quickly (without sending or uploading anything), and will
285    /// push the event to be sent into a queue, handled in the background.
286    ///
287    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
288    /// the [`Self::subscribe()`] method to get updates about the sending of
289    /// that event.
290    ///
291    /// By default, if sending failed on the first attempt, it will be retried a
292    /// few times. If sending failed after those retries, the entire
293    /// client's sending queue will be disabled, and it will need to be
294    /// manually re-enabled by the caller (e.g. after network is back, or when
295    /// something has been done about the faulty requests).
296    ///
297    /// The attachments and their optional thumbnails are stored in the media
298    /// cache and can be retrieved at any time, by calling
299    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
300    /// in the local or remote echo, and using a `MediaFormat::File`.
301    #[cfg(feature = "unstable-msc4274")]
302    #[instrument(skip_all, fields(event_txn))]
303    pub async fn send_gallery(
304        &self,
305        gallery: GalleryConfig,
306    ) -> Result<SendHandle, RoomSendQueueError> {
307        let Some(room) = self.inner.room.get() else {
308            return Err(RoomSendQueueError::RoomDisappeared);
309        };
310
311        if room.state() != RoomState::Joined {
312            return Err(RoomSendQueueError::RoomNotJoined);
313        }
314
315        if gallery.is_empty() {
316            return Err(RoomSendQueueError::EmptyGallery);
317        }
318
319        let send_event_txn =
320            gallery.txn_id.clone().map_or_else(ChildTransactionId::new, Into::into);
321
322        Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
323
324        let mut item_types = Vec::with_capacity(gallery.len());
325        let mut item_queue_infos = Vec::with_capacity(gallery.len());
326        let mut media_handles = Vec::with_capacity(gallery.len());
327
328        for item_info in gallery.items {
329            let GalleryItemInfo { filename, content_type, data, .. } = item_info;
330
331            let upload_file_txn = TransactionId::new();
332
333            debug!(filename, %content_type, %upload_file_txn, "uploading a gallery attachment");
334
335            let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
336
337            let MediaCacheResult {
338                upload_thumbnail_txn,
339                event_thumbnail_info,
340                queue_thumbnail_info,
341            } = RoomSendQueue::cache_media(&room, data, item_info.thumbnail, &file_media_request)
342                .await?;
343
344            item_types.push(Room::make_gallery_item_type(
345                &content_type,
346                filename,
347                file_media_request.source.clone(),
348                item_info.caption,
349                item_info.formatted_caption,
350                Some(item_info.attachment_info),
351                event_thumbnail_info,
352            ));
353
354            item_queue_infos.push(GalleryItemQueueInfo {
355                content_type,
356                upload_file_txn: upload_file_txn.clone(),
357                file_media_request,
358                thumbnail: queue_thumbnail_info,
359            });
360
361            media_handles.push(MediaHandles { upload_file_txn, upload_thumbnail_txn });
362        }
363
364        // Create the content for the gallery event.
365        let event_content = room
366            .make_media_event(
367                MessageType::Gallery(GalleryMessageEventContent::new(
368                    gallery.caption.unwrap_or_default(),
369                    gallery.formatted_caption,
370                    item_types,
371                )),
372                gallery.mentions,
373                gallery.reply,
374            )
375            .await
376            .map_err(|_| RoomSendQueueError::FailedToCreateGallery)?;
377
378        let created_at = MilliSecondsSinceUnixEpoch::now();
379
380        // Save requests in the queue storage.
381        self.inner
382            .queue
383            .push_gallery(
384                event_content.clone(),
385                send_event_txn.clone().into(),
386                created_at,
387                item_queue_infos,
388            )
389            .await?;
390
391        trace!("manager sends a gallery to the background task");
392
393        self.inner.notifier.notify_one();
394
395        let send_handle = SendHandle {
396            room: self.clone(),
397            transaction_id: send_event_txn.clone().into(),
398            media_handles,
399            created_at,
400        };
401
402        self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
403            transaction_id: send_event_txn.clone().into(),
404            content: LocalEchoContent::Event {
405                serialized_event: SerializableEventContent::new(&event_content.into())
406                    .map_err(RoomSendQueueStorageError::JsonSerialization)?,
407                send_handle: send_handle.clone(),
408                send_error: None,
409            },
410        }));
411
412        Ok(send_handle)
413    }
414
415    async fn cache_media(
416        room: &Room,
417        data: Vec<u8>,
418        thumbnail: Option<Thumbnail>,
419        file_media_request: &MediaRequestParameters,
420    ) -> Result<MediaCacheResult, RoomSendQueueError> {
421        let client = room.client();
422        let media_store =
423            client.media_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
424
425        // Cache the file itself in the cache store.
426        media_store
427            .add_media_content(
428                file_media_request,
429                data,
430                // Make sure that the file is stored until it has been uploaded.
431                IgnoreMediaRetentionPolicy::Yes,
432            )
433            .await
434            .map_err(RoomSendQueueStorageError::MediaStoreError)?;
435
436        // Process the thumbnail, if it's been provided.
437        if let Some(thumbnail) = thumbnail {
438            let txn = TransactionId::new();
439            trace!(upload_thumbnail_txn = %txn, "media has a thumbnail");
440
441            // Create the information required for filling the thumbnail section of the
442            // event.
443            let (data, content_type, thumbnail_info) = thumbnail.into_parts();
444            let file_size = data.len();
445
446            // Cache thumbnail in the cache store.
447            let thumbnail_media_request = Media::make_local_file_media_request(&txn);
448            media_store
449                .add_media_content(
450                    &thumbnail_media_request,
451                    data,
452                    // Make sure that the thumbnail is stored until it has been uploaded.
453                    IgnoreMediaRetentionPolicy::Yes,
454                )
455                .await
456                .map_err(RoomSendQueueStorageError::MediaStoreError)?;
457
458            Ok(MediaCacheResult {
459                upload_thumbnail_txn: Some(txn.clone()),
460                event_thumbnail_info: Some((
461                    thumbnail_media_request.source.clone(),
462                    thumbnail_info,
463                )),
464                queue_thumbnail_info: Some(QueueThumbnailInfo {
465                    finish_upload_thumbnail_info: FinishUploadThumbnailInfo {
466                        txn,
467                        width: None,
468                        height: None,
469                    },
470                    media_request_parameters: thumbnail_media_request,
471                    content_type,
472                    file_size,
473                }),
474            })
475        } else {
476            Ok(Default::default())
477        }
478    }
479}
480
481impl QueueStorage {
482    /// Consumes a finished upload and queues sending of the final media event.
483    #[allow(clippy::too_many_arguments)]
484    pub(super) async fn handle_dependent_finish_upload(
485        &self,
486        client: &Client,
487        event_txn: OwnedTransactionId,
488        parent_key: SentRequestKey,
489        mut local_echo: RoomMessageEventContent,
490        file_upload_txn: OwnedTransactionId,
491        thumbnail_info: Option<FinishUploadThumbnailInfo>,
492        new_updates: &mut Vec<RoomSendQueueUpdate>,
493    ) -> Result<(), RoomSendQueueError> {
494        // Both uploads are ready: enqueue the event with its final data.
495        let sent_media = parent_key
496            .into_media()
497            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
498
499        update_media_cache_keys_after_upload(client, &file_upload_txn, thumbnail_info, &sent_media)
500            .await?;
501        update_media_event_after_upload(&mut local_echo, sent_media);
502
503        let new_content = SerializableEventContent::new(&local_echo.into())
504            .map_err(RoomSendQueueStorageError::JsonSerialization)?;
505
506        // Indicates observers that the upload finished, by editing the local echo for
507        // the event into its final form before sending.
508        new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
509            transaction_id: event_txn.clone(),
510            new_content: new_content.clone(),
511        });
512
513        trace!(%event_txn, "queueing media event after successfully uploading media(s)");
514
515        client
516            .state_store()
517            .save_send_queue_request(
518                &self.room_id,
519                event_txn,
520                MilliSecondsSinceUnixEpoch::now(),
521                new_content.into(),
522                Self::HIGH_PRIORITY,
523            )
524            .await
525            .map_err(RoomSendQueueStorageError::StateStoreError)?;
526
527        Ok(())
528    }
529
530    /// Consumes a finished gallery upload and queues sending of the final
531    /// gallery event.
532    #[cfg(feature = "unstable-msc4274")]
533    #[allow(clippy::too_many_arguments)]
534    pub(super) async fn handle_dependent_finish_gallery_upload(
535        &self,
536        client: &Client,
537        event_txn: OwnedTransactionId,
538        parent_key: SentRequestKey,
539        mut local_echo: RoomMessageEventContent,
540        item_infos: Vec<FinishGalleryItemInfo>,
541        new_updates: &mut Vec<RoomSendQueueUpdate>,
542    ) -> Result<(), RoomSendQueueError> {
543        // All uploads are ready: enqueue the event with its final data.
544        let sent_gallery = parent_key
545            .into_media()
546            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
547
548        let mut sent_media_vec = sent_gallery.accumulated;
549        sent_media_vec.push(AccumulatedSentMediaInfo {
550            file: sent_gallery.file,
551            thumbnail: sent_gallery.thumbnail,
552        });
553
554        let mut sent_infos = HashMap::new();
555
556        for (item_info, sent_media) in zip(item_infos, sent_media_vec) {
557            let FinishGalleryItemInfo { file_upload: file_upload_txn, thumbnail_info } = item_info;
558
559            // Store the sent media under the original cache key for later insertion into
560            // the local echo.
561            let from_req = Media::make_local_file_media_request(&file_upload_txn);
562            sent_infos.insert(from_req.source.unique_key(), sent_media.clone());
563
564            update_media_cache_keys_after_upload(
565                client,
566                &file_upload_txn,
567                thumbnail_info,
568                &sent_media.into(),
569            )
570            .await?;
571        }
572
573        update_gallery_event_after_upload(&mut local_echo, sent_infos);
574
575        let new_content = SerializableEventContent::new(&local_echo.into())
576            .map_err(RoomSendQueueStorageError::JsonSerialization)?;
577
578        // Indicates observers that the upload finished, by editing the local echo for
579        // the event into its final form before sending.
580        new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
581            transaction_id: event_txn.clone(),
582            new_content: new_content.clone(),
583        });
584
585        trace!(%event_txn, "queueing media event after successfully uploading media(s)");
586
587        client
588            .state_store()
589            .save_send_queue_request(
590                &self.room_id,
591                event_txn,
592                MilliSecondsSinceUnixEpoch::now(),
593                new_content.into(),
594                Self::HIGH_PRIORITY,
595            )
596            .await
597            .map_err(RoomSendQueueStorageError::StateStoreError)?;
598
599        Ok(())
600    }
601
602    /// Consumes a finished file or thumbnail upload and queues the dependent
603    /// file or thumbnail upload.
604    #[allow(clippy::too_many_arguments)]
605    pub(super) async fn handle_dependent_file_or_thumbnail_upload(
606        &self,
607        client: &Client,
608        next_upload_txn: OwnedTransactionId,
609        parent_key: SentRequestKey,
610        content_type: String,
611        cache_key: MediaRequestParameters,
612        event_txn: OwnedTransactionId,
613        parent_is_thumbnail_upload: bool,
614    ) -> Result<(), RoomSendQueueError> {
615        // The previous file or thumbnail has been sent, now transform the dependent
616        // file or thumbnail upload request into a ready one.
617        let sent_media = parent_key
618            .into_media()
619            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
620
621        // If the previous upload was a thumbnail, it shouldn't have
622        // a thumbnail itself.
623        if parent_is_thumbnail_upload {
624            debug_assert!(sent_media.thumbnail.is_none());
625            if sent_media.thumbnail.is_some() {
626                warn!("unexpected thumbnail for a thumbnail!");
627            }
628        }
629
630        trace!(
631            related_to = %event_txn,
632            "done uploading file or thumbnail, now queuing the dependent file \
633             or thumbnail upload request",
634        );
635
636        // If the parent request was a thumbnail upload, don't add it to the list of
637        // accumulated medias yet because its dependent file upload is still
638        // pending. If the parent request was a file upload, we know that both
639        // the file and its thumbnail (if any) have finished uploading and we
640        // can add them to the accumulated sent media.
641        #[cfg(feature = "unstable-msc4274")]
642        let accumulated = if parent_is_thumbnail_upload {
643            sent_media.accumulated
644        } else {
645            let mut accumulated = sent_media.accumulated;
646            accumulated.push(AccumulatedSentMediaInfo {
647                file: sent_media.file.clone(),
648                thumbnail: sent_media.thumbnail,
649            });
650            accumulated
651        };
652
653        let request = QueuedRequestKind::MediaUpload {
654            content_type,
655            cache_key,
656            // If the previous upload was a thumbnail, it becomes the thumbnail source for the next
657            // upload.
658            thumbnail_source: parent_is_thumbnail_upload.then_some(sent_media.file),
659            related_to: event_txn,
660            #[cfg(feature = "unstable-msc4274")]
661            accumulated,
662        };
663
664        client
665            .state_store()
666            .save_send_queue_request(
667                &self.room_id,
668                next_upload_txn,
669                MilliSecondsSinceUnixEpoch::now(),
670                request,
671                Self::HIGH_PRIORITY,
672            )
673            .await
674            .map_err(RoomSendQueueStorageError::StateStoreError)?;
675
676        Ok(())
677    }
678
679    /// Try to abort an upload that would be ongoing.
680    ///
681    /// Return true if any media (media itself or its thumbnail) was being
682    /// uploaded. In this case, the media event has also been removed from
683    /// the send queue. If it returns false, then the uploads already
684    /// happened, and the event sending *may* have started.
685    #[instrument(skip(self, handles))]
686    pub(super) async fn abort_upload(
687        &self,
688        event_txn: &TransactionId,
689        handles: &MediaHandles,
690    ) -> Result<bool, RoomSendQueueStorageError> {
691        let mut guard = self.store.lock().await;
692        let client = guard.client()?;
693
694        // Keep the lock until we're done touching the storage.
695        debug!("trying to abort an upload");
696
697        let store = client.state_store();
698
699        let upload_file_as_dependent = ChildTransactionId::from(handles.upload_file_txn.clone());
700        let event_as_dependent = ChildTransactionId::from(event_txn.to_owned());
701
702        let mut removed_dependent_upload = false;
703        let mut removed_dependent_event = false;
704
705        if let Some(thumbnail_txn) = &handles.upload_thumbnail_txn
706            && store.remove_send_queue_request(&self.room_id, thumbnail_txn).await?
707        {
708            // The thumbnail upload existed as a request: either it was pending (something
709            // else was being sent), or it was actively being sent.
710            trace!("could remove thumbnail request, removing 2 dependent requests now");
711
712            // 1. Try to abort sending using the being_sent info, in case it was active.
713            if let Some(info) = guard.being_sent.as_ref()
714                && info.transaction_id == *thumbnail_txn
715            {
716                // SAFETY: we knew it was Some(), two lines above.
717                let info = guard.being_sent.take().unwrap();
718                if info.cancel_upload() {
719                    trace!("aborted ongoing thumbnail upload");
720                }
721            }
722
723            // 2. Remove the dependent requests.
724            removed_dependent_upload = store
725                .remove_dependent_queued_request(&self.room_id, &upload_file_as_dependent)
726                .await?;
727
728            if !removed_dependent_upload {
729                warn!("unable to find the dependent file upload request");
730            }
731
732            removed_dependent_event =
733                store.remove_dependent_queued_request(&self.room_id, &event_as_dependent).await?;
734
735            if !removed_dependent_event {
736                warn!("unable to find the dependent media event upload request");
737            }
738        }
739
740        // If we're here:
741        // - either there was no thumbnail to upload,
742        // - or the thumbnail request has terminated already.
743        //
744        // So the next target is the upload request itself, in both cases.
745
746        if !removed_dependent_upload {
747            if store.remove_send_queue_request(&self.room_id, &handles.upload_file_txn).await? {
748                // The upload existed as a request: either it was pending (something else was
749                // being sent), or it was actively being sent.
750                trace!("could remove file upload request, removing 1 dependent request");
751
752                // 1. Try to abort sending using the being_sent info, in case it was active.
753                if let Some(info) = guard.being_sent.as_ref()
754                    && info.transaction_id == handles.upload_file_txn
755                {
756                    // SAFETY: we knew it was Some(), two lines above.
757                    let info = guard.being_sent.take().unwrap();
758                    if info.cancel_upload() {
759                        trace!("aborted ongoing file upload");
760                    }
761                }
762
763                // 2. Remove the dependent request.
764                if !store
765                    .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
766                    .await?
767                {
768                    warn!("unable to find the dependent media event upload request");
769                }
770            } else {
771                // The upload was not in the send queue, so it's completed.
772                //
773                // It means the event sending is either still queued as a dependent request, or
774                // it's graduated into a request.
775                if !removed_dependent_event
776                    && !store
777                        .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
778                        .await?
779                {
780                    // The media event has been promoted into a request, or the promoted request
781                    // has been sent already: we couldn't abort, let the caller decide what to do.
782                    debug!("uploads already happened => deferring to aborting an event sending");
783                    return Ok(false);
784                }
785            }
786        }
787
788        // At this point, all the requests and dependent requests have been cleaned up.
789        // Perform the final step: empty the cache from the local items.
790        {
791            let media_store = client.media_store().lock().await?;
792            media_store
793                .remove_media_content_for_uri(&Media::make_local_uri(&handles.upload_file_txn))
794                .await?;
795            if let Some(txn) = &handles.upload_thumbnail_txn {
796                media_store.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?;
797            }
798        }
799
800        debug!("successfully aborted!");
801        Ok(true)
802    }
803
804    #[instrument(skip(self, caption, formatted_caption))]
805    pub(super) async fn edit_media_caption(
806        &self,
807        txn: &TransactionId,
808        caption: Option<String>,
809        formatted_caption: Option<FormattedBody>,
810        mentions: Option<Mentions>,
811    ) -> Result<Option<AnyMessageLikeEventContent>, RoomSendQueueStorageError> {
812        // This error will be popular here.
813        use RoomSendQueueStorageError::InvalidMediaCaptionEdit;
814
815        let guard = self.store.lock().await;
816        let client = guard.client()?;
817        let store = client.state_store();
818
819        // The media event can be in one of three states:
820        // - still stored as a dependent request,
821        // - stored as a queued request, active (aka it's being sent).
822        // - stored as a queued request, not active yet (aka it's not being sent yet),
823        //
824        // We'll handle each of these cases one by one.
825
826        {
827            // If the event can be found as a dependent event, update the captions, save it
828            // back into the database, and return early.
829            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
830
831            if let Some(found) =
832                dependent_requests.into_iter().find(|req| *req.own_transaction_id == *txn)
833            {
834                trace!("found the caption to edit in a dependent request");
835
836                let DependentQueuedRequestKind::FinishUpload {
837                    mut local_echo,
838                    file_upload,
839                    thumbnail_info,
840                } = found.kind
841                else {
842                    return Err(InvalidMediaCaptionEdit);
843                };
844
845                if !update_media_caption(&mut local_echo, caption, formatted_caption, mentions) {
846                    return Err(InvalidMediaCaptionEdit);
847                }
848
849                let new_dependent_request = DependentQueuedRequestKind::FinishUpload {
850                    local_echo: local_echo.clone(),
851                    file_upload,
852                    thumbnail_info,
853                };
854                store
855                    .update_dependent_queued_request(
856                        &self.room_id,
857                        &found.own_transaction_id,
858                        new_dependent_request,
859                    )
860                    .await?;
861
862                trace!("caption successfully updated");
863                return Ok(Some((*local_echo).into()));
864            }
865        }
866
867        let requests = store.load_send_queue_requests(&self.room_id).await?;
868        let Some(found) = requests.into_iter().find(|req| req.transaction_id == *txn) else {
869            // Couldn't be found anymore, it's not possible to update captions.
870            return Ok(None);
871        };
872
873        trace!("found the caption to edit as a request");
874
875        let QueuedRequestKind::Event { content: serialized_content } = found.kind else {
876            return Err(InvalidMediaCaptionEdit);
877        };
878
879        let deserialized = serialized_content.deserialize()?;
880        let AnyMessageLikeEventContent::RoomMessage(mut content) = deserialized else {
881            return Err(InvalidMediaCaptionEdit);
882        };
883
884        if !update_media_caption(&mut content, caption, formatted_caption, mentions) {
885            return Err(InvalidMediaCaptionEdit);
886        }
887
888        let any_content: AnyMessageLikeEventContent = content.into();
889        let new_serialized = SerializableEventContent::new(&any_content.clone())?;
890
891        // If the request is active (being sent), send a dependent request.
892        if let Some(being_sent) = guard.being_sent.as_ref()
893            && being_sent.transaction_id == *txn
894        {
895            // Record a dependent request to edit, and exit.
896            store
897                .save_dependent_queued_request(
898                    &self.room_id,
899                    txn,
900                    ChildTransactionId::new(),
901                    MilliSecondsSinceUnixEpoch::now(),
902                    DependentQueuedRequestKind::EditEvent { new_content: new_serialized },
903                )
904                .await?;
905
906            trace!("media event was being sent, pushed a dependent edit");
907            return Ok(Some(any_content));
908        }
909
910        // The request is not active: edit the local echo.
911        store
912            .update_send_queue_request(
913                &self.room_id,
914                txn,
915                QueuedRequestKind::Event { content: new_serialized },
916            )
917            .await?;
918
919        trace!("media event was not being sent, updated local echo");
920        Ok(Some(any_content))
921    }
922}
923
924/// Update cache keys in the cache store after uploading a media file /
925/// thumbnail.
926async fn update_media_cache_keys_after_upload(
927    client: &Client,
928    file_upload_txn: &OwnedTransactionId,
929    thumbnail_info: Option<FinishUploadThumbnailInfo>,
930    sent_media: &SentMediaInfo,
931) -> Result<(), RoomSendQueueError> {
932    // Do it for the file itself.
933    let from_req = Media::make_local_file_media_request(file_upload_txn);
934
935    trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store");
936    let media_store =
937        client.media_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
938
939    // The media can now be removed during cleanups.
940    media_store
941        .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
942        .await
943        .map_err(RoomSendQueueStorageError::MediaStoreError)?;
944
945    media_store
946        .replace_media_key(
947            &from_req,
948            &MediaRequestParameters { source: sent_media.file.clone(), format: MediaFormat::File },
949        )
950        .await
951        .map_err(RoomSendQueueStorageError::MediaStoreError)?;
952
953    // Rename the thumbnail too, if needs be.
954    if let Some((info, new_source)) = thumbnail_info.as_ref().zip(sent_media.thumbnail.clone()) {
955        // Previously the media request used `MediaFormat::Thumbnail`. Handle this case
956        // for send queue requests that were in the state store before the change.
957        let from_req = if let Some((height, width)) = info.height.zip(info.width) {
958            Media::make_local_thumbnail_media_request(&info.txn, height, width)
959        } else {
960            Media::make_local_file_media_request(&info.txn)
961        };
962
963        trace!(from = ?from_req.source, to = ?new_source, "renaming thumbnail file key in cache store");
964
965        // The media can now be removed during cleanups.
966        media_store
967            .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
968            .await
969            .map_err(RoomSendQueueStorageError::MediaStoreError)?;
970
971        media_store
972            .replace_media_key(
973                &from_req,
974                &MediaRequestParameters { source: new_source, format: MediaFormat::File },
975            )
976            .await
977            .map_err(RoomSendQueueStorageError::MediaStoreError)?;
978    }
979
980    Ok(())
981}