1use matrix_sdk_base::{
18 event_cache::store::media::IgnoreMediaRetentionPolicy,
19 media::{MediaFormat, MediaRequestParameters},
20 store::{
21 ChildTransactionId, DependentQueuedRequestKind, FinishUploadThumbnailInfo,
22 QueuedRequestKind, SentMediaInfo, SentRequestKey, SerializableEventContent,
23 },
24 RoomState,
25};
26use mime::Mime;
27use ruma::{
28 events::{
29 room::message::{FormattedBody, MessageType, RoomMessageEventContent},
30 AnyMessageLikeEventContent, Mentions,
31 },
32 MilliSecondsSinceUnixEpoch, OwnedTransactionId, TransactionId,
33};
34use tracing::{debug, error, instrument, trace, warn, Span};
35
36use super::{QueueStorage, RoomSendQueue, RoomSendQueueError};
37use crate::{
38 attachment::AttachmentConfig,
39 room::edit::update_media_caption,
40 send_queue::{
41 LocalEcho, LocalEchoContent, MediaHandles, RoomSendQueueStorageError, RoomSendQueueUpdate,
42 SendHandle,
43 },
44 Client, Media, Room,
45};
46
47fn update_media_event_after_upload(echo: &mut RoomMessageEventContent, sent: SentMediaInfo) {
50 match &mut echo.msgtype {
53 MessageType::Audio(event) => {
54 event.source = sent.file;
55 }
56 MessageType::File(event) => {
57 event.source = sent.file;
58 if let Some(info) = event.info.as_mut() {
59 info.thumbnail_source = sent.thumbnail;
60 }
61 }
62 MessageType::Image(event) => {
63 event.source = sent.file;
64 if let Some(info) = event.info.as_mut() {
65 info.thumbnail_source = sent.thumbnail;
66 }
67 }
68 MessageType::Video(event) => {
69 event.source = sent.file;
70 if let Some(info) = event.info.as_mut() {
71 info.thumbnail_source = sent.thumbnail;
72 }
73 }
74
75 _ => {
76 error!("Invalid message type in database: {}", echo.msgtype());
80 debug_assert!(false, "invalid message type in database");
82 }
83 }
84}
85
86impl RoomSendQueue {
87 #[instrument(skip_all, fields(event_txn))]
107 pub async fn send_attachment(
108 &self,
109 filename: impl Into<String>,
110 content_type: Mime,
111 data: Vec<u8>,
112 mut config: AttachmentConfig,
113 ) -> Result<SendHandle, RoomSendQueueError> {
114 let Some(room) = self.inner.room.get() else {
115 return Err(RoomSendQueueError::RoomDisappeared);
116 };
117
118 if room.state() != RoomState::Joined {
119 return Err(RoomSendQueueError::RoomNotJoined);
120 }
121
122 let filename = filename.into();
123 let upload_file_txn = TransactionId::new();
124 let send_event_txn = config.txn_id.map_or_else(ChildTransactionId::new, Into::into);
125
126 Span::current().record("event_txn", tracing::field::display(&*send_event_txn));
127 debug!(filename, %content_type, %upload_file_txn, "sending an attachment");
128
129 let file_media_request = Media::make_local_file_media_request(&upload_file_txn);
130
131 let (upload_thumbnail_txn, event_thumbnail_info, queue_thumbnail_info) = {
132 let client = room.client();
133 let cache_store = client
134 .event_cache_store()
135 .lock()
136 .await
137 .map_err(RoomSendQueueStorageError::LockError)?;
138
139 cache_store
141 .add_media_content(
142 &file_media_request,
143 data.clone(),
144 IgnoreMediaRetentionPolicy::Yes,
146 )
147 .await
148 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
149
150 if let Some(thumbnail) = config.thumbnail.take() {
152 let txn = TransactionId::new();
153 trace!(upload_thumbnail_txn = %txn, "attachment has a thumbnail");
154
155 let (data, content_type, thumbnail_info) = thumbnail.into_parts();
158
159 let thumbnail_media_request = Media::make_local_file_media_request(&txn);
161 cache_store
162 .add_media_content(
163 &thumbnail_media_request,
164 data,
165 IgnoreMediaRetentionPolicy::Yes,
167 )
168 .await
169 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
170
171 (
172 Some(txn.clone()),
173 Some((thumbnail_media_request.source.clone(), thumbnail_info)),
174 Some((
175 FinishUploadThumbnailInfo { txn, width: None, height: None },
176 thumbnail_media_request,
177 content_type,
178 )),
179 )
180 } else {
181 Default::default()
182 }
183 };
184
185 let event_content = Room::make_attachment_event(
187 room.make_attachment_type(
188 &content_type,
189 filename,
190 file_media_request.source.clone(),
191 config.caption,
192 config.formatted_caption,
193 config.info,
194 event_thumbnail_info,
195 ),
196 config.mentions,
197 );
198
199 let created_at = MilliSecondsSinceUnixEpoch::now();
200
201 self.inner
203 .queue
204 .push_media(
205 event_content.clone(),
206 content_type,
207 send_event_txn.clone().into(),
208 created_at,
209 upload_file_txn.clone(),
210 file_media_request,
211 queue_thumbnail_info,
212 )
213 .await?;
214
215 trace!("manager sends a media to the background task");
216
217 self.inner.notifier.notify_one();
218
219 let send_handle = SendHandle {
220 room: self.clone(),
221 transaction_id: send_event_txn.clone().into(),
222 media_handles: Some(MediaHandles { upload_thumbnail_txn, upload_file_txn }),
223 created_at,
224 };
225
226 let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
227 transaction_id: send_event_txn.clone().into(),
228 content: LocalEchoContent::Event {
229 serialized_event: SerializableEventContent::new(&event_content.into())
230 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
231 send_handle: send_handle.clone(),
232 send_error: None,
233 },
234 }));
235
236 Ok(send_handle)
237 }
238}
239
240impl QueueStorage {
241 #[allow(clippy::too_many_arguments)]
243 pub(super) async fn handle_dependent_finish_upload(
244 &self,
245 client: &Client,
246 event_txn: OwnedTransactionId,
247 parent_key: SentRequestKey,
248 mut local_echo: RoomMessageEventContent,
249 file_upload_txn: OwnedTransactionId,
250 thumbnail_info: Option<FinishUploadThumbnailInfo>,
251 new_updates: &mut Vec<RoomSendQueueUpdate>,
252 ) -> Result<(), RoomSendQueueError> {
253 let sent_media = parent_key
255 .into_media()
256 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
257
258 {
260 let from_req = Media::make_local_file_media_request(&file_upload_txn);
262
263 trace!(from = ?from_req.source, to = ?sent_media.file, "renaming media file key in cache store");
264 let cache_store = client
265 .event_cache_store()
266 .lock()
267 .await
268 .map_err(RoomSendQueueStorageError::LockError)?;
269
270 cache_store
272 .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
273 .await
274 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
275
276 cache_store
277 .replace_media_key(
278 &from_req,
279 &MediaRequestParameters {
280 source: sent_media.file.clone(),
281 format: MediaFormat::File,
282 },
283 )
284 .await
285 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
286
287 if let Some((info, new_source)) =
289 thumbnail_info.as_ref().zip(sent_media.thumbnail.clone())
290 {
291 let from_req = if let Some((height, width)) = info.height.zip(info.width) {
294 Media::make_local_thumbnail_media_request(&info.txn, height, width)
295 } else {
296 Media::make_local_file_media_request(&info.txn)
297 };
298
299 trace!(from = ?from_req.source, to = ?new_source, "renaming thumbnail file key in cache store");
300
301 cache_store
303 .set_ignore_media_retention_policy(&from_req, IgnoreMediaRetentionPolicy::No)
304 .await
305 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
306
307 cache_store
308 .replace_media_key(
309 &from_req,
310 &MediaRequestParameters { source: new_source, format: MediaFormat::File },
311 )
312 .await
313 .map_err(RoomSendQueueStorageError::EventCacheStoreError)?;
314 }
315 }
316
317 update_media_event_after_upload(&mut local_echo, sent_media);
318
319 let new_content = SerializableEventContent::new(&local_echo.into())
320 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
321
322 new_updates.push(RoomSendQueueUpdate::ReplacedLocalEvent {
325 transaction_id: event_txn.clone(),
326 new_content: new_content.clone(),
327 });
328
329 trace!(%event_txn, "queueing media event after successfully uploading media(s)");
330
331 client
332 .store()
333 .save_send_queue_request(
334 &self.room_id,
335 event_txn,
336 MilliSecondsSinceUnixEpoch::now(),
337 new_content.into(),
338 Self::HIGH_PRIORITY,
339 )
340 .await
341 .map_err(RoomSendQueueStorageError::StateStoreError)?;
342
343 Ok(())
344 }
345
346 pub(super) async fn handle_dependent_file_upload_with_thumbnail(
348 &self,
349 client: &Client,
350 next_upload_txn: OwnedTransactionId,
351 parent_key: SentRequestKey,
352 content_type: String,
353 cache_key: MediaRequestParameters,
354 event_txn: OwnedTransactionId,
355 ) -> Result<(), RoomSendQueueError> {
356 let sent_media = parent_key
359 .into_media()
360 .ok_or(RoomSendQueueError::StorageError(RoomSendQueueStorageError::InvalidParentKey))?;
361
362 debug_assert!(sent_media.thumbnail.is_none());
365 if sent_media.thumbnail.is_some() {
366 warn!("unexpected thumbnail for a thumbnail!");
367 }
368
369 trace!(related_to = %event_txn, "done uploading thumbnail, now queuing a request to send the media file itself");
370
371 let request = QueuedRequestKind::MediaUpload {
372 content_type,
373 cache_key,
374 thumbnail_source: Some(sent_media.file),
376 related_to: event_txn,
377 };
378
379 client
380 .store()
381 .save_send_queue_request(
382 &self.room_id,
383 next_upload_txn,
384 MilliSecondsSinceUnixEpoch::now(),
385 request,
386 Self::HIGH_PRIORITY,
387 )
388 .await
389 .map_err(RoomSendQueueStorageError::StateStoreError)?;
390
391 Ok(())
392 }
393
394 #[instrument(skip(self, handles))]
401 pub(super) async fn abort_upload(
402 &self,
403 event_txn: &TransactionId,
404 handles: &MediaHandles,
405 ) -> Result<bool, RoomSendQueueStorageError> {
406 let mut guard = self.store.lock().await;
407 let client = guard.client()?;
408
409 debug!("trying to abort an upload");
411
412 let store = client.store();
413
414 let upload_file_as_dependent = ChildTransactionId::from(handles.upload_file_txn.clone());
415 let event_as_dependent = ChildTransactionId::from(event_txn.to_owned());
416
417 let mut removed_dependent_upload = false;
418 let mut removed_dependent_event = false;
419
420 if let Some(thumbnail_txn) = &handles.upload_thumbnail_txn {
421 if store.remove_send_queue_request(&self.room_id, thumbnail_txn).await? {
422 trace!("could remove thumbnail request, removing 2 dependent requests now");
425
426 if let Some(info) = guard.being_sent.as_ref() {
428 if info.transaction_id == *thumbnail_txn {
429 let info = guard.being_sent.take().unwrap();
431 if info.cancel_upload() {
432 trace!("aborted ongoing thumbnail upload");
433 }
434 }
435 }
436
437 removed_dependent_upload = store
439 .remove_dependent_queued_request(&self.room_id, &upload_file_as_dependent)
440 .await?;
441
442 if !removed_dependent_upload {
443 warn!("unable to find the dependent file upload request");
444 }
445
446 removed_dependent_event = store
447 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
448 .await?;
449
450 if !removed_dependent_event {
451 warn!("unable to find the dependent media event upload request");
452 }
453 }
454 }
455
456 if !removed_dependent_upload {
463 if store.remove_send_queue_request(&self.room_id, &handles.upload_file_txn).await? {
464 trace!("could remove file upload request, removing 1 dependent request");
467
468 if let Some(info) = guard.being_sent.as_ref() {
470 if info.transaction_id == handles.upload_file_txn {
471 let info = guard.being_sent.take().unwrap();
473 if info.cancel_upload() {
474 trace!("aborted ongoing file upload");
475 }
476 }
477 }
478
479 if !store
481 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
482 .await?
483 {
484 warn!("unable to find the dependent media event upload request");
485 }
486 } else {
487 if !removed_dependent_event
492 && !store
493 .remove_dependent_queued_request(&self.room_id, &event_as_dependent)
494 .await?
495 {
496 debug!("uploads already happened => deferring to aborting an event sending");
499 return Ok(false);
500 }
501 }
502 }
503
504 {
507 let event_cache = client.event_cache_store().lock().await?;
508 event_cache
509 .remove_media_content_for_uri(&Media::make_local_uri(&handles.upload_file_txn))
510 .await?;
511 if let Some(txn) = &handles.upload_thumbnail_txn {
512 event_cache.remove_media_content_for_uri(&Media::make_local_uri(txn)).await?;
513 }
514 }
515
516 debug!("successfully aborted!");
517 Ok(true)
518 }
519
520 #[instrument(skip(self, caption, formatted_caption))]
521 pub(super) async fn edit_media_caption(
522 &self,
523 txn: &TransactionId,
524 caption: Option<String>,
525 formatted_caption: Option<FormattedBody>,
526 mentions: Option<Mentions>,
527 ) -> Result<Option<AnyMessageLikeEventContent>, RoomSendQueueStorageError> {
528 use RoomSendQueueStorageError::InvalidMediaCaptionEdit;
530
531 let guard = self.store.lock().await;
532 let client = guard.client()?;
533 let store = client.store();
534
535 {
543 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
546
547 if let Some(found) =
548 dependent_requests.into_iter().find(|req| *req.own_transaction_id == *txn)
549 {
550 trace!("found the caption to edit in a dependent request");
551
552 let DependentQueuedRequestKind::FinishUpload {
553 mut local_echo,
554 file_upload,
555 thumbnail_info,
556 } = found.kind
557 else {
558 return Err(InvalidMediaCaptionEdit);
559 };
560
561 if !update_media_caption(&mut local_echo, caption, formatted_caption, mentions) {
562 return Err(InvalidMediaCaptionEdit);
563 }
564
565 let new_dependent_request = DependentQueuedRequestKind::FinishUpload {
566 local_echo: local_echo.clone(),
567 file_upload,
568 thumbnail_info,
569 };
570 store
571 .update_dependent_queued_request(
572 &self.room_id,
573 &found.own_transaction_id,
574 new_dependent_request,
575 )
576 .await?;
577
578 trace!("caption successfully updated");
579 return Ok(Some(local_echo.into()));
580 }
581 }
582
583 let requests = store.load_send_queue_requests(&self.room_id).await?;
584 let Some(found) = requests.into_iter().find(|req| req.transaction_id == *txn) else {
585 return Ok(None);
587 };
588
589 trace!("found the caption to edit as a request");
590
591 let QueuedRequestKind::Event { content: serialized_content } = found.kind else {
592 return Err(InvalidMediaCaptionEdit);
593 };
594
595 let deserialized = serialized_content.deserialize()?;
596 let AnyMessageLikeEventContent::RoomMessage(mut content) = deserialized else {
597 return Err(InvalidMediaCaptionEdit);
598 };
599
600 if !update_media_caption(&mut content, caption, formatted_caption, mentions) {
601 return Err(InvalidMediaCaptionEdit);
602 }
603
604 let any_content: AnyMessageLikeEventContent = content.into();
605 let new_serialized = SerializableEventContent::new(&any_content.clone())?;
606
607 if let Some(being_sent) = guard.being_sent.as_ref() {
609 if being_sent.transaction_id == *txn {
610 store
612 .save_dependent_queued_request(
613 &self.room_id,
614 txn,
615 ChildTransactionId::new(),
616 MilliSecondsSinceUnixEpoch::now(),
617 DependentQueuedRequestKind::EditEvent { new_content: new_serialized },
618 )
619 .await?;
620
621 trace!("media event was being sent, pushed a dependent edit");
622 return Ok(Some(any_content));
623 }
624 }
625
626 store
628 .update_send_queue_request(
629 &self.room_id,
630 txn,
631 QueuedRequestKind::Event { content: new_serialized },
632 )
633 .await?;
634
635 trace!("media event was not being sent, updated local echo");
636 Ok(Some(any_content))
637 }
638}