1#[cfg(feature = "unstable-msc4274")]
18use std::{collections::HashMap, iter::zip};
19
20use matrix_sdk_base::{
21 RoomState,
22 media::{
23 MediaFormat, MediaRequestParameters, MediaThumbnailSettings,
24 store::IgnoreMediaRetentionPolicy,
25 },
26 store::{
27 ChildTransactionId, DependentQueuedRequestKind, FinishUploadThumbnailInfo,
28 QueuedRequestKind, SentMediaInfo, SentRequestKey, SerializableEventContent,
29 },
30};
31#[cfg(feature = "unstable-msc4274")]
32use matrix_sdk_base::{
33 media::UniqueKey,
34 store::{AccumulatedSentMediaInfo, FinishGalleryItemInfo},
35};
36use mime::Mime;
37#[cfg(feature = "unstable-msc4274")]
38use ruma::events::room::message::{GalleryItemType, GalleryMessageEventContent};
39use ruma::{
40 MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId,
41 events::{
42 AnyMessageLikeEventContent, Mentions,
43 room::{
44 MediaSource, ThumbnailInfo,
45 message::{FormattedBody, MessageType, RoomMessageEventContent},
46 },
47 },
48};
49use tracing::{Span, debug, error, instrument, trace, warn};
50
51use super::{QueueStorage, QueueThumbnailInfo, RoomSendQueue, RoomSendQueueError};
52use crate::{
53 Client, Media, Room,
54 attachment::{AttachmentConfig, Thumbnail},
55 room::edit::update_media_caption,
56 send_queue::{
57 LocalEcho, LocalEchoContent, MediaHandles, RoomSendQueueStorageError, RoomSendQueueUpdate,
58 SendHandle,
59 },
60};
61#[cfg(feature = "unstable-msc4274")]
62use crate::{
63 attachment::{GalleryConfig, GalleryItemInfo},
64 send_queue::GalleryItemQueueInfo,
65};
66
67fn update_media_event_after_upload(echo: &mut RoomMessageEventContent, sent: SentMediaInfo) {
70 match &mut echo.msgtype {
73 MessageType::Audio(event) => {
74 event.source = sent.file;
75 }
76 MessageType::File(event) => {
77 event.source = sent.file;
78 if let Some(info) = event.info.as_mut() {
79 info.thumbnail_source = sent.thumbnail;
80 }
81 }
82 MessageType::Image(event) => {
83 event.source = sent.file;
84 if let Some(info) = event.info.as_mut() {
85 info.thumbnail_source = sent.thumbnail;
86 }
87 }
88 MessageType::Video(event) => {
89 event.source = sent.file;
90 if let Some(info) = event.info.as_mut() {
91 info.thumbnail_source = sent.thumbnail;
92 }
93 }
94
95 _ => {
96 error!("Invalid message type in database: {}", echo.msgtype());
100 debug_assert!(false, "invalid message type in database");
102 }
103 }
104}
105
/// Points every item of a gallery local echo at the sources obtained from the
/// uploads.
///
/// `sent` is looked up with the `unique_key()` of each item's current source.
/// When an entry is found, its `file` replaces the item's `source` and, for
/// file, image and video items that carry an `info` section, its `thumbnail`
/// replaces `info.thumbnail_source`. Items without a matching entry are logged
/// as errors and left as they are.
///
/// If the echo is not a gallery message, or an item has an unexpected type,
/// an error is logged and a `debug_assert!` is tripped.
#[cfg(feature = "unstable-msc4274")]
fn update_gallery_event_after_upload(
    echo: &mut RoomMessageEventContent,
    sent: HashMap<String, AccumulatedSentMediaInfo>,
) {
    let MessageType::Gallery(gallery) = &mut echo.msgtype else {
        error!("Invalid gallery item types in database");
        // The assertion is compiled out of release builds, which only log.
        debug_assert!(false, "invalid item type in database {:?}", echo.msgtype());
        return;
    };

    for itemtype in &mut gallery.itemtypes {
        match itemtype {
            GalleryItemType::Audio(content) => {
                if let Some(uploaded) = sent.get(&content.source.unique_key()) {
                    content.source = uploaded.file.clone();
                } else {
                    error!("key for item {:?} does not exist on gallery event", content.source);
                }
            }

            GalleryItemType::File(content) => {
                if let Some(uploaded) = sent.get(&content.source.unique_key()) {
                    content.source = uploaded.file.clone();
                    if let Some(info) = &mut content.info {
                        info.thumbnail_source = uploaded.thumbnail.clone();
                    }
                } else {
                    error!("key for item {:?} does not exist on gallery event", content.source);
                }
            }

            GalleryItemType::Image(content) => {
                if let Some(uploaded) = sent.get(&content.source.unique_key()) {
                    content.source = uploaded.file.clone();
                    if let Some(info) = &mut content.info {
                        info.thumbnail_source = uploaded.thumbnail.clone();
                    }
                } else {
                    error!("key for item {:?} does not exist on gallery event", content.source);
                }
            }

            GalleryItemType::Video(content) => {
                if let Some(uploaded) = sent.get(&content.source.unique_key()) {
                    content.source = uploaded.file.clone();
                    if let Some(info) = &mut content.info {
                        info.thumbnail_source = uploaded.thumbnail.clone();
                    }
                } else {
                    error!("key for item {:?} does not exist on gallery event", content.source);
                }
            }

            _ => {
                error!("Invalid gallery item types in database");
                // The assertion is compiled out of release builds, which only log.
                debug_assert!(false, "invalid gallery item type in database {itemtype:?}");
            }
        }
    }
}
170
/// Result of `RoomSendQueue::cache_media`: what was produced while caching a
/// media file and its optional thumbnail in the media store.
///
/// The `Default` value has every field set to `None`; `cache_media` returns it
/// when no thumbnail was provided.
#[derive(Default)]
struct MediaCacheResult {
    /// Transaction id generated for the thumbnail, when a thumbnail was
    /// provided.
    upload_thumbnail_txn: Option<OwnedTransactionId>,
    /// Local media source of the cached thumbnail together with its metadata;
    /// callers pass it on when building the event content.
    event_thumbnail_info: Option<(MediaSource, Box<ThumbnailInfo>)>,
    /// Thumbnail description (transaction id, dimensions, media request,
    /// content type and size) that callers hand to the queue storage.
    queue_thumbnail_info: Option<QueueThumbnailInfo>,
}
177
178impl RoomSendQueue {
179 #[instrument(skip_all, fields(event_txn))]
199 pub async fn send_attachment(
200 &self,
201 filename: impl Into<String>,
202 content_type: Mime,
203 data: Vec<u8>,
204 mut config: AttachmentConfig,
205 ) -> Result<SendHandle, RoomSendQueueError> {
206 let Some(room) = self.inner.room.get() else {
207 return Err(RoomSendQueueError::RoomDisappeared);
208 };
209
210 if room.state() != RoomState::Joined {
211 return Err(RoomSendQueueError::RoomNotJoined);
212 }
213
214 let filename = filename.into();
215 let upload_file_txn = TransactionId::new();
216 let send_event_txn = config.txn_id.map_or_else(ChildTransactionId::new, Into::into);
217
218 Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
219 debug!(filename, %content_type, %upload_file_txn, "sending an attachment");
220
221 let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
222
223 let MediaCacheResult { upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info } =
224 RoomSendQueue::cache_media(&room, data, config.thumbnail.take(), &file_media_request)
225 .await?;
226
227 let event_content = room
229 .make_media_event(
230 Room::make_attachment_type(
231 &content_type,
232 filename,
233 file_media_request.source.clone(),
234 config.caption,
235 config.info,
236 event_thumbnail_info,
237 ),
238 config.mentions,
239 config.reply,
240 )
241 .await
242 .map_err(|_| RoomSendQueueError::FailedToCreateAttachment)?;
243
244 let created_at = MilliSecondsSinceUnixEpoch::now();
245
246 self.inner
248 .queue
249 .push_media(
250 event_content.clone(),
251 content_type,
252 send_event_txn.clone().into(),
253 created_at,
254 upload_file_txn.clone(),
255 file_media_request,
256 queue_thumbnail_info,
257 )
258 .await?;
259
260 trace!("manager sends a media to the background task");
261
262 self.inner.notifier.notify_one();
263
264 let send_handle = SendHandle {
265 room: self.clone(),
266 transaction_id: send_event_txn.clone().into(),
267 media_handles: vec![MediaHandles { upload_thumbnail_txn, upload_file_txn }],
268 created_at,
269 };
270
271 self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
272 transaction_id: send_event_txn.clone().into(),
273 content: LocalEchoContent::Event {
274 serialized_event: SerializableEventContent::new(&event_content.into())
275 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
276 send_handle: send_handle.clone(),
277 send_error: None,
278 },
279 }));
280
281 Ok(send_handle)
282 }
283
284 #[cfg(feature = "unstable-msc4274")]
304 #[instrument(skip_all, fields(event_txn))]
305 pub async fn send_gallery(
306 &self,
307 gallery: GalleryConfig,
308 ) -> Result<SendHandle, RoomSendQueueError> {
309 let Some(room) = self.inner.room.get() else {
310 return Err(RoomSendQueueError::RoomDisappeared);
311 };
312
313 if room.state() != RoomState::Joined {
314 return Err(RoomSendQueueError::RoomNotJoined);
315 }
316
317 if gallery.is_empty() {
318 return Err(RoomSendQueueError::EmptyGallery);
319 }
320
321 let send_event_txn =
322 gallery.txn_id.clone().map_or_else(ChildTransactionId::new, Into::into);
323
324 Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
325
326 let mut item_types = Vec::with_capacity(gallery.len());
327 let mut item_queue_infos = Vec::with_capacity(gallery.len());
328 let mut media_handles = Vec::with_capacity(gallery.len());
329
330 for item_info in gallery.items {
331 let GalleryItemInfo { filename, content_type, data, .. } = item_info;
332
333 let upload_file_txn = TransactionId::new();
334
335 debug!(filename, %content_type, %upload_file_txn, "uploading a gallery attachment");
336
337 let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
338
339 let MediaCacheResult {
340 upload_thumbnail_txn,
341 event_thumbnail_info,
342 queue_thumbnail_info,
343 } = RoomSendQueue::cache_media(&room, data, item_info.thumbnail, &file_media_request)
344 .await?;
345
346 item_types.push(Room::make_gallery_item_type(
347 &content_type,
348 filename,
349 file_media_request.source.clone(),
350 item_info.caption,
351 Some(item_info.attachment_info),
352 event_thumbnail_info,
353 ));
354
355 item_queue_infos.push(GalleryItemQueueInfo {
356 content_type,
357 upload_file_txn: upload_file_txn.clone(),
358 file_media_request,
359 thumbnail: queue_thumbnail_info,
360 });
361
362 media_handles.push(MediaHandles { upload_file_txn, upload_thumbnail_txn });
363 }
364
365 let (body, formatted) =
367 gallery.caption.map(|caption| (caption.body, caption.formatted)).unwrap_or_default();
368 let event_content = room
369 .make_media_event(
370 MessageType::Gallery(GalleryMessageEventContent::new(body, formatted, item_types)),
371 gallery.mentions,
372 gallery.reply,
373 )
374 .await
375 .map_err(|_| RoomSendQueueError::FailedToCreateGallery)?;
376
377 let created_at = MilliSecondsSinceUnixEpoch::now();
378
379 self.inner
381 .queue
382 .push_gallery(
383 event_content.clone(),
384 send_event_txn.clone().into(),
385 created_at,
386 item_queue_infos,
387 )
388 .await?;
389
390 trace!("manager sends a gallery to the background task");
391
392 self.inner.notifier.notify_one();
393
394 let send_handle = SendHandle {
395 room: self.clone(),
396 transaction_id: send_event_txn.clone().into(),
397 media_handles,
398 created_at,
399 };
400
401 self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
402 transaction_id: send_event_txn.clone().into(),
403 content: LocalEchoContent::Event {
404 serialized_event: SerializableEventContent::new(&event_content.into())
405 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
406 send_handle: send_handle.clone(),
407 send_error: None,
408 },
409 }));
410
411 Ok(send_handle)
412 }
413
414 async fn cache_media(
415 room: &Room,
416 data: Vec<u8>,
417 thumbnail: Option<Thumbnail>,
418 file_media_request: &MediaRequestParameters,
419 ) -> Result<MediaCacheResult, RoomSendQueueError> {
420 let client = room.client();
421 let media_store =
422 client.media_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
423
424 media_store
426 .add_media_content(
427 file_media_request,
428 data,
429 IgnoreMediaRetentionPolicy::Yes,
431 )
432 .await
433 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
434
435 if let Some(thumbnail) = thumbnail {
437 let txn = TransactionId::new();
438 trace!(upload_thumbnail_txn = %txn, "media has a thumbnail");
439
440 let (data, content_type, thumbnail_info) = thumbnail.into_parts();
443 let file_size = data.len();
444
445 let thumbnail_height = thumbnail_info.height;
446 let thumbnail_width = thumbnail_info.width;
447
448 let thumbnail_media_request = Media::make_local_file_media_request(&txn);
450 media_store
451 .add_media_content(
452 &thumbnail_media_request,
453 data,
454 IgnoreMediaRetentionPolicy::Yes,
456 )
457 .await
458 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
459
460 Ok(MediaCacheResult {
461 upload_thumbnail_txn: Some(txn.clone()),
462 event_thumbnail_info: Some((
463 thumbnail_media_request.source.clone(),
464 thumbnail_info,
465 )),
466 queue_thumbnail_info: Some(QueueThumbnailInfo {
467 finish_upload_thumbnail_info: FinishUploadThumbnailInfo {
468 txn,
469 width: thumbnail_width,
470 height: thumbnail_height,
471 },
472 media_request_parameters: thumbnail_media_request,
473 content_type,
474 file_size,
475 }),
476 })
477 } else {
478 Ok(Default::default())
479 }
480 }
481}
482
483impl QueueStorage {
484 #[allow(clippy::too_many_arguments)]
486 pub(super) async fn handle_dependent_finish_upload(
487 &self,
488 client: &Client,
489 event_txn: OwnedTransactionId,
490 parent_key: SentRequestKey,
491 mut local_echo: RoomMessageEventContent,
492 file_upload_txn: OwnedTransactionId,
493 thumbnail_info: Option<FinishUploadThumbnailInfo>,
494 new_updates: &mut Vec<RoomSendQueueUpdate>,
495 ) -> Result<(), RoomSendQueueError> {
496 let sent_media = parent_key
498 .into_media()
499 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
500
501 update_media_cache_keys_after_upload(client, &file_upload_txn, thumbnail_info, &sent_media)
502 .await?;
503 update_media_event_after_upload(&mut local_echo, sent_media);
504
505 let new_content = SerializableEventContent::new(&local_echo.into())
506 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
507
508 new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
511 transaction_id: event_txn.clone(),
512 new_content: new_content.clone(),
513 });
514
515 trace!(%event_txn, "queueing media event after successfully uploading media(s)");
516
517 client
518 .state_store()
519 .save_send_queue_request(
520 &self.room_id,
521 event_txn,
522 MilliSecondsSinceUnixEpoch::now(),
523 new_content.into(),
524 Self::HIGH_PRIORITY,
525 )
526 .await
527 .map_err(RoomSendQueueStorageError::StateStoreError)?;
528
529 Ok(())
530 }
531
532 #[cfg(feature = "unstable-msc4274")]
535 #[allow(clippy::too_many_arguments)]
536 pub(super) async fn handle_dependent_finish_gallery_upload(
537 &self,
538 client: &Client,
539 event_txn: OwnedTransactionId,
540 parent_key: SentRequestKey,
541 mut local_echo: RoomMessageEventContent,
542 item_infos: Vec<FinishGalleryItemInfo>,
543 new_updates: &mut Vec<RoomSendQueueUpdate>,
544 ) -> Result<(), RoomSendQueueError> {
545 let sent_gallery = parent_key
547 .into_media()
548 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
549
550 let mut sent_media_vec = sent_gallery.accumulated;
551 sent_media_vec.push(AccumulatedSentMediaInfo {
552 file: sent_gallery.file,
553 thumbnail: sent_gallery.thumbnail,
554 });
555
556 let mut sent_infos = HashMap::new();
557
558 for (item_info, sent_media) in zip(item_infos, sent_media_vec) {
559 let FinishGalleryItemInfo { file_upload: file_upload_txn, thumbnail_info } = item_info;
560
561 let from_req = Media::make_local_file_media_request(&file_upload_txn);
564 sent_infos.insert(from_req.source.unique_key(), sent_media.clone());
565
566 update_media_cache_keys_after_upload(
567 client,
568 &file_upload_txn,
569 thumbnail_info,
570 &sent_media.into(),
571 )
572 .await?;
573 }
574
575 update_gallery_event_after_upload(&mut local_echo, sent_infos);
576
577 let new_content = SerializableEventContent::new(&local_echo.into())
578 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
579
580 new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
583 transaction_id: event_txn.clone(),
584 new_content: new_content.clone(),
585 });
586
587 trace!(%event_txn, "queueing media event after successfully uploading media(s)");
588
589 client
590 .state_store()
591 .save_send_queue_request(
592 &self.room_id,
593 event_txn,
594 MilliSecondsSinceUnixEpoch::now(),
595 new_content.into(),
596 Self::HIGH_PRIORITY,
597 )
598 .await
599 .map_err(RoomSendQueueStorageError::StateStoreError)?;
600
601 Ok(())
602 }
603
604 #[allow(clippy::too_many_arguments)]
607 pub(super) async fn handle_dependent_file_or_thumbnail_upload(
608 &self,
609 client: &Client,
610 next_upload_txn: OwnedTransactionId,
611 parent_key: SentRequestKey,
612 content_type: String,
613 cache_key: MediaRequestParameters,
614 event_txn: OwnedTransactionId,
615 parent_is_thumbnail_upload: bool,
616 ) -> Result<(), RoomSendQueueError> {
617 let sent_media = parent_key
620 .into_media()
621 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
622
623 if parent_is_thumbnail_upload {
626 debug_assert!(sent_media.thumbnail.is_none());
627 if sent_media.thumbnail.is_some() {
628 warn!("unexpected thumbnail for a thumbnail!");
629 }
630 }
631
632 trace!(
633 related_to = %event_txn,
634 "done uploading file or thumbnail, now queuing the dependent file \
635 or thumbnail upload request",
636 );
637
638 #[cfg(feature = "unstable-msc4274")]
644 let accumulated = if parent_is_thumbnail_upload {
645 sent_media.accumulated
646 } else {
647 let mut accumulated = sent_media.accumulated;
648 accumulated.push(AccumulatedSentMediaInfo {
649 file: sent_media.file.clone(),
650 thumbnail: sent_media.thumbnail,
651 });
652 accumulated
653 };
654
655 let request = QueuedRequestKind::MediaUpload {
656 content_type,
657 cache_key,
658 thumbnail_source: parent_is_thumbnail_upload.then_some(sent_media.file),
661 related_to: event_txn,
662 #[cfg(feature = "unstable-msc4274")]
663 accumulated,
664 };
665
666 client
667 .state_store()
668 .save_send_queue_request(
669 &self.room_id,
670 next_upload_txn,
671 MilliSecondsSinceUnixEpoch::now(),
672 request,
673 Self::HIGH_PRIORITY,
674 )
675 .await
676 .map_err(RoomSendQueueStorageError::StateStoreError)?;
677
678 Ok(())
679 }
680
681 #[instrument(skip(self, handles))]
688 pub(super) async fn abort_upload(
689 &self,
690 event_txn: &TransactionId,
691 handles: &MediaHandles,
692 ) -> Result<bool, RoomSendQueueStorageError> {
693 let mut guard = self.store.lock().await;
694 let client = guard.client()?;
695
696 debug!("trying to abort an upload");
698
699 let store = client.state_store();
700
701 let upload_file_as_dependent = ChildTransactionId::from(handles.upload_file_txn.clone());
702 let event_as_dependent = ChildTransactionId::from(event_txn.to_owned());
703
704 let mut removed_dependent_upload = false;
705 let mut removed_dependent_event = false;
706
707 if let Some(thumbnail_txn) = &handles.upload_thumbnail_txn
708 && store.remove_send_queue_request(&self.room_id, thumbnail_txn).await?
709 {
710 trace!("could remove thumbnail request, removing 2 dependent requests now");
713
714 if let Some(info) = guard.being_sent.as_ref()
716 && info.transaction_id == *thumbnail_txn
717 {
718 let info = guard.being_sent.take().unwrap();
720 if info.cancel_upload() {
721 trace!("aborted ongoing thumbnail upload");
722 }
723 }
724
725 removed_dependent_upload = store
727 .remove_dependent_queued_request(&self.room_id, &upload_file_as_dependent)
728 .await?;
729
730 if !removed_dependent_upload {
731 warn!("unable to find the dependent file upload request");
732 }
733
734 removed_dependent_event =
735 store.remove_dependent_queued_request(&self.room_id, &event_as_dependent).await?;
736
737 if !removed_dependent_event {
738 warn!("unable to find the dependent media event upload request");
739 }
740 }
741
742 if !removed_dependent_upload {
749 if store.remove_send_queue_request(&self.room_id, &handles.upload_file_txn).await? {
750 trace!("could remove file upload request, removing 1 dependent request");
753
754 if let Some(info) = guard.being_sent.as_ref()
756 && info.transaction_id == handles.upload_file_txn
757 {
758 let info = guard.being_sent.take().unwrap();
760 if info.cancel_upload() {
761 trace!("aborted ongoing file upload");
762 }
763 }
764
765 if !store
767 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
768 .await?
769 {
770 warn!("unable to find the dependent media event upload request");
771 }
772 } else {
773 if !removed_dependent_event
778 && !store
779 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
780 .await?
781 {
782 debug!("uploads already happened => deferring to aborting an event sending");
785 return Ok(false);
786 }
787 }
788 }
789
790 {
793 let media_store = client.media_store().lock().await?;
794 media_store
795 .remove_media_content_for_uri(&Media::make_local_uri(&handles.upload_file_txn))
796 .await?;
797 if let Some(txn) = &handles.upload_thumbnail_txn {
798 media_store.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?;
799 }
800 }
801
802 debug!("successfully aborted!");
803 Ok(true)
804 }
805
806 #[instrument(skip(self, caption, formatted_caption))]
807 pub(super) async fn edit_media_caption(
808 &self,
809 txn: &TransactionId,
810 caption: Option<String>,
811 formatted_caption: Option<FormattedBody>,
812 mentions: Option<Mentions>,
813 ) -> Result<Option<AnyMessageLikeEventContent>, RoomSendQueueStorageError> {
814 use RoomSendQueueStorageError::InvalidMediaCaptionEdit;
816
817 let guard = self.store.lock().await;
818 let client = guard.client()?;
819 let store = client.state_store();
820
821 {
829 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
832
833 if let Some(found) =
834 dependent_requests.into_iter().find(|req| *req.own_transaction_id == *txn)
835 {
836 trace!("found the caption to edit in a dependent request");
837
838 let DependentQueuedRequestKind::FinishUpload {
839 mut local_echo,
840 file_upload,
841 thumbnail_info,
842 } = found.kind
843 else {
844 return Err(InvalidMediaCaptionEdit);
845 };
846
847 if !update_media_caption(&mut local_echo, caption, formatted_caption, mentions) {
848 return Err(InvalidMediaCaptionEdit);
849 }
850
851 let new_dependent_request = DependentQueuedRequestKind::FinishUpload {
852 local_echo: local_echo.clone(),
853 file_upload,
854 thumbnail_info,
855 };
856 store
857 .update_dependent_queued_request(
858 &self.room_id,
859 &found.own_transaction_id,
860 new_dependent_request,
861 )
862 .await?;
863
864 trace!("caption successfully updated");
865 return Ok(Some((*local_echo).into()));
866 }
867 }
868
869 let requests = store.load_send_queue_requests(&self.room_id).await?;
870 let Some(found) = requests.into_iter().find(|req| req.transaction_id == *txn) else {
871 return Ok(None);
873 };
874
875 trace!("found the caption to edit as a request");
876
877 let QueuedRequestKind::Event { content: serialized_content } = found.kind else {
878 return Err(InvalidMediaCaptionEdit);
879 };
880
881 let deserialized = serialized_content.deserialize()?;
882 let AnyMessageLikeEventContent::RoomMessage(mut content) = deserialized else {
883 return Err(InvalidMediaCaptionEdit);
884 };
885
886 if !update_media_caption(&mut content, caption, formatted_caption, mentions) {
887 return Err(InvalidMediaCaptionEdit);
888 }
889
890 let any_content: AnyMessageLikeEventContent = content.into();
891 let new_serialized = SerializableEventContent::new(&any_content.clone())?;
892
893 if let Some(being_sent) = guard.being_sent.as_ref()
895 && being_sent.transaction_id == *txn
896 {
897 store
899 .save_dependent_queued_request(
900 &self.room_id,
901 txn,
902 ChildTransactionId::new(),
903 MilliSecondsSinceUnixEpoch::now(),
904 DependentQueuedRequestKind::EditEvent { new_content: new_serialized },
905 )
906 .await?;
907
908 trace!("media event was being sent, pushed a dependent edit");
909 return Ok(Some(any_content));
910 }
911
912 store
914 .update_send_queue_request(
915 &self.room_id,
916 txn,
917 QueuedRequestKind::Event { content: new_serialized },
918 )
919 .await?;
920
921 trace!("media event was not being sent, updated local echo");
922 Ok(Some(any_content))
923 }
924}
925
926async fn update_media_cache_keys_after_upload(
929 client: &Client,
930 file_upload_txn: &OwnedTransactionId,
931 thumbnail_info: Option<FinishUploadThumbnailInfo>,
932 sent_media: &SentMediaInfo,
933) -> Result<(), RoomSendQueueError> {
934 let from_req = Media::make_local_file_media_request(file_upload_txn);
936
937 trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store");
938 let media_store =
939 client.media_store().lock().await.map_err(RoomSendQueueStorageError::LockError)?;
940
941 media_store
943 .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
944 .await
945 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
946
947 media_store
948 .replace_media_key(
949 &from_req,
950 &MediaRequestParameters { source: sent_media.file.clone(), format: MediaFormat::File },
951 )
952 .await
953 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
954
955 if let Some((info, remote_thumbnail_source)) =
957 thumbnail_info.as_ref().zip(sent_media.thumbnail.clone())
958 {
959 let from_request_params = Media::make_local_file_media_request(&info.txn);
960
961 if let Some((height, width)) = info.height.zip(info.width) {
962 trace!(
963 from = ?from_req.source,
964 to = ?remote_thumbnail_source,
965 height = u64::from(height),
966 width = u64::from(width),
967 "storing thumbnail as a thumbnail for the uploaded media, and a file in itself, in cache store"
968 );
969
970 match media_store
974 .get_media_content(&MediaRequestParameters {
975 source: from_request_params.source.clone(),
976 format: MediaFormat::File,
977 })
978 .await
979 {
980 Ok(Some(thumbnail_content)) => {
981 media_store
984 .add_media_content(
985 &MediaRequestParameters {
986 source: remote_thumbnail_source.clone(),
987 format: MediaFormat::Thumbnail(MediaThumbnailSettings::new(
988 width, height,
989 )),
990 },
991 thumbnail_content,
992 IgnoreMediaRetentionPolicy::No,
993 )
994 .await
995 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
996 }
997
998 Ok(None) => {
999 warn!(
1000 from = ?from_request_params.source,
1001 "unable to reload thumbnail content from media store: no content found",
1002 );
1003 }
1004
1005 Err(err) => {
1006 error!(
1009 from = ?from_request_params.source,
1010 "unable to reload thumbnail content from media store: {err}"
1011 );
1012 }
1013 }
1014 } else {
1015 trace!(from = ?from_req.source, to = ?remote_thumbnail_source, "only renaming thumbnail key to file in cache store");
1016 }
1017
1018 media_store
1020 .set_ignore_media_retention_policy(&from_request_params, IgnoreMediaRetentionPolicy::No)
1021 .await
1022 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
1023
1024 media_store
1026 .replace_media_key(
1027 &from_request_params,
1028 &MediaRequestParameters {
1029 source: remote_thumbnail_source,
1030 format: MediaFormat::File,
1031 },
1032 )
1033 .await
1034 .map_err(RoomSendQueueStorageError::MediaStoreError)?;
1035 }
1036
1037 Ok(())
1038}