1#[cfg(feature = "unstable-msc4274")]
18use std::{collections::HashMap, iter::zip};
19
20use matrix_sdk_base::{
21 event_cache::store::media::IgnoreMediaRetentionPolicy,
22 media::{MediaFormat, MediaRequestParameters},
23 store::{
24 ChildTransactionId, DependentQueuedRequestKind, FinishUploadThumbnailInfo,
25 QueuedRequestKind, SentMediaInfo, SentRequestKey, SerializableEventContent,
26 },
27 RoomState,
28};
29#[cfg(feature = "unstable-msc4274")]
30use matrix_sdk_base::{
31 media::UniqueKey,
32 store::{AccumulatedSentMediaInfo, FinishGalleryItemInfo},
33};
34use mime::Mime;
35#[cfg(feature = "unstable-msc4274")]
36use ruma::events::room::message::{GalleryItemType, GalleryMessageEventContent};
37use ruma::{
38 events::{
39 room::{
40 message::{FormattedBody, MessageType, RoomMessageEventContent},
41 MediaSource, ThumbnailInfo,
42 },
43 AnyMessageLikeEventContent, Mentions,
44 },
45 MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId,
46};
47use tracing::{debug, error, instrument, trace, warn, Span};
48
49use super::{QueueStorage, QueueThumbnailInfo, RoomSendQueue, RoomSendQueueError};
50use crate::{
51 attachment::{AttachmentConfig, Thumbnail},
52 room::edit::update_media_caption,
53 send_queue::{
54 LocalEcho, LocalEchoContent, MediaHandles, RoomSendQueueStorageError, RoomSendQueueUpdate,
55 SendHandle,
56 },
57 Client, Media, Room,
58};
59#[cfg(feature = "unstable-msc4274")]
60use crate::{
61 attachment::{GalleryConfig, GalleryItemInfo},
62 send_queue::GalleryItemQueueInfo,
63};
64
65fn update_media_event_after_upload(echo: &mut RoomMessageEventContent, sent: SentMediaInfo) {
68 match &mut echo.msgtype {
71 MessageType::Audio(event) => {
72 event.source = sent.file;
73 }
74 MessageType::File(event) => {
75 event.source = sent.file;
76 if let Some(info) = event.info.as_mut() {
77 info.thumbnail_source = sent.thumbnail;
78 }
79 }
80 MessageType::Image(event) => {
81 event.source = sent.file;
82 if let Some(info) = event.info.as_mut() {
83 info.thumbnail_source = sent.thumbnail;
84 }
85 }
86 MessageType::Video(event) => {
87 event.source = sent.file;
88 if let Some(info) = event.info.as_mut() {
89 info.thumbnail_source = sent.thumbnail;
90 }
91 }
92
93 _ => {
94 error!("Invalid message type in database: {}", echo.msgtype());
98 debug_assert!(false, "invalid message type in database");
100 }
101 }
102}
103
/// Replaces the sources of every item of a gallery event by the ones obtained
/// after the upload.
///
/// `sent` maps the unique key of each item's current source to the media
/// information that must replace it. Items whose key is missing from `sent`
/// are logged as errors and left untouched.
///
/// If `echo` is not a gallery message, or if an item is not an audio, file,
/// image or video item, an error is logged and debug builds panic through
/// `debug_assert!`.
#[cfg(feature = "unstable-msc4274")]
fn update_gallery_event_after_upload(
    echo: &mut RoomMessageEventContent,
    sent: HashMap<String, AccumulatedSentMediaInfo>,
) {
    let MessageType::Gallery(gallery) = &mut echo.msgtype else {
        error!("Invalid gallery item types in database");
        debug_assert!(false, "invalid item type in database {:?}", echo.msgtype());
        return;
    };

    // The arms look alike, but each `item` (and its `info`) has a distinct
    // type, so they cannot be merged.
    for itemtype in &mut gallery.itemtypes {
        match itemtype {
            GalleryItemType::Audio(item) => {
                if let Some(uploaded) = sent.get(&item.source.unique_key()) {
                    item.source = uploaded.file.clone();
                } else {
                    error!("key for item {:?} does not exist on gallery event", item.source);
                }
            }

            GalleryItemType::File(item) => {
                if let Some(uploaded) = sent.get(&item.source.unique_key()) {
                    item.source = uploaded.file.clone();
                    if let Some(info) = &mut item.info {
                        info.thumbnail_source = uploaded.thumbnail.clone();
                    }
                } else {
                    error!("key for item {:?} does not exist on gallery event", item.source);
                }
            }

            GalleryItemType::Image(item) => {
                if let Some(uploaded) = sent.get(&item.source.unique_key()) {
                    item.source = uploaded.file.clone();
                    if let Some(info) = &mut item.info {
                        info.thumbnail_source = uploaded.thumbnail.clone();
                    }
                } else {
                    error!("key for item {:?} does not exist on gallery event", item.source);
                }
            }

            GalleryItemType::Video(item) => {
                if let Some(uploaded) = sent.get(&item.source.unique_key()) {
                    item.source = uploaded.file.clone();
                    if let Some(info) = &mut item.info {
                        info.thumbnail_source = uploaded.thumbnail.clone();
                    }
                } else {
                    error!("key for item {:?} does not exist on gallery event", item.source);
                }
            }

            _ => {
                error!("Invalid gallery item types in database");
                // Release builds only log; debug builds stop here.
                debug_assert!(false, "invalid gallery item type in database {itemtype:?}");
            }
        }
    }
}
168
/// Values produced by `RoomSendQueue::cache_media` after storing a media (and
/// its optional thumbnail) in the event cache store.
///
/// The `Default` value (all fields `None`) is what `cache_media` returns when
/// no thumbnail was provided.
#[derive(Default)]
struct MediaCacheResult {
    /// Transaction id generated for the thumbnail, if a thumbnail was
    /// provided.
    upload_thumbnail_txn: Option<OwnedTransactionId>,
    /// Local media source of the cached thumbnail along with its thumbnail
    /// info, used when building the event content.
    event_thumbnail_info: Option<(MediaSource, Box<ThumbnailInfo>)>,
    /// Thumbnail information handed over to the queue storage when pushing
    /// the media.
    queue_thumbnail_info: Option<QueueThumbnailInfo>,
}
175
176impl RoomSendQueue {
177 #[instrument(skip_all, fields(event_txn))]
197 pub async fn send_attachment(
198 &self,
199 filename: impl Into<String>,
200 content_type: Mime,
201 data: Vec<u8>,
202 mut config: AttachmentConfig,
203 ) -> Result<SendHandle, RoomSendQueueError> {
204 let Some(room) = self.inner.room.get() else {
205 return Err(RoomSendQueueError::RoomDisappeared);
206 };
207
208 if room.state() != RoomState::Joined {
209 return Err(RoomSendQueueError::RoomNotJoined);
210 }
211
212 let filename = filename.into();
213 let upload_file_txn = TransactionId::new();
214 let send_event_txn = config.txn_id.map_or_else(ChildTransactionId::new, Into::into);
215
216 Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
217 debug!(filename, %content_type, %upload_file_txn, "sending an attachment");
218
219 let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
220
221 let MediaCacheResult { upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info } =
222 RoomSendQueue::cache_media(&room, data, config.thumbnail.take(), &file_media_request)
223 .await?;
224
225 let event_content = room
227 .make_media_event(
228 Room::make_attachment_type(
229 &content_type,
230 filename,
231 file_media_request.source.clone(),
232 config.caption,
233 config.formatted_caption,
234 config.info,
235 event_thumbnail_info,
236 ),
237 config.mentions,
238 config.reply,
239 )
240 .await
241 .map_err(|_| RoomSendQueueError::FailedToCreateAttachment)?;
242
243 let created_at = MilliSecondsSinceUnixEpoch::now();
244
245 self.inner
247 .queue
248 .push_media(
249 event_content.clone(),
250 content_type,
251 send_event_txn.clone().into(),
252 created_at,
253 upload_file_txn.clone(),
254 file_media_request,
255 queue_thumbnail_info,
256 )
257 .await?;
258
259 trace!("manager sends a media to the background task");
260
261 self.inner.notifier.notify_one();
262
263 let send_handle = SendHandle {
264 room: self.clone(),
265 transaction_id: send_event_txn.clone().into(),
266 media_handles: vec![MediaHandles { upload_thumbnail_txn, upload_file_txn }],
267 created_at,
268 };
269
270 self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
271 transaction_id: send_event_txn.clone().into(),
272 content: LocalEchoContent::Event {
273 serialized_event: SerializableEventContent::new(&event_content.into())
274 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
275 send_handle: send_handle.clone(),
276 send_error: None,
277 },
278 }));
279
280 Ok(send_handle)
281 }
282
283 #[cfg(feature = "unstable-msc4274")]
303 #[instrument(skip_all, fields(event_txn))]
304 pub async fn send_gallery(
305 &self,
306 gallery: GalleryConfig,
307 ) -> Result<SendHandle, RoomSendQueueError> {
308 let Some(room) = self.inner.room.get() else {
309 return Err(RoomSendQueueError::RoomDisappeared);
310 };
311
312 if room.state() != RoomState::Joined {
313 return Err(RoomSendQueueError::RoomNotJoined);
314 }
315
316 if gallery.is_empty() {
317 return Err(RoomSendQueueError::EmptyGallery);
318 }
319
320 let send_event_txn =
321 gallery.txn_id.clone().map_or_else(ChildTransactionId::new, Into::into);
322
323 Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
324
325 let mut item_types = Vec::with_capacity(gallery.len());
326 let mut item_queue_infos = Vec::with_capacity(gallery.len());
327 let mut media_handles = Vec::with_capacity(gallery.len());
328
329 for item_info in gallery.items {
330 let GalleryItemInfo { filename, content_type, data, .. } = item_info;
331
332 let upload_file_txn = TransactionId::new();
333
334 debug!(filename, %content_type, %upload_file_txn, "uploading a gallery attachment");
335
336 let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
337
338 let MediaCacheResult {
339 upload_thumbnail_txn,
340 event_thumbnail_info,
341 queue_thumbnail_info,
342 } = RoomSendQueue::cache_media(&room, data, item_info.thumbnail, &file_media_request)
343 .await?;
344
345 item_types.push(Room::make_gallery_item_type(
346 &content_type,
347 filename,
348 file_media_request.source.clone(),
349 item_info.caption,
350 item_info.formatted_caption,
351 Some(item_info.attachment_info),
352 event_thumbnail_info,
353 ));
354
355 item_queue_infos.push(GalleryItemQueueInfo {
356 content_type,
357 upload_file_txn: upload_file_txn.clone(),
358 file_media_request,
359 thumbnail: queue_thumbnail_info,
360 });
361
362 media_handles.push(MediaHandles { upload_file_txn, upload_thumbnail_txn });
363 }
364
365 let event_content = room
367 .make_media_event(
368 MessageType::Gallery(GalleryMessageEventContent::new(
369 gallery.caption.unwrap_or_default(),
370 gallery.formatted_caption,
371 item_types,
372 )),
373 gallery.mentions,
374 gallery.reply,
375 )
376 .await
377 .map_err(|_| RoomSendQueueError::FailedToCreateGallery)?;
378
379 let created_at = MilliSecondsSinceUnixEpoch::now();
380
381 self.inner
383 .queue
384 .push_gallery(
385 event_content.clone(),
386 send_event_txn.clone().into(),
387 created_at,
388 item_queue_infos,
389 )
390 .await?;
391
392 trace!("manager sends a gallery to the background task");
393
394 self.inner.notifier.notify_one();
395
396 let send_handle = SendHandle {
397 room: self.clone(),
398 transaction_id: send_event_txn.clone().into(),
399 media_handles,
400 created_at,
401 };
402
403 self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
404 transaction_id: send_event_txn.clone().into(),
405 content: LocalEchoContent::Event {
406 serialized_event: SerializableEventContent::new(&event_content.into())
407 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
408 send_handle: send_handle.clone(),
409 send_error: None,
410 },
411 }));
412
413 Ok(send_handle)
414 }
415
416 async fn cache_media(
417 room: &Room,
418 data: Vec<u8>,
419 thumbnail: Option<Thumbnail>,
420 file_media_request: &MediaRequestParameters,
421 ) -> Result<MediaCacheResult, RoomSendQueueError> {
422 let client = room.client();
423 let cache_store = client
424 .event_cache_store()
425 .lock()
426 .await
427 .map_err(RoomSendQueueStorageError::LockError)?;
428
429 cache_store
431 .add_media_content(
432 file_media_request,
433 data,
434 IgnoreMediaRetentionPolicy::Yes,
436 )
437 .await
438 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
439
440 if let Some(thumbnail) = thumbnail {
442 let txn = TransactionId::new();
443 trace!(upload_thumbnail_txn = %txn, "media has a thumbnail");
444
445 let (data, content_type, thumbnail_info) = thumbnail.into_parts();
448 let file_size = data.len();
449
450 let thumbnail_media_request = Media::make_local_file_media_request(&txn);
452 cache_store
453 .add_media_content(
454 &thumbnail_media_request,
455 data,
456 IgnoreMediaRetentionPolicy::Yes,
458 )
459 .await
460 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
461
462 Ok(MediaCacheResult {
463 upload_thumbnail_txn: Some(txn.clone()),
464 event_thumbnail_info: Some((
465 thumbnail_media_request.source.clone(),
466 thumbnail_info,
467 )),
468 queue_thumbnail_info: Some(QueueThumbnailInfo {
469 finish_upload_thumbnail_info: FinishUploadThumbnailInfo {
470 txn,
471 width: None,
472 height: None,
473 },
474 media_request_parameters: thumbnail_media_request,
475 content_type,
476 file_size,
477 }),
478 })
479 } else {
480 Ok(Default::default())
481 }
482 }
483}
484
485impl QueueStorage {
486 #[allow(clippy::too_many_arguments)]
488 pub(super) async fn handle_dependent_finish_upload(
489 &self,
490 client: &Client,
491 event_txn: OwnedTransactionId,
492 parent_key: SentRequestKey,
493 mut local_echo: RoomMessageEventContent,
494 file_upload_txn: OwnedTransactionId,
495 thumbnail_info: Option<FinishUploadThumbnailInfo>,
496 new_updates: &mut Vec<RoomSendQueueUpdate>,
497 ) -> Result<(), RoomSendQueueError> {
498 let sent_media = parent_key
500 .into_media()
501 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
502
503 update_media_cache_keys_after_upload(client, &file_upload_txn, thumbnail_info, &sent_media)
504 .await?;
505 update_media_event_after_upload(&mut local_echo, sent_media);
506
507 let new_content = SerializableEventContent::new(&local_echo.into())
508 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
509
510 new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
513 transaction_id: event_txn.clone(),
514 new_content: new_content.clone(),
515 });
516
517 trace!(%event_txn, "queueing media event after successfully uploading media(s)");
518
519 client
520 .state_store()
521 .save_send_queue_request(
522 &self.room_id,
523 event_txn,
524 MilliSecondsSinceUnixEpoch::now(),
525 new_content.into(),
526 Self::HIGH_PRIORITY,
527 )
528 .await
529 .map_err(RoomSendQueueStorageError::StateStoreError)?;
530
531 Ok(())
532 }
533
534 #[cfg(feature = "unstable-msc4274")]
537 #[allow(clippy::too_many_arguments)]
538 pub(super) async fn handle_dependent_finish_gallery_upload(
539 &self,
540 client: &Client,
541 event_txn: OwnedTransactionId,
542 parent_key: SentRequestKey,
543 mut local_echo: RoomMessageEventContent,
544 item_infos: Vec<FinishGalleryItemInfo>,
545 new_updates: &mut Vec<RoomSendQueueUpdate>,
546 ) -> Result<(), RoomSendQueueError> {
547 let sent_gallery = parent_key
549 .into_media()
550 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
551
552 let mut sent_media_vec = sent_gallery.accumulated;
553 sent_media_vec.push(AccumulatedSentMediaInfo {
554 file: sent_gallery.file,
555 thumbnail: sent_gallery.thumbnail,
556 });
557
558 let mut sent_infos = HashMap::new();
559
560 for (item_info, sent_media) in zip(item_infos, sent_media_vec) {
561 let FinishGalleryItemInfo { file_upload: file_upload_txn, thumbnail_info } = item_info;
562
563 let from_req = Media::make_local_file_media_request(&file_upload_txn);
566 sent_infos.insert(from_req.source.unique_key(), sent_media.clone());
567
568 update_media_cache_keys_after_upload(
569 client,
570 &file_upload_txn,
571 thumbnail_info,
572 &sent_media.into(),
573 )
574 .await?;
575 }
576
577 update_gallery_event_after_upload(&mut local_echo, sent_infos);
578
579 let new_content = SerializableEventContent::new(&local_echo.into())
580 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
581
582 new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
585 transaction_id: event_txn.clone(),
586 new_content: new_content.clone(),
587 });
588
589 trace!(%event_txn, "queueing media event after successfully uploading media(s)");
590
591 client
592 .state_store()
593 .save_send_queue_request(
594 &self.room_id,
595 event_txn,
596 MilliSecondsSinceUnixEpoch::now(),
597 new_content.into(),
598 Self::HIGH_PRIORITY,
599 )
600 .await
601 .map_err(RoomSendQueueStorageError::StateStoreError)?;
602
603 Ok(())
604 }
605
606 #[allow(clippy::too_many_arguments)]
609 pub(super) async fn handle_dependent_file_or_thumbnail_upload(
610 &self,
611 client: &Client,
612 next_upload_txn: OwnedTransactionId,
613 parent_key: SentRequestKey,
614 content_type: String,
615 cache_key: MediaRequestParameters,
616 event_txn: OwnedTransactionId,
617 parent_is_thumbnail_upload: bool,
618 ) -> Result<(), RoomSendQueueError> {
619 let sent_media = parent_key
622 .into_media()
623 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
624
625 if parent_is_thumbnail_upload {
628 debug_assert!(sent_media.thumbnail.is_none());
629 if sent_media.thumbnail.is_some() {
630 warn!("unexpected thumbnail for a thumbnail!");
631 }
632 }
633
634 trace!(
635 related_to = %event_txn,
636 "done uploading file or thumbnail, now queuing the dependent file \
637 or thumbnail upload request",
638 );
639
640 #[cfg(feature = "unstable-msc4274")]
646 let accumulated = if parent_is_thumbnail_upload {
647 sent_media.accumulated
648 } else {
649 let mut accumulated = sent_media.accumulated;
650 accumulated.push(AccumulatedSentMediaInfo {
651 file: sent_media.file.clone(),
652 thumbnail: sent_media.thumbnail,
653 });
654 accumulated
655 };
656
657 let request = QueuedRequestKind::MediaUpload {
658 content_type,
659 cache_key,
660 thumbnail_source: parent_is_thumbnail_upload.then_some(sent_media.file),
663 related_to: event_txn,
664 #[cfg(feature = "unstable-msc4274")]
665 accumulated,
666 };
667
668 client
669 .state_store()
670 .save_send_queue_request(
671 &self.room_id,
672 next_upload_txn,
673 MilliSecondsSinceUnixEpoch::now(),
674 request,
675 Self::HIGH_PRIORITY,
676 )
677 .await
678 .map_err(RoomSendQueueStorageError::StateStoreError)?;
679
680 Ok(())
681 }
682
683 #[instrument(skip(self, handles))]
690 pub(super) async fn abort_upload(
691 &self,
692 event_txn: &TransactionId,
693 handles: &MediaHandles,
694 ) -> Result<bool, RoomSendQueueStorageError> {
695 let mut guard = self.store.lock().await;
696 let client = guard.client()?;
697
698 debug!("trying to abort an upload");
700
701 let store = client.state_store();
702
703 let upload_file_as_dependent = ChildTransactionId::from(handles.upload_file_txn.clone());
704 let event_as_dependent = ChildTransactionId::from(event_txn.to_owned());
705
706 let mut removed_dependent_upload = false;
707 let mut removed_dependent_event = false;
708
709 if let Some(thumbnail_txn) = &handles.upload_thumbnail_txn {
710 if store.remove_send_queue_request(&self.room_id, thumbnail_txn).await? {
711 trace!("could remove thumbnail request, removing 2 dependent requests now");
714
715 if let Some(info) = guard.being_sent.as_ref() {
717 if info.transaction_id == *thumbnail_txn {
718 let info = guard.being_sent.take().unwrap();
720 if info.cancel_upload() {
721 trace!("aborted ongoing thumbnail upload");
722 }
723 }
724 }
725
726 removed_dependent_upload = store
728 .remove_dependent_queued_request(&self.room_id, &upload_file_as_dependent)
729 .await?;
730
731 if !removed_dependent_upload {
732 warn!("unable to find the dependent file upload request");
733 }
734
735 removed_dependent_event = store
736 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
737 .await?;
738
739 if !removed_dependent_event {
740 warn!("unable to find the dependent media event upload request");
741 }
742 }
743 }
744
745 if !removed_dependent_upload {
752 if store.remove_send_queue_request(&self.room_id, &handles.upload_file_txn).await? {
753 trace!("could remove file upload request, removing 1 dependent request");
756
757 if let Some(info) = guard.being_sent.as_ref() {
759 if info.transaction_id == handles.upload_file_txn {
760 let info = guard.being_sent.take().unwrap();
762 if info.cancel_upload() {
763 trace!("aborted ongoing file upload");
764 }
765 }
766 }
767
768 if !store
770 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
771 .await?
772 {
773 warn!("unable to find the dependent media event upload request");
774 }
775 } else {
776 if !removed_dependent_event
781 && !store
782 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
783 .await?
784 {
785 debug!("uploads already happened => deferring to aborting an event sending");
788 return Ok(false);
789 }
790 }
791 }
792
793 {
796 let event_cache = client.event_cache_store().lock().await?;
797 event_cache
798 .remove_media_content_for_uri(&Media::make_local_uri(&handles.upload_file_txn))
799 .await?;
800 if let Some(txn) = &handles.upload_thumbnail_txn {
801 event_cache.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?;
802 }
803 }
804
805 debug!("successfully aborted!");
806 Ok(true)
807 }
808
809 #[instrument(skip(self, caption, formatted_caption))]
810 pub(super) async fn edit_media_caption(
811 &self,
812 txn: &TransactionId,
813 caption: Option<String>,
814 formatted_caption: Option<FormattedBody>,
815 mentions: Option<Mentions>,
816 ) -> Result<Option<AnyMessageLikeEventContent>, RoomSendQueueStorageError> {
817 use RoomSendQueueStorageError::InvalidMediaCaptionEdit;
819
820 let guard = self.store.lock().await;
821 let client = guard.client()?;
822 let store = client.state_store();
823
824 {
832 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
835
836 if let Some(found) =
837 dependent_requests.into_iter().find(|req| *req.own_transaction_id == *txn)
838 {
839 trace!("found the caption to edit in a dependent request");
840
841 let DependentQueuedRequestKind::FinishUpload {
842 mut local_echo,
843 file_upload,
844 thumbnail_info,
845 } = found.kind
846 else {
847 return Err(InvalidMediaCaptionEdit);
848 };
849
850 if !update_media_caption(&mut local_echo, caption, formatted_caption, mentions) {
851 return Err(InvalidMediaCaptionEdit);
852 }
853
854 let new_dependent_request = DependentQueuedRequestKind::FinishUpload {
855 local_echo: local_echo.clone(),
856 file_upload,
857 thumbnail_info,
858 };
859 store
860 .update_dependent_queued_request(
861 &self.room_id,
862 &found.own_transaction_id,
863 new_dependent_request,
864 )
865 .await?;
866
867 trace!("caption successfully updated");
868 return Ok(Some((*local_echo).into()));
869 }
870 }
871
872 let requests = store.load_send_queue_requests(&self.room_id).await?;
873 let Some(found) = requests.into_iter().find(|req| req.transaction_id == *txn) else {
874 return Ok(None);
876 };
877
878 trace!("found the caption to edit as a request");
879
880 let QueuedRequestKind::Event { content: serialized_content } = found.kind else {
881 return Err(InvalidMediaCaptionEdit);
882 };
883
884 let deserialized = serialized_content.deserialize()?;
885 let AnyMessageLikeEventContent::RoomMessage(mut content) = deserialized else {
886 return Err(InvalidMediaCaptionEdit);
887 };
888
889 if !update_media_caption(&mut content, caption, formatted_caption, mentions) {
890 return Err(InvalidMediaCaptionEdit);
891 }
892
893 let any_content: AnyMessageLikeEventContent = content.into();
894 let new_serialized = SerializableEventContent::new(&any_content.clone())?;
895
896 if let Some(being_sent) = guard.being_sent.as_ref() {
898 if being_sent.transaction_id == *txn {
899 store
901 .save_dependent_queued_request(
902 &self.room_id,
903 txn,
904 ChildTransactionId::new(),
905 MilliSecondsSinceUnixEpoch::now(),
906 DependentQueuedRequestKind::EditEvent { new_content: new_serialized },
907 )
908 .await?;
909
910 trace!("media event was being sent, pushed a dependent edit");
911 return Ok(Some(any_content));
912 }
913 }
914
915 store
917 .update_send_queue_request(
918 &self.room_id,
919 txn,
920 QueuedRequestKind::Event { content: new_serialized },
921 )
922 .await?;
923
924 trace!("media event was not being sent, updated local echo");
925 Ok(Some(any_content))
926 }
927}
928
929async fn update_media_cache_keys_after_upload(
932 client: &Client,
933 file_upload_txn: &OwnedTransactionId,
934 thumbnail_info: Option<FinishUploadThumbnailInfo>,
935 sent_media: &SentMediaInfo,
936) -> Result<(), RoomSendQueueError> {
937 let from_req = Media::make_local_file_media_request(file_upload_txn);
939
940 trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store");
941 let cache_store =
942 client.event_cache_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
943
944 cache_store
946 .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
947 .await
948 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
949
950 cache_store
951 .replace_media_key(
952 &from_req,
953 &MediaRequestParameters { source: sent_media.file.clone(), format: MediaFormat::File },
954 )
955 .await
956 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
957
958 if let Some((info, new_source)) = thumbnail_info.as_ref().zip(sent_media.thumbnail.clone()) {
960 let from_req = if let Some((height, width)) = info.height.zip(info.width) {
963 Media::make_local_thumbnail_media_request(&info.txn, height, width)
964 } else {
965 Media::make_local_file_media_request(&info.txn)
966 };
967
968 trace!(from = ?from_req.source, to = ?new_source, "renaming thumbnail file key in cache store");
969
970 cache_store
972 .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
973 .await
974 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
975
976 cache_store
977 .replace_media_key(
978 &from_req,
979 &MediaRequestParameters { source: new_source, format: MediaFormat::File },
980 )
981 .await
982 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
983 }
984
985 Ok(())
986}