1#[cfg(feature = "unstable-msc4274")]
18use std::{collections::HashMap, iter::zip};
19
20use matrix_sdk_base::{
21 RoomState,
22 media::{MediaFormat, MediaRequestParameters, store::IgnoreMediaRetentionPolicy},
23 store::{
24 ChildTransactionId, DependentQueuedRequestKind, FinishUploadThumbnailInfo,
25 QueuedRequestKind, SentMediaInfo, SentRequestKey, SerializableEventContent,
26 },
27};
28#[cfg(feature = "unstable-msc4274")]
29use matrix_sdk_base::{
30 media::UniqueKey,
31 store::{AccumulatedSentMediaInfo, FinishGalleryItemInfo},
32};
33use mime::Mime;
34#[cfg(feature = "unstable-msc4274")]
35use ruma::events::room::message::{GalleryItemType, GalleryMessageEventContent};
36use ruma::{
37 MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId,
38 events::{
39 AnyMessageLikeEventContent, Mentions,
40 room::{
41 MediaSource, ThumbnailInfo,
42 message::{FormattedBody, MessageType, RoomMessageEventContent},
43 },
44 },
45};
46use tracing::{Span, debug, error, instrument, trace, warn};
47
48use super::{QueueStorage, QueueThumbnailInfo, RoomSendQueue, RoomSendQueueError};
49use crate::{
50 Client, Media, Room,
51 attachment::{AttachmentConfig, Thumbnail},
52 room::edit::update_media_caption,
53 send_queue::{
54 LocalEcho, LocalEchoContent, MediaHandles, RoomSendQueueStorageError, RoomSendQueueUpdate,
55 SendHandle,
56 },
57};
58#[cfg(feature = "unstable-msc4274")]
59use crate::{
60 attachment::{GalleryConfig, GalleryItemInfo},
61 send_queue::GalleryItemQueueInfo,
62};
63
64fn update_media_event_after_upload(echo: &mut RoomMessageEventContent, sent: SentMediaInfo) {
67 match &mut echo.msgtype {
70 MessageType::Audio(event) => {
71 event.source = sent.file;
72 }
73 MessageType::File(event) => {
74 event.source = sent.file;
75 if let Some(info) = event.info.as_mut() {
76 info.thumbnail_source = sent.thumbnail;
77 }
78 }
79 MessageType::Image(event) => {
80 event.source = sent.file;
81 if let Some(info) = event.info.as_mut() {
82 info.thumbnail_source = sent.thumbnail;
83 }
84 }
85 MessageType::Video(event) => {
86 event.source = sent.file;
87 if let Some(info) = event.info.as_mut() {
88 info.thumbnail_source = sent.thumbnail;
89 }
90 }
91
92 _ => {
93 error!("Invalid message type in database: {}", echo.msgtype());
97 debug_assert!(false, "invalid message type in database");
99 }
100 }
101}
102
103#[cfg(feature = "unstable-msc4274")]
106fn update_gallery_event_after_upload(
107 echo: &mut RoomMessageEventContent,
108 sent: HashMap<String, AccumulatedSentMediaInfo>,
109) {
110 let MessageType::Gallery(gallery) = &mut echo.msgtype else {
111 error!("Invalid gallery item types in database");
115 debug_assert!(false, "invalid item type in database {:?}", echo.msgtype());
117 return;
118 };
119
120 for itemtype in gallery.itemtypes.iter_mut() {
123 match itemtype {
124 GalleryItemType::Audio(event) => match sent.get(&event.source.unique_key()) {
125 Some(sent) => event.source = sent.file.clone(),
126 None => error!("key for item {:?} does not exist on gallery event", event.source),
127 },
128 GalleryItemType::File(event) => match sent.get(&event.source.unique_key()) {
129 Some(sent) => {
130 event.source = sent.file.clone();
131 if let Some(info) = event.info.as_mut() {
132 info.thumbnail_source = sent.thumbnail.clone();
133 }
134 }
135 None => error!("key for item {:?} does not exist on gallery event", event.source),
136 },
137 GalleryItemType::Image(event) => match sent.get(&event.source.unique_key()) {
138 Some(sent) => {
139 event.source = sent.file.clone();
140 if let Some(info) = event.info.as_mut() {
141 info.thumbnail_source = sent.thumbnail.clone();
142 }
143 }
144 None => error!("key for item {:?} does not exist on gallery event", event.source),
145 },
146 GalleryItemType::Video(event) => match sent.get(&event.source.unique_key()) {
147 Some(sent) => {
148 event.source = sent.file.clone();
149 if let Some(info) = event.info.as_mut() {
150 info.thumbnail_source = sent.thumbnail.clone();
151 }
152 }
153 None => error!("key for item {:?} does not exist on gallery event", event.source),
154 },
155
156 _ => {
157 error!("Invalid gallery item types in database");
161 debug_assert!(false, "invalid gallery item type in database {itemtype:?}");
163 }
164 }
165 }
166}
167
168#[derive(Default)]
169struct MediaCacheResult {
170 upload_thumbnail_txn: Option<OwnedTransactionId>,
171 event_thumbnail_info: Option<(MediaSource, Box<ThumbnailInfo>)>,
172 queue_thumbnail_info: Option<QueueThumbnailInfo>,
173}
174
175impl RoomSendQueue {
176 #[instrument(skip_all, fields(event_txn))]
196 pub async fn send_attachment(
197 &self,
198 filename: impl Into<String>,
199 content_type: Mime,
200 data: Vec<u8>,
201 mut config: AttachmentConfig,
202 ) -> Result<SendHandle, RoomSendQueueError> {
203 let Some(room) = self.inner.room.get() else {
204 return Err(RoomSendQueueError::RoomDisappeared);
205 };
206
207 if room.state() != RoomState::Joined {
208 return Err(RoomSendQueueError::RoomNotJoined);
209 }
210
211 let filename = filename.into();
212 let upload_file_txn = TransactionId::new();
213 let send_event_txn = config.txn_id.map_or_else(ChildTransactionId::new, Into::into);
214
215 Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
216 debug!(filename, %content_type, %upload_file_txn, "sending an attachment");
217
218 let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
219
220 let MediaCacheResult { upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info } =
221 RoomSendQueue::cache_media(&room, data, config.thumbnail.take(), &file_media_request)
222 .await?;
223
224 let event_content = room
226 .make_media_event(
227 Room::make_attachment_type(
228 &content_type,
229 filename,
230 file_media_request.source.clone(),
231 config.caption,
232 config.formatted_caption,
233 config.info,
234 event_thumbnail_info,
235 ),
236 config.mentions,
237 config.reply,
238 )
239 .await
240 .map_err(|_| RoomSendQueueError::FailedToCreateAttachment)?;
241
242 let created_at = MilliSecondsSinceUnixEpoch::now();
243
244 self.inner
246 .queue
247 .push_media(
248 event_content.clone(),
249 content_type,
250 send_event_txn.clone().into(),
251 created_at,
252 upload_file_txn.clone(),
253 file_media_request,
254 queue_thumbnail_info,
255 )
256 .await?;
257
258 trace!("manager sends a media to the background task");
259
260 self.inner.notifier.notify_one();
261
262 let send_handle = SendHandle {
263 room: self.clone(),
264 transaction_id: send_event_txn.clone().into(),
265 media_handles: vec![MediaHandles { upload_thumbnail_txn, upload_file_txn }],
266 created_at,
267 };
268
269 self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
270 transaction_id: send_event_txn.clone().into(),
271 content: LocalEchoContent::Event {
272 serialized_event: SerializableEventContent::new(&event_content.into())
273 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
274 send_handle: send_handle.clone(),
275 send_error: None,
276 },
277 }));
278
279 Ok(send_handle)
280 }
281
282 #[cfg(feature = "unstable-msc4274")]
302 #[instrument(skip_all, fields(event_txn))]
303 pub async fn send_gallery(
304 &self,
305 gallery: GalleryConfig,
306 ) -> Result<SendHandle, RoomSendQueueError> {
307 let Some(room) = self.inner.room.get() else {
308 return Err(RoomSendQueueError::RoomDisappeared);
309 };
310
311 if room.state() != RoomState::Joined {
312 return Err(RoomSendQueueError::RoomNotJoined);
313 }
314
315 if gallery.is_empty() {
316 return Err(RoomSendQueueError::EmptyGallery);
317 }
318
319 let send_event_txn =
320 gallery.txn_id.clone().map_or_else(ChildTransactionId::new, Into::into);
321
322 Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
323
324 let mut item_types = Vec::with_capacity(gallery.len());
325 let mut item_queue_infos = Vec::with_capacity(gallery.len());
326 let mut media_handles = Vec::with_capacity(gallery.len());
327
328 for item_info in gallery.items {
329 let GalleryItemInfo { filename, content_type, data, .. } = item_info;
330
331 let upload_file_txn = TransactionId::new();
332
333 debug!(filename, %content_type, %upload_file_txn, "uploading a gallery attachment");
334
335 let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
336
337 let MediaCacheResult {
338 upload_thumbnail_txn,
339 event_thumbnail_info,
340 queue_thumbnail_info,
341 } = RoomSendQueue::cache_media(&room, data, item_info.thumbnail, &file_media_request)
342 .await?;
343
344 item_types.push(Room::make_gallery_item_type(
345 &content_type,
346 filename,
347 file_media_request.source.clone(),
348 item_info.caption,
349 item_info.formatted_caption,
350 Some(item_info.attachment_info),
351 event_thumbnail_info,
352 ));
353
354 item_queue_infos.push(GalleryItemQueueInfo {
355 content_type,
356 upload_file_txn: upload_file_txn.clone(),
357 file_media_request,
358 thumbnail: queue_thumbnail_info,
359 });
360
361 media_handles.push(MediaHandles { upload_file_txn, upload_thumbnail_txn });
362 }
363
364 let event_content = room
366 .make_media_event(
367 MessageType::Gallery(GalleryMessageEventContent::new(
368 gallery.caption.unwrap_or_default(),
369 gallery.formatted_caption,
370 item_types,
371 )),
372 gallery.mentions,
373 gallery.reply,
374 )
375 .await
376 .map_err(|_| RoomSendQueueError::FailedToCreateGallery)?;
377
378 let created_at = MilliSecondsSinceUnixEpoch::now();
379
380 self.inner
382 .queue
383 .push_gallery(
384 event_content.clone(),
385 send_event_txn.clone().into(),
386 created_at,
387 item_queue_infos,
388 )
389 .await?;
390
391 trace!("manager sends a gallery to the background task");
392
393 self.inner.notifier.notify_one();
394
395 let send_handle = SendHandle {
396 room: self.clone(),
397 transaction_id: send_event_txn.clone().into(),
398 media_handles,
399 created_at,
400 };
401
402 self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
403 transaction_id: send_event_txn.clone().into(),
404 content: LocalEchoContent::Event {
405 serialized_event: SerializableEventContent::new(&event_content.into())
406 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
407 send_handle: send_handle.clone(),
408 send_error: None,
409 },
410 }));
411
412 Ok(send_handle)
413 }
414
415 async fn cache_media(
416 room: &Room,
417 data: Vec<u8>,
418 thumbnail: Option<Thumbnail>,
419 file_media_request: &MediaRequestParameters,
420 ) -> Result<MediaCacheResult, RoomSendQueueError> {
421 let client = room.client();
422 let media_store =
423 client.media_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
424
425 media_store
427 .add_media_content(
428 file_media_request,
429 data,
430 IgnoreMediaRetentionPolicy::Yes,
432 )
433 .await
434 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
435
436 if let Some(thumbnail) = thumbnail {
438 let txn = TransactionId::new();
439 trace!(upload_thumbnail_txn = %txn, "media has a thumbnail");
440
441 let (data, content_type, thumbnail_info) = thumbnail.into_parts();
444 let file_size = data.len();
445
446 let thumbnail_media_request = Media::make_local_file_media_request(&txn);
448 media_store
449 .add_media_content(
450 &thumbnail_media_request,
451 data,
452 IgnoreMediaRetentionPolicy::Yes,
454 )
455 .await
456 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
457
458 Ok(MediaCacheResult {
459 upload_thumbnail_txn: Some(txn.clone()),
460 event_thumbnail_info: Some((
461 thumbnail_media_request.source.clone(),
462 thumbnail_info,
463 )),
464 queue_thumbnail_info: Some(QueueThumbnailInfo {
465 finish_upload_thumbnail_info: FinishUploadThumbnailInfo {
466 txn,
467 width: None,
468 height: None,
469 },
470 media_request_parameters: thumbnail_media_request,
471 content_type,
472 file_size,
473 }),
474 })
475 } else {
476 Ok(Default::default())
477 }
478 }
479}
480
481impl QueueStorage {
482 #[allow(clippy::too_many_arguments)]
484 pub(super) async fn handle_dependent_finish_upload(
485 &self,
486 client: &Client,
487 event_txn: OwnedTransactionId,
488 parent_key: SentRequestKey,
489 mut local_echo: RoomMessageEventContent,
490 file_upload_txn: OwnedTransactionId,
491 thumbnail_info: Option<FinishUploadThumbnailInfo>,
492 new_updates: &mut Vec<RoomSendQueueUpdate>,
493 ) -> Result<(), RoomSendQueueError> {
494 let sent_media = parent_key
496 .into_media()
497 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
498
499 update_media_cache_keys_after_upload(client, &file_upload_txn, thumbnail_info, &sent_media)
500 .await?;
501 update_media_event_after_upload(&mut local_echo, sent_media);
502
503 let new_content = SerializableEventContent::new(&local_echo.into())
504 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
505
506 new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
509 transaction_id: event_txn.clone(),
510 new_content: new_content.clone(),
511 });
512
513 trace!(%event_txn, "queueing media event after successfully uploading media(s)");
514
515 client
516 .state_store()
517 .save_send_queue_request(
518 &self.room_id,
519 event_txn,
520 MilliSecondsSinceUnixEpoch::now(),
521 new_content.into(),
522 Self::HIGH_PRIORITY,
523 )
524 .await
525 .map_err(RoomSendQueueStorageError::StateStoreError)?;
526
527 Ok(())
528 }
529
530 #[cfg(feature = "unstable-msc4274")]
533 #[allow(clippy::too_many_arguments)]
534 pub(super) async fn handle_dependent_finish_gallery_upload(
535 &self,
536 client: &Client,
537 event_txn: OwnedTransactionId,
538 parent_key: SentRequestKey,
539 mut local_echo: RoomMessageEventContent,
540 item_infos: Vec<FinishGalleryItemInfo>,
541 new_updates: &mut Vec<RoomSendQueueUpdate>,
542 ) -> Result<(), RoomSendQueueError> {
543 let sent_gallery = parent_key
545 .into_media()
546 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
547
548 let mut sent_media_vec = sent_gallery.accumulated;
549 sent_media_vec.push(AccumulatedSentMediaInfo {
550 file: sent_gallery.file,
551 thumbnail: sent_gallery.thumbnail,
552 });
553
554 let mut sent_infos = HashMap::new();
555
556 for (item_info, sent_media) in zip(item_infos, sent_media_vec) {
557 let FinishGalleryItemInfo { file_upload: file_upload_txn, thumbnail_info } = item_info;
558
559 let from_req = Media::make_local_file_media_request(&file_upload_txn);
562 sent_infos.insert(from_req.source.unique_key(), sent_media.clone());
563
564 update_media_cache_keys_after_upload(
565 client,
566 &file_upload_txn,
567 thumbnail_info,
568 &sent_media.into(),
569 )
570 .await?;
571 }
572
573 update_gallery_event_after_upload(&mut local_echo, sent_infos);
574
575 let new_content = SerializableEventContent::new(&local_echo.into())
576 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
577
578 new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
581 transaction_id: event_txn.clone(),
582 new_content: new_content.clone(),
583 });
584
585 trace!(%event_txn, "queueing media event after successfully uploading media(s)");
586
587 client
588 .state_store()
589 .save_send_queue_request(
590 &self.room_id,
591 event_txn,
592 MilliSecondsSinceUnixEpoch::now(),
593 new_content.into(),
594 Self::HIGH_PRIORITY,
595 )
596 .await
597 .map_err(RoomSendQueueStorageError::StateStoreError)?;
598
599 Ok(())
600 }
601
602 #[allow(clippy::too_many_arguments)]
605 pub(super) async fn handle_dependent_file_or_thumbnail_upload(
606 &self,
607 client: &Client,
608 next_upload_txn: OwnedTransactionId,
609 parent_key: SentRequestKey,
610 content_type: String,
611 cache_key: MediaRequestParameters,
612 event_txn: OwnedTransactionId,
613 parent_is_thumbnail_upload: bool,
614 ) -> Result<(), RoomSendQueueError> {
615 let sent_media = parent_key
618 .into_media()
619 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
620
621 if parent_is_thumbnail_upload {
624 debug_assert!(sent_media.thumbnail.is_none());
625 if sent_media.thumbnail.is_some() {
626 warn!("unexpected thumbnail for a thumbnail!");
627 }
628 }
629
630 trace!(
631 related_to = %event_txn,
632 "done uploading file or thumbnail, now queuing the dependent file \
633 or thumbnail upload request",
634 );
635
636 #[cfg(feature = "unstable-msc4274")]
642 let accumulated = if parent_is_thumbnail_upload {
643 sent_media.accumulated
644 } else {
645 let mut accumulated = sent_media.accumulated;
646 accumulated.push(AccumulatedSentMediaInfo {
647 file: sent_media.file.clone(),
648 thumbnail: sent_media.thumbnail,
649 });
650 accumulated
651 };
652
653 let request = QueuedRequestKind::MediaUpload {
654 content_type,
655 cache_key,
656 thumbnail_source: parent_is_thumbnail_upload.then_some(sent_media.file),
659 related_to: event_txn,
660 #[cfg(feature = "unstable-msc4274")]
661 accumulated,
662 };
663
664 client
665 .state_store()
666 .save_send_queue_request(
667 &self.room_id,
668 next_upload_txn,
669 MilliSecondsSinceUnixEpoch::now(),
670 request,
671 Self::HIGH_PRIORITY,
672 )
673 .await
674 .map_err(RoomSendQueueStorageError::StateStoreError)?;
675
676 Ok(())
677 }
678
679 #[instrument(skip(self, handles))]
686 pub(super) async fn abort_upload(
687 &self,
688 event_txn: &TransactionId,
689 handles: &MediaHandles,
690 ) -> Result<bool, RoomSendQueueStorageError> {
691 let mut guard = self.store.lock().await;
692 let client = guard.client()?;
693
694 debug!("trying to abort an upload");
696
697 let store = client.state_store();
698
699 let upload_file_as_dependent = ChildTransactionId::from(handles.upload_file_txn.clone());
700 let event_as_dependent = ChildTransactionId::from(event_txn.to_owned());
701
702 let mut removed_dependent_upload = false;
703 let mut removed_dependent_event = false;
704
705 if let Some(thumbnail_txn) = &handles.upload_thumbnail_txn
706 && store.remove_send_queue_request(&self.room_id, thumbnail_txn).await?
707 {
708 trace!("could remove thumbnail request, removing 2 dependent requests now");
711
712 if let Some(info) = guard.being_sent.as_ref()
714 && info.transaction_id == *thumbnail_txn
715 {
716 let info = guard.being_sent.take().unwrap();
718 if info.cancel_upload() {
719 trace!("aborted ongoing thumbnail upload");
720 }
721 }
722
723 removed_dependent_upload = store
725 .remove_dependent_queued_request(&self.room_id, &upload_file_as_dependent)
726 .await?;
727
728 if !removed_dependent_upload {
729 warn!("unable to find the dependent file upload request");
730 }
731
732 removed_dependent_event =
733 store.remove_dependent_queued_request(&self.room_id, &event_as_dependent).await?;
734
735 if !removed_dependent_event {
736 warn!("unable to find the dependent media event upload request");
737 }
738 }
739
740 if !removed_dependent_upload {
747 if store.remove_send_queue_request(&self.room_id, &handles.upload_file_txn).await? {
748 trace!("could remove file upload request, removing 1 dependent request");
751
752 if let Some(info) = guard.being_sent.as_ref()
754 && info.transaction_id == handles.upload_file_txn
755 {
756 let info = guard.being_sent.take().unwrap();
758 if info.cancel_upload() {
759 trace!("aborted ongoing file upload");
760 }
761 }
762
763 if !store
765 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
766 .await?
767 {
768 warn!("unable to find the dependent media event upload request");
769 }
770 } else {
771 if !removed_dependent_event
776 && !store
777 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
778 .await?
779 {
780 debug!("uploads already happened => deferring to aborting an event sending");
783 return Ok(false);
784 }
785 }
786 }
787
788 {
791 let media_store = client.media_store().lock().await?;
792 media_store
793 .remove_media_content_for_uri(&Media::make_local_uri(&handles.upload_file_txn))
794 .await?;
795 if let Some(txn) = &handles.upload_thumbnail_txn {
796 media_store.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?;
797 }
798 }
799
800 debug!("successfully aborted!");
801 Ok(true)
802 }
803
804 #[instrument(skip(self, caption, formatted_caption))]
805 pub(super) async fn edit_media_caption(
806 &self,
807 txn: &TransactionId,
808 caption: Option<String>,
809 formatted_caption: Option<FormattedBody>,
810 mentions: Option<Mentions>,
811 ) -> Result<Option<AnyMessageLikeEventContent>, RoomSendQueueStorageError> {
812 use RoomSendQueueStorageError::InvalidMediaCaptionEdit;
814
815 let guard = self.store.lock().await;
816 let client = guard.client()?;
817 let store = client.state_store();
818
819 {
827 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
830
831 if let Some(found) =
832 dependent_requests.into_iter().find(|req| *req.own_transaction_id == *txn)
833 {
834 trace!("found the caption to edit in a dependent request");
835
836 let DependentQueuedRequestKind::FinishUpload {
837 mut local_echo,
838 file_upload,
839 thumbnail_info,
840 } = found.kind
841 else {
842 return Err(InvalidMediaCaptionEdit);
843 };
844
845 if !update_media_caption(&mut local_echo, caption, formatted_caption, mentions) {
846 return Err(InvalidMediaCaptionEdit);
847 }
848
849 let new_dependent_request = DependentQueuedRequestKind::FinishUpload {
850 local_echo: local_echo.clone(),
851 file_upload,
852 thumbnail_info,
853 };
854 store
855 .update_dependent_queued_request(
856 &self.room_id,
857 &found.own_transaction_id,
858 new_dependent_request,
859 )
860 .await?;
861
862 trace!("caption successfully updated");
863 return Ok(Some((*local_echo).into()));
864 }
865 }
866
867 let requests = store.load_send_queue_requests(&self.room_id).await?;
868 let Some(found) = requests.into_iter().find(|req| req.transaction_id == *txn) else {
869 return Ok(None);
871 };
872
873 trace!("found the caption to edit as a request");
874
875 let QueuedRequestKind::Event { content: serialized_content } = found.kind else {
876 return Err(InvalidMediaCaptionEdit);
877 };
878
879 let deserialized = serialized_content.deserialize()?;
880 let AnyMessageLikeEventContent::RoomMessage(mut content) = deserialized else {
881 return Err(InvalidMediaCaptionEdit);
882 };
883
884 if !update_media_caption(&mut content, caption, formatted_caption, mentions) {
885 return Err(InvalidMediaCaptionEdit);
886 }
887
888 let any_content: AnyMessageLikeEventContent = content.into();
889 let new_serialized = SerializableEventContent::new(&any_content.clone())?;
890
891 if let Some(being_sent) = guard.being_sent.as_ref()
893 && being_sent.transaction_id == *txn
894 {
895 store
897 .save_dependent_queued_request(
898 &self.room_id,
899 txn,
900 ChildTransactionId::new(),
901 MilliSecondsSinceUnixEpoch::now(),
902 DependentQueuedRequestKind::EditEvent { new_content: new_serialized },
903 )
904 .await?;
905
906 trace!("media event was being sent, pushed a dependent edit");
907 return Ok(Some(any_content));
908 }
909
910 store
912 .update_send_queue_request(
913 &self.room_id,
914 txn,
915 QueuedRequestKind::Event { content: new_serialized },
916 )
917 .await?;
918
919 trace!("media event was not being sent, updated local echo");
920 Ok(Some(any_content))
921 }
922}
923
924async fn update_media_cache_keys_after_upload(
927 client: &Client,
928 file_upload_txn: &OwnedTransactionId,
929 thumbnail_info: Option<FinishUploadThumbnailInfo>,
930 sent_media: &SentMediaInfo,
931) -> Result<(), RoomSendQueueError> {
932 let from_req = Media::make_local_file_media_request(file_upload_txn);
934
935 trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store");
936 let media_store =
937 client.media_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
938
939 media_store
941 .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
942 .await
943 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
944
945 media_store
946 .replace_media_key(
947 &from_req,
948 &MediaRequestParameters { source: sent_media.file.clone(), format: MediaFormat::File },
949 )
950 .await
951 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
952
953 if let Some((info, new_source)) = thumbnail_info.as_ref().zip(sent_media.thumbnail.clone()) {
955 let from_req = if let Some((height, width)) = info.height.zip(info.width) {
958 Media::make_local_thumbnail_media_request(&info.txn, height, width)
959 } else {
960 Media::make_local_file_media_request(&info.txn)
961 };
962
963 trace!(from = ?from_req.source, to = ?new_source, "renaming thumbnail file key in cache store");
964
965 media_store
967 .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
968 .await
969 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
970
971 media_store
972 .replace_media_key(
973 &from_req,
974 &MediaRequestParameters { source: new_source, format: MediaFormat::File },
975 )
976 .await
977 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
978 }
979
980 Ok(())
981}