matrix_sdk/send_queue/
upload.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Private implementations of the media upload mechanism.
16
17#[cfg(feature = "unstable-msc4274")]
18use std::{collections::HashMap, iter::zip};
19
20use matrix_sdk_base::{
21    RoomState,
22    media::{
23        MediaFormat, MediaRequestParameters, MediaThumbnailSettings,
24        store::IgnoreMediaRetentionPolicy,
25    },
26    store::{
27        ChildTransactionId, DependentQueuedRequestKind, FinishUploadThumbnailInfo,
28        QueuedRequestKind, SentMediaInfo, SentRequestKey, SerializableEventContent,
29    },
30};
31#[cfg(feature = "unstable-msc4274")]
32use matrix_sdk_base::{
33    media::UniqueKey,
34    store::{AccumulatedSentMediaInfo, FinishGalleryItemInfo},
35};
36use mime::Mime;
37#[cfg(feature = "unstable-msc4274")]
38use ruma::events::room::message::{GalleryItemType, GalleryMessageEventContent};
39use ruma::{
40    MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId,
41    events::{
42        AnyMessageLikeEventContent, Mentions,
43        room::{
44            MediaSource, ThumbnailInfo,
45            message::{FormattedBody, MessageType, RoomMessageEventContent},
46        },
47    },
48};
49use tracing::{Span, debug, error, instrument, trace, warn};
50
51use super::{QueueStorage, QueueThumbnailInfo, RoomSendQueue, RoomSendQueueError};
52use crate::{
53    Client, Media, Room,
54    attachment::{AttachmentConfig, Thumbnail},
55    room::edit::update_media_caption,
56    send_queue::{
57        LocalEcho, LocalEchoContent, MediaHandles, RoomSendQueueStorageError, RoomSendQueueUpdate,
58        SendHandle,
59    },
60};
61#[cfg(feature = "unstable-msc4274")]
62use crate::{
63    attachment::{GalleryConfig, GalleryItemInfo},
64    send_queue::GalleryItemQueueInfo,
65};
66
67/// Replace the source by the final ones in all the media types handled by
68/// [`Room::make_attachment_type()`].
69fn update_media_event_after_upload(echo: &mut RoomMessageEventContent, sent: SentMediaInfo) {
70    // Some variants look really similar below, but the `event` and `info` are all
71    // different types…
72    match &mut echo.msgtype {
73        MessageType::Audio(event) => {
74            event.source = sent.file;
75        }
76        MessageType::File(event) => {
77            event.source = sent.file;
78            if let Some(info) = event.info.as_mut() {
79                info.thumbnail_source = sent.thumbnail;
80            }
81        }
82        MessageType::Image(event) => {
83            event.source = sent.file;
84            if let Some(info) = event.info.as_mut() {
85                info.thumbnail_source = sent.thumbnail;
86            }
87        }
88        MessageType::Video(event) => {
89            event.source = sent.file;
90            if let Some(info) = event.info.as_mut() {
91                info.thumbnail_source = sent.thumbnail;
92            }
93        }
94
95        _ => {
96            // All `MessageType` created by `Room::make_attachment_type` should be
97            // handled here. The only way to end up here is that a message type has
98            // been tampered with in the database.
99            error!("Invalid message type in database: {}", echo.msgtype());
100            // Only crash debug builds.
101            debug_assert!(false, "invalid message type in database");
102        }
103    }
104}
105
/// Swaps the local sources of every gallery item for the final ones obtained
/// after uploading, for all the item types handled by
/// [`Room::make_gallery_item_type()`].
///
/// `sent` is looked up with the unique key of each item's current source; an
/// item whose key is missing is reported with an error log and left
/// untouched.
#[cfg(feature = "unstable-msc4274")]
fn update_gallery_event_after_upload(
    echo: &mut RoomMessageEventContent,
    sent: HashMap<String, AccumulatedSentMediaInfo>,
) {
    let MessageType::Gallery(gallery) = &mut echo.msgtype else {
        // Only gallery events are expected here, so anything else means the
        // content stored in the database has been tampered with.
        error!("Invalid gallery item types in database");
        // Panic in debug builds only.
        debug_assert!(false, "invalid item type in database {:?}", echo.msgtype());
        return;
    };

    // The arms below look alike but cannot be merged: every `event` and `info` is
    // a distinct type.
    for itemtype in &mut gallery.itemtypes {
        match itemtype {
            GalleryItemType::Audio(event) => {
                let Some(uploaded) = sent.get(&event.source.unique_key()) else {
                    error!("key for item {:?} does not exist on gallery event", event.source);
                    continue;
                };
                event.source = uploaded.file.clone();
            }
            GalleryItemType::File(event) => {
                let Some(uploaded) = sent.get(&event.source.unique_key()) else {
                    error!("key for item {:?} does not exist on gallery event", event.source);
                    continue;
                };
                event.source = uploaded.file.clone();
                if let Some(info) = &mut event.info {
                    info.thumbnail_source = uploaded.thumbnail.clone();
                }
            }
            GalleryItemType::Image(event) => {
                let Some(uploaded) = sent.get(&event.source.unique_key()) else {
                    error!("key for item {:?} does not exist on gallery event", event.source);
                    continue;
                };
                event.source = uploaded.file.clone();
                if let Some(info) = &mut event.info {
                    info.thumbnail_source = uploaded.thumbnail.clone();
                }
            }
            GalleryItemType::Video(event) => {
                let Some(uploaded) = sent.get(&event.source.unique_key()) else {
                    error!("key for item {:?} does not exist on gallery event", event.source);
                    continue;
                };
                event.source = uploaded.file.clone();
                if let Some(info) = &mut event.info {
                    info.thumbnail_source = uploaded.thumbnail.clone();
                }
            }

            _ => {
                // Every `GalleryItemType` built by `Room::make_gallery_item_type` is
                // covered above, so reaching this arm means an item type stored in
                // the database has been tampered with.
                error!("Invalid gallery item types in database");
                // Panic in debug builds only.
                debug_assert!(false, "invalid gallery item type in database {itemtype:?}");
            }
        }
    }
}
170
/// Outcome of storing a media (and its optional thumbnail) in the media store
/// before it is uploaded.
///
/// The default value has every field set to `None`; it is what is returned
/// when the media has no thumbnail.
#[derive(Default)]
struct MediaCacheResult {
    /// Transaction id generated for the thumbnail upload, if a thumbnail was
    /// provided.
    upload_thumbnail_txn: Option<OwnedTransactionId>,
    /// Local source and info of the thumbnail, used to fill the thumbnail
    /// section of the event content.
    event_thumbnail_info: Option<(MediaSource, Box<ThumbnailInfo>)>,
    /// Thumbnail data handed over to the queue storage along with the file
    /// upload request.
    queue_thumbnail_info: Option<QueueThumbnailInfo>,
}
177
178impl RoomSendQueue {
179    /// Queues an attachment to be sent to the room, using the send queue.
180    ///
181    /// This returns quickly (without sending or uploading anything), and will
182    /// push the event to be sent into a queue, handled in the background.
183    ///
184    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
185    /// the [`Self::subscribe()`] method to get updates about the sending of
186    /// that event.
187    ///
188    /// By default, if sending failed on the first attempt, it will be retried a
189    /// few times. If sending failed after those retries, the entire
190    /// client's sending queue will be disabled, and it will need to be
191    /// manually re-enabled by the caller (e.g. after network is back, or when
192    /// something has been done about the faulty requests).
193    ///
194    /// The attachment and its optional thumbnail are stored in the media cache
195    /// and can be retrieved at any time, by calling
196    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
197    /// in the local or remote echo, and using a `MediaFormat::File`.
198    #[instrument(skip_all, fields(event_txn))]
199    pub async fn send_attachment(
200        &self,
201        filename: impl Into<String>,
202        content_type: Mime,
203        data: Vec<u8>,
204        mut config: AttachmentConfig,
205    ) -> Result<SendHandle, RoomSendQueueError> {
206        let Some(room) = self.inner.room.get() else {
207            return Err(RoomSendQueueError::RoomDisappeared);
208        };
209
210        if room.state() != RoomState::Joined {
211            return Err(RoomSendQueueError::RoomNotJoined);
212        }
213
214        let filename = filename.into();
215        let upload_file_txn = TransactionId::new();
216        let send_event_txn = config.txn_id.map_or_else(ChildTransactionId::new, Into::into);
217
218        Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
219        debug!(filename, %content_type, %upload_file_txn, "sending an attachment");
220
221        let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
222
223        let MediaCacheResult { upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info } =
224            RoomSendQueue::cache_media(&room, data, config.thumbnail.take(), &file_media_request)
225                .await?;
226
227        // Create the content for the media event.
228        let event_content = room
229            .make_media_event(
230                Room::make_attachment_type(
231                    &content_type,
232                    filename,
233                    file_media_request.source.clone(),
234                    config.caption,
235                    config.info,
236                    event_thumbnail_info,
237                ),
238                config.mentions,
239                config.reply,
240            )
241            .await
242            .map_err(|_| RoomSendQueueError::FailedToCreateAttachment)?;
243
244        let created_at = MilliSecondsSinceUnixEpoch::now();
245
246        // Save requests in the queue storage.
247        self.inner
248            .queue
249            .push_media(
250                event_content.clone(),
251                content_type,
252                send_event_txn.clone().into(),
253                created_at,
254                upload_file_txn.clone(),
255                file_media_request,
256                queue_thumbnail_info,
257            )
258            .await?;
259
260        trace!("manager sends a media to the background task");
261
262        self.inner.notifier.notify_one();
263
264        let send_handle = SendHandle {
265            room: self.clone(),
266            transaction_id: send_event_txn.clone().into(),
267            media_handles: vec![MediaHandles { upload_thumbnail_txn, upload_file_txn }],
268            created_at,
269        };
270
271        self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
272            transaction_id: send_event_txn.clone().into(),
273            content: LocalEchoContent::Event {
274                serialized_event: SerializableEventContent::new(&event_content.into())
275                    .map_err(RoomSendQueueStorageError::JsonSerialization)?,
276                send_handle: send_handle.clone(),
277                send_error: None,
278            },
279        }));
280
281        Ok(send_handle)
282    }
283
284    /// Queues a gallery to be sent to the room, using the send queue.
285    ///
286    /// This returns quickly (without sending or uploading anything), and will
287    /// push the event to be sent into a queue, handled in the background.
288    ///
289    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
290    /// the [`Self::subscribe()`] method to get updates about the sending of
291    /// that event.
292    ///
293    /// By default, if sending failed on the first attempt, it will be retried a
294    /// few times. If sending failed after those retries, the entire
295    /// client's sending queue will be disabled, and it will need to be
296    /// manually re-enabled by the caller (e.g. after network is back, or when
297    /// something has been done about the faulty requests).
298    ///
299    /// The attachments and their optional thumbnails are stored in the media
300    /// cache and can be retrieved at any time, by calling
301    /// [`Media::get_media_content()`] with the `MediaSource` that can be found
302    /// in the local or remote echo, and using a `MediaFormat::File`.
303    #[cfg(feature = "unstable-msc4274")]
304    #[instrument(skip_all, fields(event_txn))]
305    pub async fn send_gallery(
306        &self,
307        gallery: GalleryConfig,
308    ) -> Result<SendHandle, RoomSendQueueError> {
309        let Some(room) = self.inner.room.get() else {
310            return Err(RoomSendQueueError::RoomDisappeared);
311        };
312
313        if room.state() != RoomState::Joined {
314            return Err(RoomSendQueueError::RoomNotJoined);
315        }
316
317        if gallery.is_empty() {
318            return Err(RoomSendQueueError::EmptyGallery);
319        }
320
321        let send_event_txn =
322            gallery.txn_id.clone().map_or_else(ChildTransactionId::new, Into::into);
323
324        Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
325
326        let mut item_types = Vec::with_capacity(gallery.len());
327        let mut item_queue_infos = Vec::with_capacity(gallery.len());
328        let mut media_handles = Vec::with_capacity(gallery.len());
329
330        for item_info in gallery.items {
331            let GalleryItemInfo { filename, content_type, data, .. } = item_info;
332
333            let upload_file_txn = TransactionId::new();
334
335            debug!(filename, %content_type, %upload_file_txn, "uploading a gallery attachment");
336
337            let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
338
339            let MediaCacheResult {
340                upload_thumbnail_txn,
341                event_thumbnail_info,
342                queue_thumbnail_info,
343            } = RoomSendQueue::cache_media(&room, data, item_info.thumbnail, &file_media_request)
344                .await?;
345
346            item_types.push(Room::make_gallery_item_type(
347                &content_type,
348                filename,
349                file_media_request.source.clone(),
350                item_info.caption,
351                Some(item_info.attachment_info),
352                event_thumbnail_info,
353            ));
354
355            item_queue_infos.push(GalleryItemQueueInfo {
356                content_type,
357                upload_file_txn: upload_file_txn.clone(),
358                file_media_request,
359                thumbnail: queue_thumbnail_info,
360            });
361
362            media_handles.push(MediaHandles { upload_file_txn, upload_thumbnail_txn });
363        }
364
365        // Create the content for the gallery event.
366        let (body, formatted) =
367            gallery.caption.map(|caption| (caption.body, caption.formatted)).unwrap_or_default();
368        let event_content = room
369            .make_media_event(
370                MessageType::Gallery(GalleryMessageEventContent::new(body, formatted, item_types)),
371                gallery.mentions,
372                gallery.reply,
373            )
374            .await
375            .map_err(|_| RoomSendQueueError::FailedToCreateGallery)?;
376
377        let created_at = MilliSecondsSinceUnixEpoch::now();
378
379        // Save requests in the queue storage.
380        self.inner
381            .queue
382            .push_gallery(
383                event_content.clone(),
384                send_event_txn.clone().into(),
385                created_at,
386                item_queue_infos,
387            )
388            .await?;
389
390        trace!("manager sends a gallery to the background task");
391
392        self.inner.notifier.notify_one();
393
394        let send_handle = SendHandle {
395            room: self.clone(),
396            transaction_id: send_event_txn.clone().into(),
397            media_handles,
398            created_at,
399        };
400
401        self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
402            transaction_id: send_event_txn.clone().into(),
403            content: LocalEchoContent::Event {
404                serialized_event: SerializableEventContent::new(&event_content.into())
405                    .map_err(RoomSendQueueStorageError::JsonSerialization)?,
406                send_handle: send_handle.clone(),
407                send_error: None,
408            },
409        }));
410
411        Ok(send_handle)
412    }
413
414    async fn cache_media(
415        room: &Room,
416        data: Vec<u8>,
417        thumbnail: Option<Thumbnail>,
418        file_media_request: &MediaRequestParameters,
419    ) -> Result<MediaCacheResult, RoomSendQueueError> {
420        let client = room.client();
421        let media_store =
422            client.media_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
423
424        // Cache the file itself in the cache store.
425        media_store
426            .add_media_content(
427                file_media_request,
428                data,
429                // Make sure that the file is stored until it has been uploaded.
430                IgnoreMediaRetentionPolicy::Yes,
431            )
432            .await
433            .map_err(RoomSendQueueStorageError::MediaStoreError)?;
434
435        // Process the thumbnail, if it's been provided.
436        if let Some(thumbnail) = thumbnail {
437            let txn = TransactionId::new();
438            trace!(upload_thumbnail_txn = %txn, "media has a thumbnail");
439
440            // Create the information required for filling the thumbnail section of the
441            // event.
442            let (data, content_type, thumbnail_info) = thumbnail.into_parts();
443            let file_size = data.len();
444
445            let thumbnail_height = thumbnail_info.height;
446            let thumbnail_width = thumbnail_info.width;
447
448            // Cache thumbnail in the cache store.
449            let thumbnail_media_request = Media::make_local_file_media_request(&txn);
450            media_store
451                .add_media_content(
452                    &thumbnail_media_request,
453                    data,
454                    // Make sure that the thumbnail is stored until it has been uploaded.
455                    IgnoreMediaRetentionPolicy::Yes,
456                )
457                .await
458                .map_err(RoomSendQueueStorageError::MediaStoreError)?;
459
460            Ok(MediaCacheResult {
461                upload_thumbnail_txn: Some(txn.clone()),
462                event_thumbnail_info: Some((
463                    thumbnail_media_request.source.clone(),
464                    thumbnail_info,
465                )),
466                queue_thumbnail_info: Some(QueueThumbnailInfo {
467                    finish_upload_thumbnail_info: FinishUploadThumbnailInfo {
468                        txn,
469                        width: thumbnail_width,
470                        height: thumbnail_height,
471                    },
472                    media_request_parameters: thumbnail_media_request,
473                    content_type,
474                    file_size,
475                }),
476            })
477        } else {
478            Ok(Default::default())
479        }
480    }
481}
482
483impl QueueStorage {
484    /// Consumes a finished upload and queues sending of the final media event.
485    #[allow(clippy::too_many_arguments)]
486    pub(super) async fn handle_dependent_finish_upload(
487        &self,
488        client: &Client,
489        event_txn: OwnedTransactionId,
490        parent_key: SentRequestKey,
491        mut local_echo: RoomMessageEventContent,
492        file_upload_txn: OwnedTransactionId,
493        thumbnail_info: Option<FinishUploadThumbnailInfo>,
494        new_updates: &mut Vec<RoomSendQueueUpdate>,
495    ) -> Result<(), RoomSendQueueError> {
496        // Both uploads are ready: enqueue the event with its final data.
497        let sent_media = parent_key
498            .into_media()
499            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
500
501        update_media_cache_keys_after_upload(client, &file_upload_txn, thumbnail_info, &sent_media)
502            .await?;
503        update_media_event_after_upload(&mut local_echo, sent_media);
504
505        let new_content = SerializableEventContent::new(&local_echo.into())
506            .map_err(RoomSendQueueStorageError::JsonSerialization)?;
507
508        // Indicates observers that the upload finished, by editing the local echo for
509        // the event into its final form before sending.
510        new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
511            transaction_id: event_txn.clone(),
512            new_content: new_content.clone(),
513        });
514
515        trace!(%event_txn, "queueing media event after successfully uploading media(s)");
516
517        client
518            .state_store()
519            .save_send_queue_request(
520                &self.room_id,
521                event_txn,
522                MilliSecondsSinceUnixEpoch::now(),
523                new_content.into(),
524                Self::HIGH_PRIORITY,
525            )
526            .await
527            .map_err(RoomSendQueueStorageError::StateStoreError)?;
528
529        Ok(())
530    }
531
532    /// Consumes a finished gallery upload and queues sending of the final
533    /// gallery event.
534    #[cfg(feature = "unstable-msc4274")]
535    #[allow(clippy::too_many_arguments)]
536    pub(super) async fn handle_dependent_finish_gallery_upload(
537        &self,
538        client: &Client,
539        event_txn: OwnedTransactionId,
540        parent_key: SentRequestKey,
541        mut local_echo: RoomMessageEventContent,
542        item_infos: Vec<FinishGalleryItemInfo>,
543        new_updates: &mut Vec<RoomSendQueueUpdate>,
544    ) -> Result<(), RoomSendQueueError> {
545        // All uploads are ready: enqueue the event with its final data.
546        let sent_gallery = parent_key
547            .into_media()
548            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
549
550        let mut sent_media_vec = sent_gallery.accumulated;
551        sent_media_vec.push(AccumulatedSentMediaInfo {
552            file: sent_gallery.file,
553            thumbnail: sent_gallery.thumbnail,
554        });
555
556        let mut sent_infos = HashMap::new();
557
558        for (item_info, sent_media) in zip(item_infos, sent_media_vec) {
559            let FinishGalleryItemInfo { file_upload: file_upload_txn, thumbnail_info } = item_info;
560
561            // Store the sent media under the original cache key for later insertion into
562            // the local echo.
563            let from_req = Media::make_local_file_media_request(&file_upload_txn);
564            sent_infos.insert(from_req.source.unique_key(), sent_media.clone());
565
566            update_media_cache_keys_after_upload(
567                client,
568                &file_upload_txn,
569                thumbnail_info,
570                &sent_media.into(),
571            )
572            .await?;
573        }
574
575        update_gallery_event_after_upload(&mut local_echo, sent_infos);
576
577        let new_content = SerializableEventContent::new(&local_echo.into())
578            .map_err(RoomSendQueueStorageError::JsonSerialization)?;
579
580        // Indicates observers that the upload finished, by editing the local echo for
581        // the event into its final form before sending.
582        new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
583            transaction_id: event_txn.clone(),
584            new_content: new_content.clone(),
585        });
586
587        trace!(%event_txn, "queueing media event after successfully uploading media(s)");
588
589        client
590            .state_store()
591            .save_send_queue_request(
592                &self.room_id,
593                event_txn,
594                MilliSecondsSinceUnixEpoch::now(),
595                new_content.into(),
596                Self::HIGH_PRIORITY,
597            )
598            .await
599            .map_err(RoomSendQueueStorageError::StateStoreError)?;
600
601        Ok(())
602    }
603
604    /// Consumes a finished file or thumbnail upload and queues the dependent
605    /// file or thumbnail upload.
606    #[allow(clippy::too_many_arguments)]
607    pub(super) async fn handle_dependent_file_or_thumbnail_upload(
608        &self,
609        client: &Client,
610        next_upload_txn: OwnedTransactionId,
611        parent_key: SentRequestKey,
612        content_type: String,
613        cache_key: MediaRequestParameters,
614        event_txn: OwnedTransactionId,
615        parent_is_thumbnail_upload: bool,
616    ) -> Result<(), RoomSendQueueError> {
617        // The previous file or thumbnail has been sent, now transform the dependent
618        // file or thumbnail upload request into a ready one.
619        let sent_media = parent_key
620            .into_media()
621            .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
622
623        // If the previous upload was a thumbnail, it shouldn't have
624        // a thumbnail itself.
625        if parent_is_thumbnail_upload {
626            debug_assert!(sent_media.thumbnail.is_none());
627            if sent_media.thumbnail.is_some() {
628                warn!("unexpected thumbnail for a thumbnail!");
629            }
630        }
631
632        trace!(
633            related_to = %event_txn,
634            "done uploading file or thumbnail, now queuing the dependent file \
635             or thumbnail upload request",
636        );
637
638        // If the parent request was a thumbnail upload, don't add it to the list of
639        // accumulated medias yet because its dependent file upload is still
640        // pending. If the parent request was a file upload, we know that both
641        // the file and its thumbnail (if any) have finished uploading and we
642        // can add them to the accumulated sent media.
643        #[cfg(feature = "unstable-msc4274")]
644        let accumulated = if parent_is_thumbnail_upload {
645            sent_media.accumulated
646        } else {
647            let mut accumulated = sent_media.accumulated;
648            accumulated.push(AccumulatedSentMediaInfo {
649                file: sent_media.file.clone(),
650                thumbnail: sent_media.thumbnail,
651            });
652            accumulated
653        };
654
655        let request = QueuedRequestKind::MediaUpload {
656            content_type,
657            cache_key,
658            // If the previous upload was a thumbnail, it becomes the thumbnail source for the next
659            // upload.
660            thumbnail_source: parent_is_thumbnail_upload.then_some(sent_media.file),
661            related_to: event_txn,
662            #[cfg(feature = "unstable-msc4274")]
663            accumulated,
664        };
665
666        client
667            .state_store()
668            .save_send_queue_request(
669                &self.room_id,
670                next_upload_txn,
671                MilliSecondsSinceUnixEpoch::now(),
672                request,
673                Self::HIGH_PRIORITY,
674            )
675            .await
676            .map_err(RoomSendQueueStorageError::StateStoreError)?;
677
678        Ok(())
679    }
680
681    /// Try to abort an upload that would be ongoing.
682    ///
683    /// Return true if any media (media itself or its thumbnail) was being
684    /// uploaded. In this case, the media event has also been removed from
685    /// the send queue. If it returns false, then the uploads already
686    /// happened, and the event sending *may* have started.
687    #[instrument(skip(self, handles))]
688    pub(super) async fn abort_upload(
689        &self,
690        event_txn: &TransactionId,
691        handles: &MediaHandles,
692    ) -> Result<bool, RoomSendQueueStorageError> {
693        let mut guard = self.store.lock().await;
694        let client = guard.client()?;
695
696        // Keep the lock until we're done touching the storage.
697        debug!("trying to abort an upload");
698
699        let store = client.state_store();
700
701        let upload_file_as_dependent = ChildTransactionId::from(handles.upload_file_txn.clone());
702        let event_as_dependent = ChildTransactionId::from(event_txn.to_owned());
703
704        let mut removed_dependent_upload = false;
705        let mut removed_dependent_event = false;
706
707        if let Some(thumbnail_txn) = &handles.upload_thumbnail_txn
708            && store.remove_send_queue_request(&self.room_id, thumbnail_txn).await?
709        {
710            // The thumbnail upload existed as a request: either it was pending (something
711            // else was being sent), or it was actively being sent.
712            trace!("could remove thumbnail request, removing 2 dependent requests now");
713
714            // 1. Try to abort sending using the being_sent info, in case it was active.
715            if let Some(info) = guard.being_sent.as_ref()
716                && info.transaction_id == *thumbnail_txn
717            {
718                // SAFETY: we knew it was Some(), two lines above.
719                let info = guard.being_sent.take().unwrap();
720                if info.cancel_upload() {
721                    trace!("aborted ongoing thumbnail upload");
722                }
723            }
724
725            // 2. Remove the dependent requests.
726            removed_dependent_upload = store
727                .remove_dependent_queued_request(&self.room_id, &upload_file_as_dependent)
728                .await?;
729
730            if !removed_dependent_upload {
731                warn!("unable to find the dependent file upload request");
732            }
733
734            removed_dependent_event =
735                store.remove_dependent_queued_request(&self.room_id, &event_as_dependent).await?;
736
737            if !removed_dependent_event {
738                warn!("unable to find the dependent media event upload request");
739            }
740        }
741
742        // If we're here:
743        // - either there was no thumbnail to upload,
744        // - or the thumbnail request has terminated already.
745        //
746        // So the next target is the upload request itself, in both cases.
747
748        if !removed_dependent_upload {
749            if store.remove_send_queue_request(&self.room_id, &handles.upload_file_txn).await? {
750                // The upload existed as a request: either it was pending (something else was
751                // being sent), or it was actively being sent.
752                trace!("could remove file upload request, removing 1 dependent request");
753
754                // 1. Try to abort sending using the being_sent info, in case it was active.
755                if let Some(info) = guard.being_sent.as_ref()
756                    && info.transaction_id == handles.upload_file_txn
757                {
758                    // SAFETY: we knew it was Some(), two lines above.
759                    let info = guard.being_sent.take().unwrap();
760                    if info.cancel_upload() {
761                        trace!("aborted ongoing file upload");
762                    }
763                }
764
765                // 2. Remove the dependent request.
766                if !store
767                    .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
768                    .await?
769                {
770                    warn!("unable to find the dependent media event upload request");
771                }
772            } else {
773                // The upload was not in the send queue, so it's completed.
774                //
775                // It means the event sending is either still queued as a dependent request, or
776                // it's graduated into a request.
777                if !removed_dependent_event
778                    && !store
779                        .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
780                        .await?
781                {
782                    // The media event has been promoted into a request, or the promoted request
783                    // has been sent already: we couldn't abort, let the caller decide what to do.
784                    debug!("uploads already happened => deferring to aborting an event sending");
785                    return Ok(false);
786                }
787            }
788        }
789
790        // At this point, all the requests and dependent requests have been cleaned up.
791        // Perform the final step: empty the cache from the local items.
792        {
793            let media_store = client.media_store().lock().await?;
794            media_store
795                .remove_media_content_for_uri(&Media::make_local_uri(&handles.upload_file_txn))
796                .await?;
797            if let Some(txn) = &handles.upload_thumbnail_txn {
798                media_store.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?;
799            }
800        }
801
802        debug!("successfully aborted!");
803        Ok(true)
804    }
805
806    #[instrument(skip(self, caption, formatted_caption))]
807    pub(super) async fn edit_media_caption(
808        &self,
809        txn: &TransactionId,
810        caption: Option<String>,
811        formatted_caption: Option<FormattedBody>,
812        mentions: Option<Mentions>,
813    ) -> Result<Option<AnyMessageLikeEventContent>, RoomSendQueueStorageError> {
814        // This error will be popular here.
815        use RoomSendQueueStorageError::InvalidMediaCaptionEdit;
816
817        let guard = self.store.lock().await;
818        let client = guard.client()?;
819        let store = client.state_store();
820
821        // The media event can be in one of three states:
822        // - still stored as a dependent request,
823        // - stored as a queued request, active (aka it's being sent).
824        // - stored as a queued request, not active yet (aka it's not being sent yet),
825        //
826        // We'll handle each of these cases one by one.
827
828        {
829            // If the event can be found as a dependent event, update the captions, save it
830            // back into the database, and return early.
831            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
832
833            if let Some(found) =
834                dependent_requests.into_iter().find(|req| *req.own_transaction_id == *txn)
835            {
836                trace!("found the caption to edit in a dependent request");
837
838                let DependentQueuedRequestKind::FinishUpload {
839                    mut local_echo,
840                    file_upload,
841                    thumbnail_info,
842                } = found.kind
843                else {
844                    return Err(InvalidMediaCaptionEdit);
845                };
846
847                if !update_media_caption(&mut local_echo, caption, formatted_caption, mentions) {
848                    return Err(InvalidMediaCaptionEdit);
849                }
850
851                let new_dependent_request = DependentQueuedRequestKind::FinishUpload {
852                    local_echo: local_echo.clone(),
853                    file_upload,
854                    thumbnail_info,
855                };
856                store
857                    .update_dependent_queued_request(
858                        &self.room_id,
859                        &found.own_transaction_id,
860                        new_dependent_request,
861                    )
862                    .await?;
863
864                trace!("caption successfully updated");
865                return Ok(Some((*local_echo).into()));
866            }
867        }
868
869        let requests = store.load_send_queue_requests(&self.room_id).await?;
870        let Some(found) = requests.into_iter().find(|req| req.transaction_id == *txn) else {
871            // Couldn't be found anymore, it's not possible to update captions.
872            return Ok(None);
873        };
874
875        trace!("found the caption to edit as a request");
876
877        let QueuedRequestKind::Event { content: serialized_content } = found.kind else {
878            return Err(InvalidMediaCaptionEdit);
879        };
880
881        let deserialized = serialized_content.deserialize()?;
882        let AnyMessageLikeEventContent::RoomMessage(mut content) = deserialized else {
883            return Err(InvalidMediaCaptionEdit);
884        };
885
886        if !update_media_caption(&mut content, caption, formatted_caption, mentions) {
887            return Err(InvalidMediaCaptionEdit);
888        }
889
890        let any_content: AnyMessageLikeEventContent = content.into();
891        let new_serialized = SerializableEventContent::new(&any_content.clone())?;
892
893        // If the request is active (being sent), send a dependent request.
894        if let Some(being_sent) = guard.being_sent.as_ref()
895            && being_sent.transaction_id == *txn
896        {
897            // Record a dependent request to edit, and exit.
898            store
899                .save_dependent_queued_request(
900                    &self.room_id,
901                    txn,
902                    ChildTransactionId::new(),
903                    MilliSecondsSinceUnixEpoch::now(),
904                    DependentQueuedRequestKind::EditEvent { new_content: new_serialized },
905                )
906                .await?;
907
908            trace!("media event was being sent, pushed a dependent edit");
909            return Ok(Some(any_content));
910        }
911
912        // The request is not active: edit the local echo.
913        store
914            .update_send_queue_request(
915                &self.room_id,
916                txn,
917                QueuedRequestKind::Event { content: new_serialized },
918            )
919            .await?;
920
921        trace!("media event was not being sent, updated local echo");
922        Ok(Some(any_content))
923    }
924}
925
926/// Update cache keys in the cache store after uploading a media file /
927/// thumbnail.
928async fn update_media_cache_keys_after_upload(
929    client: &Client,
930    file_upload_txn: &OwnedTransactionId,
931    thumbnail_info: Option<FinishUploadThumbnailInfo>,
932    sent_media: &SentMediaInfo,
933) -> Result<(), RoomSendQueueError> {
934    // Do it for the file itself.
935    let from_req = Media::make_local_file_media_request(file_upload_txn);
936
937    trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store");
938    let media_store =
939        client.media_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
940
941    // The media file can now be removed during cleanups.
942    media_store
943        .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
944        .await
945        .map_err(RoomSendQueueStorageError::MediaStoreError)?;
946
947    media_store
948        .replace_media_key(
949            &from_req,
950            &MediaRequestParameters { source: sent_media.file.clone(), format: MediaFormat::File },
951        )
952        .await
953        .map_err(RoomSendQueueStorageError::MediaStoreError)?;
954
955    // Rename the thumbnail too, if needs be.
956    if let Some((info, remote_thumbnail_source)) =
957        thumbnail_info.as_ref().zip(sent_media.thumbnail.clone())
958    {
959        let from_request_params = Media::make_local_file_media_request(&info.txn);
960
961        if let Some((height, width)) = info.height.zip(info.width) {
962            trace!(
963                from = ?from_req.source,
964                to = ?remote_thumbnail_source,
965                height = u64::from(height),
966                width = u64::from(width),
967                "storing thumbnail as a thumbnail for the uploaded media, and a file in itself, in cache store"
968            );
969
970            // Try to reload the content of the thumbnail from the media store. As it's not
971            // a strong requirement to store the thumbnail a second time, we
972            // silently log errors instead of propagating them to the caller.
973            match media_store
974                .get_media_content(&MediaRequestParameters {
975                    source: from_request_params.source.clone(),
976                    format: MediaFormat::File,
977                })
978                .await
979            {
980                Ok(Some(thumbnail_content)) => {
981                    // Also cache this as a thumbnail for the thumbnail, in the media store; the
982                    // ElementX apps expect that specific format for thumbnails.
983                    media_store
984                        .add_media_content(
985                            &MediaRequestParameters {
986                                source: remote_thumbnail_source.clone(),
987                                format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(
988                                    width, height,
989                                )),
990                            },
991                            thumbnail_content,
992                            IgnoreMediaRetentionPolicy::No,
993                        )
994                        .await
995                        .map_err(RoomSendQueueStorageError::MediaStoreError)?;
996                }
997
998                Ok(None) => {
999                    warn!(
1000                        from = ?from_request_params.source,
1001                        "unable to reload thumbnail content from media store: no content found",
1002                    );
1003                }
1004
1005                Err(err) => {
1006                    // Silently log the error, but proceed, as storing the thumbnail as such isn't
1007                    // a strong requirement.
1008                    error!(
1009                        from = ?from_request_params.source,
1010                        "unable to reload thumbnail content from media store: {err}"
1011                    );
1012                }
1013            }
1014        } else {
1015            trace!(from = ?from_req.source, to = ?remote_thumbnail_source, "only renaming thumbnail key to file in cache store");
1016        }
1017
1018        // The thumbnail file can now be removed during cleanups.
1019        media_store
1020            .set_ignore_media_retention_policy(&from_request_params, IgnoreMediaRetentionPolicy::No)
1021            .await
1022            .map_err(RoomSendQueueStorageError::MediaStoreError)?;
1023
1024        // Save the thumbnail as a file as well.
1025        media_store
1026            .replace_media_key(
1027                &from_request_params,
1028                &MediaRequestParameters {
1029                    source: remote_thumbnail_source,
1030                    format: MediaFormat::File,
1031                },
1032            )
1033            .await
1034            .map_err(RoomSendQueueStorageError::MediaStoreError)?;
1035    }
1036
1037    Ok(())
1038}