1#[cfg(feature = "unstable-msc4274")]
18use std::{collections::HashMap, iter::zip};
19
20use matrix_sdk_base::{
21 RoomState,
22 media::{MediaFormat, MediaRequestParameters, store::IgnoreMediaRetentionPolicy},
23 store::{
24 ChildTransactionId, DependentQueuedRequestKind, FinishUploadThumbnailInfo,
25 QueuedRequestKind, SentMediaInfo, SentRequestKey, SerializableEventContent,
26 },
27};
28#[cfg(feature = "unstable-msc4274")]
29use matrix_sdk_base::{
30 media::UniqueKey,
31 store::{AccumulatedSentMediaInfo, FinishGalleryItemInfo},
32};
33use mime::Mime;
34#[cfg(feature = "unstable-msc4274")]
35use ruma::events::room::message::{GalleryItemType, GalleryMessageEventContent};
36use ruma::{
37 MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId,
38 events::{
39 AnyMessageLikeEventContent, Mentions,
40 room::{
41 MediaSource, ThumbnailInfo,
42 message::{FormattedBody, MessageType, RoomMessageEventContent},
43 },
44 },
45};
46use tracing::{Span, debug, error, instrument, trace, warn};
47
48use super::{QueueStorage, QueueThumbnailInfo, RoomSendQueue, RoomSendQueueError};
49use crate::{
50 Client, Media, Room,
51 attachment::{AttachmentConfig, Thumbnail},
52 room::edit::update_media_caption,
53 send_queue::{
54 LocalEcho, LocalEchoContent, MediaHandles, RoomSendQueueStorageError, RoomSendQueueUpdate,
55 SendHandle,
56 },
57};
58#[cfg(feature = "unstable-msc4274")]
59use crate::{
60 attachment::{GalleryConfig, GalleryItemInfo},
61 send_queue::GalleryItemQueueInfo,
62};
63
64fn update_media_event_after_upload(echo: &mut RoomMessageEventContent, sent: SentMediaInfo) {
67 match &mut echo.msgtype {
70 MessageType::Audio(event) => {
71 event.source = sent.file;
72 }
73 MessageType::File(event) => {
74 event.source = sent.file;
75 if let Some(info) = event.info.as_mut() {
76 info.thumbnail_source = sent.thumbnail;
77 }
78 }
79 MessageType::Image(event) => {
80 event.source = sent.file;
81 if let Some(info) = event.info.as_mut() {
82 info.thumbnail_source = sent.thumbnail;
83 }
84 }
85 MessageType::Video(event) => {
86 event.source = sent.file;
87 if let Some(info) = event.info.as_mut() {
88 info.thumbnail_source = sent.thumbnail;
89 }
90 }
91
92 _ => {
93 error!("Invalid message type in database: {}", echo.msgtype());
97 debug_assert!(false, "invalid message type in database");
99 }
100 }
101}
102
/// Rewrites every item of a gallery event's local echo so that it points at
/// the uploaded media sources.
///
/// * `echo` - the event content, updated in place; it must be a gallery
///   message, otherwise an error is logged (and debug builds assert).
/// * `sent` - the uploaded sources, keyed by the unique key of the source
///   currently stored in the corresponding gallery item.
///
/// Items whose current source has no entry in `sent` are left untouched and
/// reported with an error log.
#[cfg(feature = "unstable-msc4274")]
fn update_gallery_event_after_upload(
    echo: &mut RoomMessageEventContent,
    sent: HashMap<String, AccumulatedSentMediaInfo>,
) {
    let MessageType::Gallery(gallery) = &mut echo.msgtype else {
        error!("Invalid gallery item types in database");
        // Only crash debug builds.
        debug_assert!(false, "invalid item type in database {:?}", echo.msgtype());
        return;
    };

    for itemtype in &mut gallery.itemtypes {
        match itemtype {
            GalleryItemType::Audio(audio) => {
                let Some(uploaded) = sent.get(&audio.source.unique_key()) else {
                    error!("key for item {:?} does not exist on gallery event", audio.source);
                    continue;
                };
                audio.source = uploaded.file.clone();
            }

            GalleryItemType::File(file) => {
                let Some(uploaded) = sent.get(&file.source.unique_key()) else {
                    error!("key for item {:?} does not exist on gallery event", file.source);
                    continue;
                };
                file.source = uploaded.file.clone();
                if let Some(info) = &mut file.info {
                    info.thumbnail_source = uploaded.thumbnail.clone();
                }
            }

            GalleryItemType::Image(image) => {
                let Some(uploaded) = sent.get(&image.source.unique_key()) else {
                    error!("key for item {:?} does not exist on gallery event", image.source);
                    continue;
                };
                image.source = uploaded.file.clone();
                if let Some(info) = &mut image.info {
                    info.thumbnail_source = uploaded.thumbnail.clone();
                }
            }

            GalleryItemType::Video(video) => {
                let Some(uploaded) = sent.get(&video.source.unique_key()) else {
                    error!("key for item {:?} does not exist on gallery event", video.source);
                    continue;
                };
                video.source = uploaded.file.clone();
                if let Some(info) = &mut video.info {
                    info.thumbnail_source = uploaded.thumbnail.clone();
                }
            }

            _ => {
                error!("Invalid gallery item types in database");
                // Only crash debug builds.
                debug_assert!(false, "invalid gallery item type in database {itemtype:?}");
            }
        }
    }
}
167
/// Outcome of `RoomSendQueue::cache_media`.
///
/// All fields are `None` (the `Default` value) when the media was cached
/// without a thumbnail.
#[derive(Default)]
struct MediaCacheResult {
    /// Transaction id generated for the thumbnail, if a thumbnail was given.
    upload_thumbnail_txn: Option<OwnedTransactionId>,
    /// Local source of the cached thumbnail together with its info, as
    /// handed to the event content builders.
    event_thumbnail_info: Option<(MediaSource, Box<ThumbnailInfo>)>,
    /// Thumbnail information handed to the queue storage when pushing the
    /// media request(s).
    queue_thumbnail_info: Option<QueueThumbnailInfo>,
}
174
175impl RoomSendQueue {
176 #[instrument(skip_all, fields(event_txn))]
196 pub async fn send_attachment(
197 &self,
198 filename: impl Into<String>,
199 content_type: Mime,
200 data: Vec<u8>,
201 mut config: AttachmentConfig,
202 ) -> Result<SendHandle, RoomSendQueueError> {
203 let Some(room) = self.inner.room.get() else {
204 return Err(RoomSendQueueError::RoomDisappeared);
205 };
206
207 if room.state() != RoomState::Joined {
208 return Err(RoomSendQueueError::RoomNotJoined);
209 }
210
211 let filename = filename.into();
212 let upload_file_txn = TransactionId::new();
213 let send_event_txn = config.txn_id.map_or_else(ChildTransactionId::new, Into::into);
214
215 Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
216 debug!(filename, %content_type, %upload_file_txn, "sending an attachment");
217
218 let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
219
220 let MediaCacheResult { upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info } =
221 RoomSendQueue::cache_media(&room, data, config.thumbnail.take(), &file_media_request)
222 .await?;
223
224 let event_content = room
226 .make_media_event(
227 Room::make_attachment_type(
228 &content_type,
229 filename,
230 file_media_request.source.clone(),
231 config.caption,
232 config.info,
233 event_thumbnail_info,
234 ),
235 config.mentions,
236 config.reply,
237 )
238 .await
239 .map_err(|_| RoomSendQueueError::FailedToCreateAttachment)?;
240
241 let created_at = MilliSecondsSinceUnixEpoch::now();
242
243 self.inner
245 .queue
246 .push_media(
247 event_content.clone(),
248 content_type,
249 send_event_txn.clone().into(),
250 created_at,
251 upload_file_txn.clone(),
252 file_media_request,
253 queue_thumbnail_info,
254 )
255 .await?;
256
257 trace!("manager sends a media to the background task");
258
259 self.inner.notifier.notify_one();
260
261 let send_handle = SendHandle {
262 room: self.clone(),
263 transaction_id: send_event_txn.clone().into(),
264 media_handles: vec![MediaHandles { upload_thumbnail_txn, upload_file_txn }],
265 created_at,
266 };
267
268 self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
269 transaction_id: send_event_txn.clone().into(),
270 content: LocalEchoContent::Event {
271 serialized_event: SerializableEventContent::new(&event_content.into())
272 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
273 send_handle: send_handle.clone(),
274 send_error: None,
275 },
276 }));
277
278 Ok(send_handle)
279 }
280
281 #[cfg(feature = "unstable-msc4274")]
301 #[instrument(skip_all, fields(event_txn))]
302 pub async fn send_gallery(
303 &self,
304 gallery: GalleryConfig,
305 ) -> Result<SendHandle, RoomSendQueueError> {
306 let Some(room) = self.inner.room.get() else {
307 return Err(RoomSendQueueError::RoomDisappeared);
308 };
309
310 if room.state() != RoomState::Joined {
311 return Err(RoomSendQueueError::RoomNotJoined);
312 }
313
314 if gallery.is_empty() {
315 return Err(RoomSendQueueError::EmptyGallery);
316 }
317
318 let send_event_txn =
319 gallery.txn_id.clone().map_or_else(ChildTransactionId::new, Into::into);
320
321 Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
322
323 let mut item_types = Vec::with_capacity(gallery.len());
324 let mut item_queue_infos = Vec::with_capacity(gallery.len());
325 let mut media_handles = Vec::with_capacity(gallery.len());
326
327 for item_info in gallery.items {
328 let GalleryItemInfo { filename, content_type, data, .. } = item_info;
329
330 let upload_file_txn = TransactionId::new();
331
332 debug!(filename, %content_type, %upload_file_txn, "uploading a gallery attachment");
333
334 let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
335
336 let MediaCacheResult {
337 upload_thumbnail_txn,
338 event_thumbnail_info,
339 queue_thumbnail_info,
340 } = RoomSendQueue::cache_media(&room, data, item_info.thumbnail, &file_media_request)
341 .await?;
342
343 item_types.push(Room::make_gallery_item_type(
344 &content_type,
345 filename,
346 file_media_request.source.clone(),
347 item_info.caption,
348 Some(item_info.attachment_info),
349 event_thumbnail_info,
350 ));
351
352 item_queue_infos.push(GalleryItemQueueInfo {
353 content_type,
354 upload_file_txn: upload_file_txn.clone(),
355 file_media_request,
356 thumbnail: queue_thumbnail_info,
357 });
358
359 media_handles.push(MediaHandles { upload_file_txn, upload_thumbnail_txn });
360 }
361
362 let (body, formatted) =
364 gallery.caption.map(|caption| (caption.body, caption.formatted)).unwrap_or_default();
365 let event_content = room
366 .make_media_event(
367 MessageType::Gallery(GalleryMessageEventContent::new(body, formatted, item_types)),
368 gallery.mentions,
369 gallery.reply,
370 )
371 .await
372 .map_err(|_| RoomSendQueueError::FailedToCreateGallery)?;
373
374 let created_at = MilliSecondsSinceUnixEpoch::now();
375
376 self.inner
378 .queue
379 .push_gallery(
380 event_content.clone(),
381 send_event_txn.clone().into(),
382 created_at,
383 item_queue_infos,
384 )
385 .await?;
386
387 trace!("manager sends a gallery to the background task");
388
389 self.inner.notifier.notify_one();
390
391 let send_handle = SendHandle {
392 room: self.clone(),
393 transaction_id: send_event_txn.clone().into(),
394 media_handles,
395 created_at,
396 };
397
398 self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
399 transaction_id: send_event_txn.clone().into(),
400 content: LocalEchoContent::Event {
401 serialized_event: SerializableEventContent::new(&event_content.into())
402 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
403 send_handle: send_handle.clone(),
404 send_error: None,
405 },
406 }));
407
408 Ok(send_handle)
409 }
410
411 async fn cache_media(
412 room: &Room,
413 data: Vec<u8>,
414 thumbnail: Option<Thumbnail>,
415 file_media_request: &MediaRequestParameters,
416 ) -> Result<MediaCacheResult, RoomSendQueueError> {
417 let client = room.client();
418 let media_store =
419 client.media_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
420
421 media_store
423 .add_media_content(
424 file_media_request,
425 data,
426 IgnoreMediaRetentionPolicy::Yes,
428 )
429 .await
430 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
431
432 if let Some(thumbnail) = thumbnail {
434 let txn = TransactionId::new();
435 trace!(upload_thumbnail_txn = %txn, "media has a thumbnail");
436
437 let (data, content_type, thumbnail_info) = thumbnail.into_parts();
440 let file_size = data.len();
441
442 let thumbnail_media_request = Media::make_local_file_media_request(&txn);
444 media_store
445 .add_media_content(
446 &thumbnail_media_request,
447 data,
448 IgnoreMediaRetentionPolicy::Yes,
450 )
451 .await
452 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
453
454 Ok(MediaCacheResult {
455 upload_thumbnail_txn: Some(txn.clone()),
456 event_thumbnail_info: Some((
457 thumbnail_media_request.source.clone(),
458 thumbnail_info,
459 )),
460 queue_thumbnail_info: Some(QueueThumbnailInfo {
461 finish_upload_thumbnail_info: FinishUploadThumbnailInfo {
462 txn,
463 width: None,
464 height: None,
465 },
466 media_request_parameters: thumbnail_media_request,
467 content_type,
468 file_size,
469 }),
470 })
471 } else {
472 Ok(Default::default())
473 }
474 }
475}
476
477impl QueueStorage {
478 #[allow(clippy::too_many_arguments)]
480 pub(super) async fn handle_dependent_finish_upload(
481 &self,
482 client: &Client,
483 event_txn: OwnedTransactionId,
484 parent_key: SentRequestKey,
485 mut local_echo: RoomMessageEventContent,
486 file_upload_txn: OwnedTransactionId,
487 thumbnail_info: Option<FinishUploadThumbnailInfo>,
488 new_updates: &mut Vec<RoomSendQueueUpdate>,
489 ) -> Result<(), RoomSendQueueError> {
490 let sent_media = parent_key
492 .into_media()
493 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
494
495 update_media_cache_keys_after_upload(client, &file_upload_txn, thumbnail_info, &sent_media)
496 .await?;
497 update_media_event_after_upload(&mut local_echo, sent_media);
498
499 let new_content = SerializableEventContent::new(&local_echo.into())
500 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
501
502 new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
505 transaction_id: event_txn.clone(),
506 new_content: new_content.clone(),
507 });
508
509 trace!(%event_txn, "queueing media event after successfully uploading media(s)");
510
511 client
512 .state_store()
513 .save_send_queue_request(
514 &self.room_id,
515 event_txn,
516 MilliSecondsSinceUnixEpoch::now(),
517 new_content.into(),
518 Self::HIGH_PRIORITY,
519 )
520 .await
521 .map_err(RoomSendQueueStorageError::StateStoreError)?;
522
523 Ok(())
524 }
525
526 #[cfg(feature = "unstable-msc4274")]
529 #[allow(clippy::too_many_arguments)]
530 pub(super) async fn handle_dependent_finish_gallery_upload(
531 &self,
532 client: &Client,
533 event_txn: OwnedTransactionId,
534 parent_key: SentRequestKey,
535 mut local_echo: RoomMessageEventContent,
536 item_infos: Vec<FinishGalleryItemInfo>,
537 new_updates: &mut Vec<RoomSendQueueUpdate>,
538 ) -> Result<(), RoomSendQueueError> {
539 let sent_gallery = parent_key
541 .into_media()
542 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
543
544 let mut sent_media_vec = sent_gallery.accumulated;
545 sent_media_vec.push(AccumulatedSentMediaInfo {
546 file: sent_gallery.file,
547 thumbnail: sent_gallery.thumbnail,
548 });
549
550 let mut sent_infos = HashMap::new();
551
552 for (item_info, sent_media) in zip(item_infos, sent_media_vec) {
553 let FinishGalleryItemInfo { file_upload: file_upload_txn, thumbnail_info } = item_info;
554
555 let from_req = Media::make_local_file_media_request(&file_upload_txn);
558 sent_infos.insert(from_req.source.unique_key(), sent_media.clone());
559
560 update_media_cache_keys_after_upload(
561 client,
562 &file_upload_txn,
563 thumbnail_info,
564 &sent_media.into(),
565 )
566 .await?;
567 }
568
569 update_gallery_event_after_upload(&mut local_echo, sent_infos);
570
571 let new_content = SerializableEventContent::new(&local_echo.into())
572 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
573
574 new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
577 transaction_id: event_txn.clone(),
578 new_content: new_content.clone(),
579 });
580
581 trace!(%event_txn, "queueing media event after successfully uploading media(s)");
582
583 client
584 .state_store()
585 .save_send_queue_request(
586 &self.room_id,
587 event_txn,
588 MilliSecondsSinceUnixEpoch::now(),
589 new_content.into(),
590 Self::HIGH_PRIORITY,
591 )
592 .await
593 .map_err(RoomSendQueueStorageError::StateStoreError)?;
594
595 Ok(())
596 }
597
598 #[allow(clippy::too_many_arguments)]
601 pub(super) async fn handle_dependent_file_or_thumbnail_upload(
602 &self,
603 client: &Client,
604 next_upload_txn: OwnedTransactionId,
605 parent_key: SentRequestKey,
606 content_type: String,
607 cache_key: MediaRequestParameters,
608 event_txn: OwnedTransactionId,
609 parent_is_thumbnail_upload: bool,
610 ) -> Result<(), RoomSendQueueError> {
611 let sent_media = parent_key
614 .into_media()
615 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
616
617 if parent_is_thumbnail_upload {
620 debug_assert!(sent_media.thumbnail.is_none());
621 if sent_media.thumbnail.is_some() {
622 warn!("unexpected thumbnail for a thumbnail!");
623 }
624 }
625
626 trace!(
627 related_to = %event_txn,
628 "done uploading file or thumbnail, now queuing the dependent file \
629 or thumbnail upload request",
630 );
631
632 #[cfg(feature = "unstable-msc4274")]
638 let accumulated = if parent_is_thumbnail_upload {
639 sent_media.accumulated
640 } else {
641 let mut accumulated = sent_media.accumulated;
642 accumulated.push(AccumulatedSentMediaInfo {
643 file: sent_media.file.clone(),
644 thumbnail: sent_media.thumbnail,
645 });
646 accumulated
647 };
648
649 let request = QueuedRequestKind::MediaUpload {
650 content_type,
651 cache_key,
652 thumbnail_source: parent_is_thumbnail_upload.then_some(sent_media.file),
655 related_to: event_txn,
656 #[cfg(feature = "unstable-msc4274")]
657 accumulated,
658 };
659
660 client
661 .state_store()
662 .save_send_queue_request(
663 &self.room_id,
664 next_upload_txn,
665 MilliSecondsSinceUnixEpoch::now(),
666 request,
667 Self::HIGH_PRIORITY,
668 )
669 .await
670 .map_err(RoomSendQueueStorageError::StateStoreError)?;
671
672 Ok(())
673 }
674
    /// Tries to abort a media upload, along with the requests depending on
    /// it.
    ///
    /// * `event_txn` - transaction id of the media event.
    /// * `handles` - transaction ids of the file upload and, optionally, of
    ///   the thumbnail upload.
    ///
    /// Returns `Ok(true)` when the upload was aborted; in that case the
    /// local media content for the file (and thumbnail) has also been removed
    /// from the media store. Returns `Ok(false)` when neither the upload
    /// requests nor the dependent event request could be found, in which case
    /// nothing is removed from the media store.
    #[instrument(skip(self, handles))]
    pub(super) async fn abort_upload(
        &self,
        event_txn: &TransactionId,
        handles: &MediaHandles,
    ) -> Result<bool, RoomSendQueueStorageError> {
        let mut guard = self.store.lock().await;
        let client = guard.client()?;

        debug!("trying to abort an upload");

        let store = client.state_store();

        // Keys under which the file upload and the event may be stored as
        // dependent requests.
        let upload_file_as_dependent = ChildTransactionId::from(handles.upload_file_txn.clone());
        let event_as_dependent = ChildTransactionId::from(event_txn.to_owned());

        let mut removed_dependent_upload = false;
        let mut removed_dependent_event = false;

        // First case: the thumbnail upload request is still in the send
        // queue.
        if let Some(thumbnail_txn) = &handles.upload_thumbnail_txn
            && store.remove_send_queue_request(&self.room_id, thumbnail_txn).await?
        {
            trace!("could remove thumbnail request, removing 2 dependent requests now");

            // If the thumbnail is the request currently being sent, cancel
            // its upload.
            if let Some(info) = guard.being_sent.as_ref()
                && info.transaction_id == *thumbnail_txn
            {
                // The `as_ref()` check above guarantees a value is present.
                let info = guard.being_sent.take().unwrap();
                if info.cancel_upload() {
                    trace!("aborted ongoing thumbnail upload");
                }
            }

            // Remove the two requests depending on the thumbnail: the file
            // upload, then the event.
            removed_dependent_upload = store
                .remove_dependent_queued_request(&self.room_id, &upload_file_as_dependent)
                .await?;

            if !removed_dependent_upload {
                warn!("unable to find the dependent file upload request");
            }

            removed_dependent_event =
                store.remove_dependent_queued_request(&self.room_id, &event_as_dependent).await?;

            if !removed_dependent_event {
                warn!("unable to find the dependent media event upload request");
            }
        }

        // Second case: the file upload wasn't removed as a dependent request
        // above, so look for it in the send queue itself.
        if !removed_dependent_upload {
            if store.remove_send_queue_request(&self.room_id, &handles.upload_file_txn).await? {
                trace!("could remove file upload request, removing 1 dependent request");

                // If the file is the request currently being sent, cancel
                // its upload.
                if let Some(info) = guard.being_sent.as_ref()
                    && info.transaction_id == handles.upload_file_txn
                {
                    // The `as_ref()` check above guarantees a value is
                    // present.
                    let info = guard.being_sent.take().unwrap();
                    if info.cancel_upload() {
                        trace!("aborted ongoing file upload");
                    }
                }

                if !store
                    .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
                    .await?
                {
                    warn!("unable to find the dependent media event upload request");
                }
            } else {
                // The file upload request wasn't in the send queue either.
                // Unless the dependent event request was already removed
                // above, try to remove it now; if it can't be found, there's
                // nothing to abort here.
                if !removed_dependent_event
                    && !store
                        .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
                        .await?
                {
                    debug!("uploads already happened => deferring to aborting an event sending");
                    return Ok(false);
                }
            }
        }

        // Remove the local media content from the media store; the lock is
        // scoped to this block.
        {
            let media_store = client.media_store().lock().await?;
            media_store
                .remove_media_content_for_uri(&Media::make_local_uri(&handles.upload_file_txn))
                .await?;
            if let Some(txn) = &handles.upload_thumbnail_txn {
                media_store.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?;
            }
        }

        debug!("successfully aborted!");
        Ok(true)
    }
799
800 #[instrument(skip(self, caption, formatted_caption))]
801 pub(super) async fn edit_media_caption(
802 &self,
803 txn: &TransactionId,
804 caption: Option<String>,
805 formatted_caption: Option<FormattedBody>,
806 mentions: Option<Mentions>,
807 ) -> Result<Option<AnyMessageLikeEventContent>, RoomSendQueueStorageError> {
808 use RoomSendQueueStorageError::InvalidMediaCaptionEdit;
810
811 let guard = self.store.lock().await;
812 let client = guard.client()?;
813 let store = client.state_store();
814
815 {
823 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
826
827 if let Some(found) =
828 dependent_requests.into_iter().find(|req| *req.own_transaction_id == *txn)
829 {
830 trace!("found the caption to edit in a dependent request");
831
832 let DependentQueuedRequestKind::FinishUpload {
833 mut local_echo,
834 file_upload,
835 thumbnail_info,
836 } = found.kind
837 else {
838 return Err(InvalidMediaCaptionEdit);
839 };
840
841 if !update_media_caption(&mut local_echo, caption, formatted_caption, mentions) {
842 return Err(InvalidMediaCaptionEdit);
843 }
844
845 let new_dependent_request = DependentQueuedRequestKind::FinishUpload {
846 local_echo: local_echo.clone(),
847 file_upload,
848 thumbnail_info,
849 };
850 store
851 .update_dependent_queued_request(
852 &self.room_id,
853 &found.own_transaction_id,
854 new_dependent_request,
855 )
856 .await?;
857
858 trace!("caption successfully updated");
859 return Ok(Some((*local_echo).into()));
860 }
861 }
862
863 let requests = store.load_send_queue_requests(&self.room_id).await?;
864 let Some(found) = requests.into_iter().find(|req| req.transaction_id == *txn) else {
865 return Ok(None);
867 };
868
869 trace!("found the caption to edit as a request");
870
871 let QueuedRequestKind::Event { content: serialized_content } = found.kind else {
872 return Err(InvalidMediaCaptionEdit);
873 };
874
875 let deserialized = serialized_content.deserialize()?;
876 let AnyMessageLikeEventContent::RoomMessage(mut content) = deserialized else {
877 return Err(InvalidMediaCaptionEdit);
878 };
879
880 if !update_media_caption(&mut content, caption, formatted_caption, mentions) {
881 return Err(InvalidMediaCaptionEdit);
882 }
883
884 let any_content: AnyMessageLikeEventContent = content.into();
885 let new_serialized = SerializableEventContent::new(&any_content.clone())?;
886
887 if let Some(being_sent) = guard.being_sent.as_ref()
889 && being_sent.transaction_id == *txn
890 {
891 store
893 .save_dependent_queued_request(
894 &self.room_id,
895 txn,
896 ChildTransactionId::new(),
897 MilliSecondsSinceUnixEpoch::now(),
898 DependentQueuedRequestKind::EditEvent { new_content: new_serialized },
899 )
900 .await?;
901
902 trace!("media event was being sent, pushed a dependent edit");
903 return Ok(Some(any_content));
904 }
905
906 store
908 .update_send_queue_request(
909 &self.room_id,
910 txn,
911 QueuedRequestKind::Event { content: new_serialized },
912 )
913 .await?;
914
915 trace!("media event was not being sent, updated local echo");
916 Ok(Some(any_content))
917 }
918}
919
920async fn update_media_cache_keys_after_upload(
923 client: &Client,
924 file_upload_txn: &OwnedTransactionId,
925 thumbnail_info: Option<FinishUploadThumbnailInfo>,
926 sent_media: &SentMediaInfo,
927) -> Result<(), RoomSendQueueError> {
928 let from_req = Media::make_local_file_media_request(file_upload_txn);
930
931 trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store");
932 let media_store =
933 client.media_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
934
935 media_store
937 .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
938 .await
939 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
940
941 media_store
942 .replace_media_key(
943 &from_req,
944 &MediaRequestParameters { source: sent_media.file.clone(), format: MediaFormat::File },
945 )
946 .await
947 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
948
949 if let Some((info, new_source)) = thumbnail_info.as_ref().zip(sent_media.thumbnail.clone()) {
951 let from_req = if let Some((height, width)) = info.height.zip(info.width) {
954 Media::make_local_thumbnail_media_request(&info.txn, height, width)
955 } else {
956 Media::make_local_file_media_request(&info.txn)
957 };
958
959 trace!(from = ?from_req.source, to = ?new_source, "renaming thumbnail file key in cache store");
960
961 media_store
963 .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
964 .await
965 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
966
967 media_store
968 .replace_media_key(
969 &from_req,
970 &MediaRequestParameters { source: new_source, format: MediaFormat::File },
971 )
972 .await
973 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
974 }
975
976 Ok(())
977}