1use std::{
132 collections::{BTreeMap, HashMap},
133 str::FromStr as _,
134 sync::{
135 atomic::{AtomicBool, Ordering},
136 Arc, RwLock,
137 },
138};
139
140use as_variant::as_variant;
141#[cfg(feature = "unstable-msc4274")]
142use matrix_sdk_base::store::FinishGalleryItemInfo;
143use matrix_sdk_base::{
144 event_cache::store::EventCacheStoreError,
145 media::MediaRequestParameters,
146 store::{
147 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore,
148 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
149 SentMediaInfo, SentRequestKey, SerializableEventContent,
150 },
151 store_locks::LockStoreError,
152 RoomState, StoreError,
153};
154use matrix_sdk_common::executor::{spawn, JoinHandle};
155use mime::Mime;
156use ruma::{
157 events::{
158 reaction::ReactionEventContent,
159 relation::Annotation,
160 room::{
161 message::{FormattedBody, RoomMessageEventContent},
162 MediaSource,
163 },
164 AnyMessageLikeEventContent, EventContent as _, Mentions,
165 },
166 serde::Raw,
167 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
168};
169use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard};
170use tracing::{debug, error, info, instrument, trace, warn};
171
172#[cfg(feature = "e2e-encryption")]
173use crate::crypto::{OlmError, SessionRecipientCollectionError};
174use crate::{
175 client::WeakClient,
176 config::RequestConfig,
177 error::RetryKind,
178 room::{edit::EditedContent, WeakRoom},
179 Client, Media, Room,
180};
181
182mod upload;
183
184pub struct SendQueue {
186 client: Client,
187}
188
189#[cfg(not(tarpaulin_include))]
190impl std::fmt::Debug for SendQueue {
191 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192 f.debug_struct("SendQueue").finish_non_exhaustive()
193 }
194}
195
196impl SendQueue {
197 pub(super) fn new(client: Client) -> Self {
198 Self { client }
199 }
200
201 pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
204 if !self.is_enabled() {
205 return;
206 }
207
208 let room_ids =
209 self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
210 |err| {
211 warn!("error when loading rooms with unsent requests: {err}");
212 Vec::new()
213 },
214 );
215
216 for room_id in room_ids {
218 if let Some(room) = self.client.get_room(&room_id) {
219 let _ = self.for_room(room);
220 }
221 }
222 }
223
224 #[inline(always)]
226 fn data(&self) -> &SendQueueData {
227 &self.client.inner.send_queue_data
228 }
229
230 fn for_room(&self, room: Room) -> RoomSendQueue {
233 let data = self.data();
234
235 let mut map = data.rooms.write().unwrap();
236
237 let room_id = room.room_id();
238 if let Some(room_q) = map.get(room_id).cloned() {
239 return room_q;
240 }
241
242 let owned_room_id = room_id.to_owned();
243 let room_q = RoomSendQueue::new(
244 self.is_enabled(),
245 data.error_reporter.clone(),
246 data.is_dropping.clone(),
247 &self.client,
248 owned_room_id.clone(),
249 );
250
251 map.insert(owned_room_id, room_q.clone());
252
253 room_q
254 }
255
256 pub async fn set_enabled(&self, enabled: bool) {
266 debug!(?enabled, "setting global send queue enablement");
267
268 self.data().globally_enabled.store(enabled, Ordering::SeqCst);
269
270 for room in self.data().rooms.read().unwrap().values() {
272 room.set_enabled(enabled);
273 }
274
275 self.respawn_tasks_for_rooms_with_unsent_requests().await;
278 }
279
280 pub fn is_enabled(&self) -> bool {
283 self.data().globally_enabled.load(Ordering::SeqCst)
284 }
285
286 pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
289 self.data().error_reporter.subscribe()
290 }
291}
292
293#[derive(Clone, Debug)]
295pub struct SendQueueRoomError {
296 pub room_id: OwnedRoomId,
298
299 pub error: Arc<crate::Error>,
301
302 pub is_recoverable: bool,
308}
309
310impl Client {
311 pub fn send_queue(&self) -> SendQueue {
314 SendQueue::new(self.clone())
315 }
316}
317
318pub(super) struct SendQueueData {
319 rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,
321
322 globally_enabled: AtomicBool,
327
328 error_reporter: broadcast::Sender<SendQueueRoomError>,
330
331 is_dropping: Arc<AtomicBool>,
333}
334
335impl SendQueueData {
336 pub fn new(globally_enabled: bool) -> Self {
338 let (sender, _) = broadcast::channel(32);
339
340 Self {
341 rooms: Default::default(),
342 globally_enabled: AtomicBool::new(globally_enabled),
343 error_reporter: sender,
344 is_dropping: Arc::new(false.into()),
345 }
346 }
347}
348
349impl Drop for SendQueueData {
350 fn drop(&mut self) {
351 debug!("globally dropping the send queue");
354 self.is_dropping.store(true, Ordering::SeqCst);
355
356 let rooms = self.rooms.read().unwrap();
357 for room in rooms.values() {
358 room.inner.notifier.notify_one();
359 }
360 }
361}
362
363impl Room {
364 pub fn send_queue(&self) -> RoomSendQueue {
366 self.client.send_queue().for_room(self.clone())
367 }
368}
369
370#[derive(Clone)]
374pub struct RoomSendQueue {
375 inner: Arc<RoomSendQueueInner>,
376}
377
378#[cfg(not(tarpaulin_include))]
379impl std::fmt::Debug for RoomSendQueue {
380 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
381 f.debug_struct("RoomSendQueue").finish_non_exhaustive()
382 }
383}
384
385impl RoomSendQueue {
386 fn new(
387 globally_enabled: bool,
388 global_error_reporter: broadcast::Sender<SendQueueRoomError>,
389 is_dropping: Arc<AtomicBool>,
390 client: &Client,
391 room_id: OwnedRoomId,
392 ) -> Self {
393 let (updates_sender, _) = broadcast::channel(32);
394
395 let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
396 let notifier = Arc::new(Notify::new());
397
398 let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
399 let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));
400
401 let task = spawn(Self::sending_task(
402 weak_room.clone(),
403 queue.clone(),
404 notifier.clone(),
405 updates_sender.clone(),
406 locally_enabled.clone(),
407 global_error_reporter,
408 is_dropping,
409 ));
410
411 Self {
412 inner: Arc::new(RoomSendQueueInner {
413 room: weak_room,
414 updates: updates_sender,
415 _task: task,
416 queue,
417 notifier,
418 locally_enabled,
419 }),
420 }
421 }
422
423 pub async fn send_raw(
438 &self,
439 content: Raw<AnyMessageLikeEventContent>,
440 event_type: String,
441 ) -> Result<SendHandle, RoomSendQueueError> {
442 let Some(room) = self.inner.room.get() else {
443 return Err(RoomSendQueueError::RoomDisappeared);
444 };
445 if room.state() != RoomState::Joined {
446 return Err(RoomSendQueueError::RoomNotJoined);
447 }
448
449 let content = SerializableEventContent::from_raw(content, event_type);
450
451 let created_at = MilliSecondsSinceUnixEpoch::now();
452 let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
453 trace!(%transaction_id, "manager sends a raw event to the background task");
454
455 self.inner.notifier.notify_one();
456
457 let send_handle = SendHandle {
458 room: self.clone(),
459 transaction_id: transaction_id.clone(),
460 media_handles: vec![],
461 created_at,
462 };
463
464 let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
465 transaction_id,
466 content: LocalEchoContent::Event {
467 serialized_event: content,
468 send_handle: send_handle.clone(),
469 send_error: None,
470 },
471 }));
472
473 Ok(send_handle)
474 }
475
476 pub async fn send(
491 &self,
492 content: AnyMessageLikeEventContent,
493 ) -> Result<SendHandle, RoomSendQueueError> {
494 self.send_raw(
495 Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
496 content.event_type().to_string(),
497 )
498 .await
499 }
500
501 pub async fn subscribe(
504 &self,
505 ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
506 {
507 let local_echoes = self.inner.queue.local_echoes(self).await?;
508
509 Ok((local_echoes, self.inner.updates.subscribe()))
510 }
511
512 #[instrument(skip_all, fields(room_id = %room.room_id()))]
518 async fn sending_task(
519 room: WeakRoom,
520 queue: QueueStorage,
521 notifier: Arc<Notify>,
522 updates: broadcast::Sender<RoomSendQueueUpdate>,
523 locally_enabled: Arc<AtomicBool>,
524 global_error_reporter: broadcast::Sender<SendQueueRoomError>,
525 is_dropping: Arc<AtomicBool>,
526 ) {
527 trace!("spawned the sending task");
528
529 loop {
530 if is_dropping.load(Ordering::SeqCst) {
532 trace!("shutting down!");
533 break;
534 }
535
536 let mut new_updates = Vec::new();
539 if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
540 warn!("errors when applying dependent requests: {err}");
541 }
542
543 for up in new_updates {
544 let _ = updates.send(up);
545 }
546
547 if !locally_enabled.load(Ordering::SeqCst) {
548 trace!("not enabled, sleeping");
549 notifier.notified().await;
551 continue;
552 }
553
554 let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
555 Ok(Some(request)) => request,
556
557 Ok(None) => {
558 trace!("queue is empty, sleeping");
559 notifier.notified().await;
561 continue;
562 }
563
564 Err(err) => {
565 warn!("error when loading next request to send: {err}");
566 continue;
567 }
568 };
569
570 let txn_id = queued_request.transaction_id.clone();
571 trace!(txn_id = %txn_id, "received a request to send!");
572
573 let related_txn_id = as_variant!(&queued_request.kind, QueuedRequestKind::MediaUpload { related_to, .. } => related_to.clone());
574
575 let Some(room) = room.get() else {
576 if is_dropping.load(Ordering::SeqCst) {
577 break;
578 }
579 error!("the weak room couldn't be upgraded but we're not shutting down?");
580 continue;
581 };
582
583 match Self::handle_request(&room, queued_request, cancel_upload_rx).await {
584 Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
585 {
586 Ok(()) => match parent_key {
587 SentRequestKey::Event(event_id) => {
588 let _ = updates.send(RoomSendQueueUpdate::SentEvent {
589 transaction_id: txn_id,
590 event_id,
591 });
592 }
593
594 SentRequestKey::Media(media_info) => {
595 let _ = updates.send(RoomSendQueueUpdate::UploadedMedia {
596 related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
597 file: media_info.file,
598 });
599 }
600 },
601
602 Err(err) => {
603 warn!("unable to mark queued request as sent: {err}");
604 }
605 },
606
607 Ok(None) => {
608 debug!("Request has been aborted while running, continuing.");
609 }
610
611 Err(err) => {
612 let is_recoverable = match err {
613 crate::Error::Http(ref http_err) => {
614 matches!(
616 http_err.retry_kind(),
617 RetryKind::Transient { .. } | RetryKind::NetworkFailure
618 )
619 }
620
621 crate::Error::ConcurrentRequestFailed => true,
626
627 _ => false,
629 };
630
631 locally_enabled.store(false, Ordering::SeqCst);
633
634 if is_recoverable {
635 warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");
636
637 queue.mark_as_not_being_sent(&txn_id).await;
640
641 } else {
647 warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");
648
649 if let Err(storage_error) =
651 queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
652 {
653 warn!("unable to mark request as wedged: {storage_error}");
654 }
655 }
656
657 let error = Arc::new(err);
658
659 let _ = global_error_reporter.send(SendQueueRoomError {
660 room_id: room.room_id().to_owned(),
661 error: error.clone(),
662 is_recoverable,
663 });
664
665 let _ = updates.send(RoomSendQueueUpdate::SendError {
666 transaction_id: related_txn_id.unwrap_or(txn_id),
667 error,
668 is_recoverable,
669 });
670 }
671 }
672 }
673
674 info!("exited sending task");
675 }
676
677 async fn handle_request(
681 room: &Room,
682 request: QueuedRequest,
683 cancel_upload_rx: Option<oneshot::Receiver<()>>,
684 ) -> Result<Option<SentRequestKey>, crate::Error> {
685 match request.kind {
686 QueuedRequestKind::Event { content } => {
687 let (event, event_type) = content.raw();
688
689 let res = room
690 .send_raw(event_type, event)
691 .with_transaction_id(&request.transaction_id)
692 .with_request_config(RequestConfig::short_retry())
693 .await?;
694
695 trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent");
696 Ok(Some(SentRequestKey::Event(res.event_id)))
697 }
698
699 QueuedRequestKind::MediaUpload {
700 content_type,
701 cache_key,
702 thumbnail_source,
703 related_to: relates_to,
704 #[cfg(feature = "unstable-msc4274")]
705 accumulated,
706 } => {
707 trace!(%relates_to, "uploading media related to event");
708
709 let fut = async move {
710 let data = room
711 .client()
712 .event_cache_store()
713 .lock()
714 .await?
715 .get_media_content(&cache_key)
716 .await?
717 .ok_or(crate::Error::SendQueueWedgeError(Box::new(
718 QueueWedgeError::MissingMediaContent,
719 )))?;
720
721 let mime = Mime::from_str(&content_type).map_err(|_| {
722 crate::Error::SendQueueWedgeError(Box::new(
723 QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
724 ))
725 })?;
726
727 #[cfg(feature = "e2e-encryption")]
728 let media_source = if room.latest_encryption_state().await?.is_encrypted() {
729 trace!("upload will be encrypted (encrypted room)");
730 let mut cursor = std::io::Cursor::new(data);
731 let encrypted_file = room
732 .client()
733 .upload_encrypted_file(&mut cursor)
734 .with_request_config(RequestConfig::short_retry())
735 .await?;
736 MediaSource::Encrypted(Box::new(encrypted_file))
737 } else {
738 trace!("upload will be in clear text (room without encryption)");
739 let request_config = RequestConfig::short_retry()
740 .timeout(Media::reasonable_upload_timeout(&data));
741 let res =
742 room.client().media().upload(&mime, data, Some(request_config)).await?;
743 MediaSource::Plain(res.content_uri)
744 };
745
746 #[cfg(not(feature = "e2e-encryption"))]
747 let media_source = {
748 let request_config = RequestConfig::short_retry()
749 .timeout(Media::reasonable_upload_timeout(&data));
750 let res =
751 room.client().media().upload(&mime, data, Some(request_config)).await?;
752 MediaSource::Plain(res.content_uri)
753 };
754
755 let uri = match &media_source {
756 MediaSource::Plain(uri) => uri,
757 MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
758 };
759 trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");
760
761 Ok(SentRequestKey::Media(SentMediaInfo {
762 file: media_source,
763 thumbnail: thumbnail_source,
764 #[cfg(feature = "unstable-msc4274")]
765 accumulated,
766 }))
767 };
768
769 let wait_for_cancel = async move {
770 if let Some(rx) = cancel_upload_rx {
771 rx.await
772 } else {
773 std::future::pending().await
774 }
775 };
776
777 tokio::select! {
778 biased;
779
780 _ = wait_for_cancel => {
781 Ok(None)
782 }
783
784 res = fut => {
785 res.map(Some)
786 }
787 }
788 }
789 }
790 }
791
792 pub fn is_enabled(&self) -> bool {
794 self.inner.locally_enabled.load(Ordering::SeqCst)
795 }
796
797 pub fn set_enabled(&self, enabled: bool) {
799 self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
800
801 if enabled {
804 self.inner.notifier.notify_one();
805 }
806 }
807}
808
809impl From<&crate::Error> for QueueWedgeError {
810 fn from(value: &crate::Error) -> Self {
811 match value {
812 #[cfg(feature = "e2e-encryption")]
813 crate::Error::OlmError(error) => match &**error {
814 OlmError::SessionRecipientCollectionError(error) => match error {
815 SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
816 QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
817 }
818
819 SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
820 QueueWedgeError::IdentityViolations { users: users.clone() }
821 }
822
823 SessionRecipientCollectionError::CrossSigningNotSetup
824 | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
825 QueueWedgeError::CrossVerificationRequired
826 }
827 },
828 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
829 },
830
831 crate::Error::SendQueueWedgeError(error) => *error.clone(),
833
834 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
835 }
836 }
837}
838
839struct RoomSendQueueInner {
840 room: WeakRoom,
842
843 updates: broadcast::Sender<RoomSendQueueUpdate>,
847
848 queue: QueueStorage,
855
856 notifier: Arc<Notify>,
859
860 locally_enabled: Arc<AtomicBool>,
863
864 _task: JoinHandle<()>,
867}
868
869struct BeingSentInfo {
871 transaction_id: OwnedTransactionId,
873
874 cancel_upload: Option<oneshot::Sender<()>>,
877}
878
879impl BeingSentInfo {
880 fn cancel_upload(self) -> bool {
885 if let Some(cancel_upload) = self.cancel_upload {
886 let _ = cancel_upload.send(());
887 true
888 } else {
889 false
890 }
891 }
892}
893
894#[derive(Clone)]
897struct StoreLock {
898 client: WeakClient,
900
901 being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
906}
907
908impl StoreLock {
909 async fn lock(&self) -> StoreLockGuard {
911 StoreLockGuard {
912 client: self.client.clone(),
913 being_sent: self.being_sent.clone().lock_owned().await,
914 }
915 }
916}
917
918struct StoreLockGuard {
921 client: WeakClient,
923
924 being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
927}
928
929impl StoreLockGuard {
930 fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
932 self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
933 }
934}
935
936#[derive(Clone)]
937struct QueueStorage {
938 store: StoreLock,
941
942 room_id: OwnedRoomId,
944}
945
946impl QueueStorage {
947 const LOW_PRIORITY: usize = 0;
949
950 const HIGH_PRIORITY: usize = 10;
952
953 fn new(client: WeakClient, room: OwnedRoomId) -> Self {
955 Self { room_id: room, store: StoreLock { client, being_sent: Default::default() } }
956 }
957
958 async fn push(
962 &self,
963 request: QueuedRequestKind,
964 created_at: MilliSecondsSinceUnixEpoch,
965 ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
966 let transaction_id = TransactionId::new();
967
968 self.store
969 .lock()
970 .await
971 .client()?
972 .state_store()
973 .save_send_queue_request(
974 &self.room_id,
975 transaction_id.clone(),
976 created_at,
977 request,
978 Self::LOW_PRIORITY,
979 )
980 .await?;
981
982 Ok(transaction_id)
983 }
984
985 async fn peek_next_to_send(
990 &self,
991 ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
992 {
993 let mut guard = self.store.lock().await;
994 let queued_requests =
995 guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;
996
997 if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
998 let (cancel_upload_tx, cancel_upload_rx) =
999 if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
1000 let (tx, rx) = oneshot::channel();
1001 (Some(tx), Some(rx))
1002 } else {
1003 Default::default()
1004 };
1005
1006 let prev = guard.being_sent.replace(BeingSentInfo {
1007 transaction_id: request.transaction_id.clone(),
1008 cancel_upload: cancel_upload_tx,
1009 });
1010
1011 if let Some(prev) = prev {
1012 error!(
1013 prev_txn = ?prev.transaction_id,
1014 "a previous request was still active while picking a new one"
1015 );
1016 }
1017
1018 Ok(Some((request.clone(), cancel_upload_rx)))
1019 } else {
1020 Ok(None)
1021 }
1022 }
1023
1024 async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1028 let was_being_sent = self.store.lock().await.being_sent.take();
1029
1030 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1031 if prev_txn != Some(transaction_id) {
1032 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after transient error)");
1033 }
1034 }
1035
1036 async fn mark_as_wedged(
1040 &self,
1041 transaction_id: &TransactionId,
1042 reason: QueueWedgeError,
1043 ) -> Result<(), RoomSendQueueStorageError> {
1044 let mut guard = self.store.lock().await;
1046 let was_being_sent = guard.being_sent.take();
1047
1048 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1049 if prev_txn != Some(transaction_id) {
1050 error!(
1051 ?prev_txn,
1052 "previous active request didn't match that we expect (after permanent error)",
1053 );
1054 }
1055
1056 Ok(guard
1057 .client()?
1058 .state_store()
1059 .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1060 .await?)
1061 }
1062
1063 async fn mark_as_unwedged(
1066 &self,
1067 transaction_id: &TransactionId,
1068 ) -> Result<(), RoomSendQueueStorageError> {
1069 Ok(self
1070 .store
1071 .lock()
1072 .await
1073 .client()?
1074 .state_store()
1075 .update_send_queue_request_status(&self.room_id, transaction_id, None)
1076 .await?)
1077 }
1078
1079 async fn mark_as_sent(
1082 &self,
1083 transaction_id: &TransactionId,
1084 parent_key: SentRequestKey,
1085 ) -> Result<(), RoomSendQueueStorageError> {
1086 let mut guard = self.store.lock().await;
1088 let was_being_sent = guard.being_sent.take();
1089
1090 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1091 if prev_txn != Some(transaction_id) {
1092 error!(
1093 ?prev_txn,
1094 "previous active request didn't match that we expect (after successful send)",
1095 );
1096 }
1097
1098 let client = guard.client()?;
1099 let store = client.state_store();
1100
1101 store
1103 .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1104 .await?;
1105
1106 let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1107
1108 if !removed {
1109 warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1110 }
1111
1112 Ok(())
1113 }
1114
1115 async fn cancel_event(
1122 &self,
1123 transaction_id: &TransactionId,
1124 ) -> Result<bool, RoomSendQueueStorageError> {
1125 let guard = self.store.lock().await;
1126
1127 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1128 == Some(transaction_id)
1129 {
1130 guard
1132 .client()?
1133 .state_store()
1134 .save_dependent_queued_request(
1135 &self.room_id,
1136 transaction_id,
1137 ChildTransactionId::new(),
1138 MilliSecondsSinceUnixEpoch::now(),
1139 DependentQueuedRequestKind::RedactEvent,
1140 )
1141 .await?;
1142
1143 return Ok(true);
1144 }
1145
1146 let removed = guard
1147 .client()?
1148 .state_store()
1149 .remove_send_queue_request(&self.room_id, transaction_id)
1150 .await?;
1151
1152 Ok(removed)
1153 }
1154
1155 async fn replace_event(
1162 &self,
1163 transaction_id: &TransactionId,
1164 serializable: SerializableEventContent,
1165 ) -> Result<bool, RoomSendQueueStorageError> {
1166 let guard = self.store.lock().await;
1167
1168 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1169 == Some(transaction_id)
1170 {
1171 guard
1173 .client()?
1174 .state_store()
1175 .save_dependent_queued_request(
1176 &self.room_id,
1177 transaction_id,
1178 ChildTransactionId::new(),
1179 MilliSecondsSinceUnixEpoch::now(),
1180 DependentQueuedRequestKind::EditEvent { new_content: serializable },
1181 )
1182 .await?;
1183
1184 return Ok(true);
1185 }
1186
1187 let edited = guard
1188 .client()?
1189 .state_store()
1190 .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1191 .await?;
1192
1193 Ok(edited)
1194 }
1195
1196 #[allow(clippy::too_many_arguments)]
1200 async fn push_media(
1201 &self,
1202 event: RoomMessageEventContent,
1203 content_type: Mime,
1204 send_event_txn: OwnedTransactionId,
1205 created_at: MilliSecondsSinceUnixEpoch,
1206 upload_file_txn: OwnedTransactionId,
1207 file_media_request: MediaRequestParameters,
1208 thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>,
1209 ) -> Result<(), RoomSendQueueStorageError> {
1210 let guard = self.store.lock().await;
1211 let client = guard.client()?;
1212 let store = client.state_store();
1213
1214 let thumbnail_info = self
1215 .push_thumbnail_and_media_uploads(
1216 store,
1217 &content_type,
1218 send_event_txn.clone(),
1219 created_at,
1220 upload_file_txn.clone(),
1221 file_media_request,
1222 thumbnail,
1223 )
1224 .await?;
1225
1226 store
1228 .save_dependent_queued_request(
1229 &self.room_id,
1230 &upload_file_txn,
1231 send_event_txn.into(),
1232 created_at,
1233 DependentQueuedRequestKind::FinishUpload {
1234 local_echo: Box::new(event),
1235 file_upload: upload_file_txn.clone(),
1236 thumbnail_info,
1237 },
1238 )
1239 .await?;
1240
1241 Ok(())
1242 }
1243
1244 #[cfg(feature = "unstable-msc4274")]
1248 #[allow(clippy::too_many_arguments)]
1249 async fn push_gallery(
1250 &self,
1251 event: RoomMessageEventContent,
1252 send_event_txn: OwnedTransactionId,
1253 created_at: MilliSecondsSinceUnixEpoch,
1254 item_queue_infos: Vec<GalleryItemQueueInfo>,
1255 ) -> Result<(), RoomSendQueueStorageError> {
1256 let guard = self.store.lock().await;
1257 let client = guard.client()?;
1258 let store = client.state_store();
1259
1260 let mut finish_item_infos = Vec::with_capacity(item_queue_infos.len());
1261
1262 let Some((first, rest)) = item_queue_infos.split_first() else {
1263 return Ok(());
1264 };
1265
1266 let GalleryItemQueueInfo { content_type, upload_file_txn, file_media_request, thumbnail } =
1267 first;
1268
1269 let thumbnail_info = self
1270 .push_thumbnail_and_media_uploads(
1271 store,
1272 content_type,
1273 send_event_txn.clone(),
1274 created_at,
1275 upload_file_txn.clone(),
1276 file_media_request.clone(),
1277 thumbnail.clone(),
1278 )
1279 .await?;
1280
1281 finish_item_infos
1282 .push(FinishGalleryItemInfo { file_upload: upload_file_txn.clone(), thumbnail_info });
1283
1284 let mut last_upload_file_txn = upload_file_txn.clone();
1285
1286 for item_queue_info in rest {
1287 let GalleryItemQueueInfo {
1288 content_type,
1289 upload_file_txn,
1290 file_media_request,
1291 thumbnail,
1292 } = item_queue_info;
1293
1294 let thumbnail_info =
1295 if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) =
1296 thumbnail
1297 {
1298 let upload_thumbnail_txn = thumbnail_info.txn.clone();
1299
1300 store
1303 .save_dependent_queued_request(
1304 &self.room_id,
1305 &last_upload_file_txn,
1306 upload_thumbnail_txn.clone().into(),
1307 created_at,
1308 DependentQueuedRequestKind::UploadFileOrThumbnail {
1309 content_type: thumbnail_content_type.to_string(),
1310 cache_key: thumbnail_media_request.clone(),
1311 related_to: send_event_txn.clone(),
1312 parent_is_thumbnail_upload: false,
1313 },
1314 )
1315 .await?;
1316
1317 last_upload_file_txn = upload_thumbnail_txn;
1318
1319 Some(thumbnail_info)
1320 } else {
1321 None
1322 };
1323
1324 store
1326 .save_dependent_queued_request(
1327 &self.room_id,
1328 &last_upload_file_txn,
1329 upload_file_txn.clone().into(),
1330 created_at,
1331 DependentQueuedRequestKind::UploadFileOrThumbnail {
1332 content_type: content_type.to_string(),
1333 cache_key: file_media_request.clone(),
1334 related_to: send_event_txn.clone(),
1335 parent_is_thumbnail_upload: thumbnail.is_some(),
1336 },
1337 )
1338 .await?;
1339
1340 finish_item_infos.push(FinishGalleryItemInfo {
1341 file_upload: upload_file_txn.clone(),
1342 thumbnail_info: thumbnail_info.cloned(),
1343 });
1344
1345 last_upload_file_txn = upload_file_txn.clone();
1346 }
1347
1348 store
1351 .save_dependent_queued_request(
1352 &self.room_id,
1353 &last_upload_file_txn,
1354 send_event_txn.into(),
1355 created_at,
1356 DependentQueuedRequestKind::FinishGallery {
1357 local_echo: Box::new(event),
1358 item_infos: finish_item_infos,
1359 },
1360 )
1361 .await?;
1362
1363 Ok(())
1364 }
1365
1366 #[allow(clippy::too_many_arguments)]
1372 async fn push_thumbnail_and_media_uploads(
1373 &self,
1374 store: &DynStateStore,
1375 content_type: &Mime,
1376 send_event_txn: OwnedTransactionId,
1377 created_at: MilliSecondsSinceUnixEpoch,
1378 upload_file_txn: OwnedTransactionId,
1379 file_media_request: MediaRequestParameters,
1380 thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>,
1381 ) -> Result<Option<FinishUploadThumbnailInfo>, RoomSendQueueStorageError> {
1382 if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) = thumbnail {
1383 let upload_thumbnail_txn = thumbnail_info.txn.clone();
1384
1385 store
1387 .save_send_queue_request(
1388 &self.room_id,
1389 upload_thumbnail_txn.clone(),
1390 created_at,
1391 QueuedRequestKind::MediaUpload {
1392 content_type: thumbnail_content_type.to_string(),
1393 cache_key: thumbnail_media_request,
1394 thumbnail_source: None, related_to: send_event_txn.clone(),
1396 #[cfg(feature = "unstable-msc4274")]
1397 accumulated: vec![],
1398 },
1399 Self::LOW_PRIORITY,
1400 )
1401 .await?;
1402
1403 store
1405 .save_dependent_queued_request(
1406 &self.room_id,
1407 &upload_thumbnail_txn,
1408 upload_file_txn.into(),
1409 created_at,
1410 DependentQueuedRequestKind::UploadFileOrThumbnail {
1411 content_type: content_type.to_string(),
1412 cache_key: file_media_request,
1413 related_to: send_event_txn,
1414 #[cfg(feature = "unstable-msc4274")]
1415 parent_is_thumbnail_upload: true,
1416 },
1417 )
1418 .await?;
1419
1420 Ok(Some(thumbnail_info))
1421 } else {
1422 store
1424 .save_send_queue_request(
1425 &self.room_id,
1426 upload_file_txn,
1427 created_at,
1428 QueuedRequestKind::MediaUpload {
1429 content_type: content_type.to_string(),
1430 cache_key: file_media_request,
1431 thumbnail_source: None,
1432 related_to: send_event_txn,
1433 #[cfg(feature = "unstable-msc4274")]
1434 accumulated: vec![],
1435 },
1436 Self::LOW_PRIORITY,
1437 )
1438 .await?;
1439
1440 Ok(None)
1441 }
1442 }
1443
1444 #[instrument(skip(self))]
1446 async fn react(
1447 &self,
1448 transaction_id: &TransactionId,
1449 key: String,
1450 created_at: MilliSecondsSinceUnixEpoch,
1451 ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1452 let guard = self.store.lock().await;
1453 let client = guard.client()?;
1454 let store = client.state_store();
1455
1456 let requests = store.load_send_queue_requests(&self.room_id).await?;
1457
1458 if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1460 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1463 if !dependent_requests
1464 .into_iter()
1465 .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1466 .any(|child_txn| *child_txn == *transaction_id)
1467 {
1468 return Ok(None);
1470 }
1471 }
1472
1473 let reaction_txn_id = ChildTransactionId::new();
1475 store
1476 .save_dependent_queued_request(
1477 &self.room_id,
1478 transaction_id,
1479 reaction_txn_id.clone(),
1480 created_at,
1481 DependentQueuedRequestKind::ReactEvent { key },
1482 )
1483 .await?;
1484
1485 Ok(Some(reaction_txn_id))
1486 }
1487
1488 async fn local_echoes(
1491 &self,
1492 room: &RoomSendQueue,
1493 ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1494 let guard = self.store.lock().await;
1495 let client = guard.client()?;
1496 let store = client.state_store();
1497
1498 let local_requests =
1499 store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1500 Some(LocalEcho {
1501 transaction_id: queued.transaction_id.clone(),
1502 content: match queued.kind {
1503 QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1504 serialized_event: content,
1505 send_handle: SendHandle {
1506 room: room.clone(),
1507 transaction_id: queued.transaction_id,
1508 media_handles: vec![],
1509 created_at: queued.created_at,
1510 },
1511 send_error: queued.error,
1512 },
1513
1514 QueuedRequestKind::MediaUpload { .. } => {
1515 return None;
1518 }
1519 },
1520 })
1521 });
1522
1523 let reactions_and_medias = store
1524 .load_dependent_queued_requests(&self.room_id)
1525 .await?
1526 .into_iter()
1527 .filter_map(|dep| match dep.kind {
1528 DependentQueuedRequestKind::EditEvent { .. }
1529 | DependentQueuedRequestKind::RedactEvent => {
1530 None
1532 }
1533
1534 DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
1535 transaction_id: dep.own_transaction_id.clone().into(),
1536 content: LocalEchoContent::React {
1537 key,
1538 send_handle: SendReactionHandle {
1539 room: room.clone(),
1540 transaction_id: dep.own_transaction_id,
1541 },
1542 applies_to: dep.parent_transaction_id,
1543 },
1544 }),
1545
1546 DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
1547 None
1549 }
1550
1551 DependentQueuedRequestKind::FinishUpload {
1552 local_echo,
1553 file_upload,
1554 thumbnail_info,
1555 } => {
1556 Some(LocalEcho {
1558 transaction_id: dep.own_transaction_id.clone().into(),
1559 content: LocalEchoContent::Event {
1560 serialized_event: SerializableEventContent::new(&(*local_echo).into())
1561 .ok()?,
1562 send_handle: SendHandle {
1563 room: room.clone(),
1564 transaction_id: dep.own_transaction_id.into(),
1565 media_handles: vec![MediaHandles {
1566 upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
1567 upload_file_txn: file_upload,
1568 }],
1569 created_at: dep.created_at,
1570 },
1571 send_error: None,
1572 },
1573 })
1574 }
1575
1576 #[cfg(feature = "unstable-msc4274")]
1577 DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
1578 self.create_gallery_local_echo(
1580 dep.own_transaction_id,
1581 room,
1582 dep.created_at,
1583 local_echo,
1584 item_infos,
1585 )
1586 }
1587 });
1588
1589 Ok(local_requests.chain(reactions_and_medias).collect())
1590 }
1591
/// Builds the local echo for a gallery whose "finish gallery" request is
/// still pending.
///
/// One [`MediaHandles`] is created per entry of `item_infos`, in the same
/// order. The resulting echo never carries a send error.
///
/// Returns `None` if `local_echo` cannot be serialized.
#[cfg(feature = "unstable-msc4274")]
fn create_gallery_local_echo(
    &self,
    transaction_id: ChildTransactionId,
    room: &RoomSendQueue,
    created_at: MilliSecondsSinceUnixEpoch,
    local_echo: Box<RoomMessageEventContent>,
    item_infos: Vec<FinishGalleryItemInfo>,
) -> Option<LocalEcho> {
    let serialized_event = SerializableEventContent::new(&(*local_echo).into()).ok()?;

    let mut media_handles = Vec::with_capacity(item_infos.len());
    for item in item_infos {
        media_handles.push(MediaHandles {
            upload_thumbnail_txn: item.thumbnail_info.map(|thumbnail| thumbnail.txn),
            upload_file_txn: item.file_upload,
        });
    }

    let send_handle = SendHandle {
        room: room.clone(),
        transaction_id: transaction_id.clone().into(),
        media_handles,
        created_at,
    };

    Some(LocalEcho {
        transaction_id: transaction_id.into(),
        content: LocalEchoContent::Event { serialized_event, send_handle, send_error: None },
    })
}
1622
/// Tries to apply one dependent request (edit, redaction, reaction, upload
/// step) to its parent.
///
/// The behavior depends on whether `dependent_request.parent_key` is set.
/// NOTE(review): the key is presumably filled in once the parent request has
/// completed (e.g. with the event id of the sent event) — confirm against
/// the store.
///
/// # Returns
///
/// - `Ok(true)` when the request has been handled, or can never be handled;
///   the caller (`apply_dependent_requests`) then removes it from the store.
/// - `Ok(false)` when the request must stay in the store for now.
///
/// # Errors
///
/// Returns an error when the parent key has an unexpected type, when the
/// room is missing from the client, when a JSON serialization fails, when a
/// store operation fails, or when one of the upload helpers fails.
#[instrument(skip_all)]
async fn try_apply_single_dependent_request(
    &self,
    client: &Client,
    dependent_request: DependentQueuedRequest,
    new_updates: &mut Vec<RoomSendQueueUpdate>,
) -> Result<bool, RoomSendQueueError> {
    let store = client.state_store();

    let parent_key = dependent_request.parent_key;

    match dependent_request.kind {
        DependentQueuedRequestKind::EditEvent { new_content } => {
            if let Some(parent_key) = parent_key {
                // The parent has a key: the edit must target an event id.
                let Some(event_id) = parent_key.into_event_id() else {
                    return Err(RoomSendQueueError::StorageError(
                        RoomSendQueueStorageError::InvalidParentKey,
                    ));
                };

                let room = client
                    .get_room(&self.room_id)
                    .ok_or(RoomSendQueueError::RoomDisappeared)?;

                // Only room messages and unstable poll starts can be turned
                // into an edit; anything else is dropped (`Ok(true)`).
                let edited_content = match new_content.deserialize() {
                    Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
                        EditedContent::RoomMessage(c.into())
                    }

                    Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
                        let poll_start = c.poll_start().clone();
                        EditedContent::PollStart {
                            fallback_text: poll_start.question.text.clone(),
                            new_content: poll_start,
                        }
                    }

                    Ok(c) => {
                        warn!("Unsupported edit content type: {:?}", c.event_type());
                        return Ok(true);
                    }

                    Err(err) => {
                        warn!("Unable to deserialize: {err}");
                        return Ok(true);
                    }
                };

                // Failing to build the edit event also drops the request.
                let edit_event = match room.make_edit_event(&event_id, edited_content).await {
                    Ok(e) => e,
                    Err(err) => {
                        warn!("couldn't create edited event: {err}");
                        return Ok(true);
                    }
                };

                let serializable = SerializableEventContent::from_raw(
                    Raw::new(&edit_event)
                        .map_err(RoomSendQueueStorageError::JsonSerialization)?,
                    edit_event.event_type().to_string(),
                );

                // Queue the edit as a new request of its own, reusing the
                // dependent request's transaction id and creation time.
                store
                    .save_send_queue_request(
                        &self.room_id,
                        dependent_request.own_transaction_id.into(),
                        dependent_request.created_at,
                        serializable.into(),
                        Self::HIGH_PRIORITY,
                    )
                    .await
                    .map_err(RoomSendQueueStorageError::StateStoreError)?;
            } else {
                // No parent key: replace the content of the parent request
                // that is still in the send queue.
                let edited = store
                    .update_send_queue_request(
                        &self.room_id,
                        &dependent_request.parent_transaction_id,
                        new_content.into(),
                    )
                    .await
                    .map_err(RoomSendQueueStorageError::StateStoreError)?;

                if !edited {
                    warn!("missing local echo upon dependent edit");
                }
            }
        }

        DependentQueuedRequestKind::RedactEvent => {
            if let Some(parent_key) = parent_key {
                // The parent has a key: redact the event it points to.
                let Some(event_id) = parent_key.into_event_id() else {
                    return Err(RoomSendQueueError::StorageError(
                        RoomSendQueueStorageError::InvalidParentKey,
                    ));
                };

                let room = client
                    .get_room(&self.room_id)
                    .ok_or(RoomSendQueueError::RoomDisappeared)?;

                // The redaction is sent right away, not queued. On failure
                // the request is kept in the store (`Ok(false)`).
                if let Err(err) = room
                    .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
                    .await
                {
                    warn!("error when sending a redact for {event_id}: {err}");
                    return Ok(false);
                }
            } else {
                // No parent key: remove the parent request from the send
                // queue.
                let removed = store
                    .remove_send_queue_request(
                        &self.room_id,
                        &dependent_request.parent_transaction_id,
                    )
                    .await
                    .map_err(RoomSendQueueStorageError::StateStoreError)?;

                if !removed {
                    warn!("missing local echo upon dependent redact");
                }
            }
        }

        DependentQueuedRequestKind::ReactEvent { key } => {
            if let Some(parent_key) = parent_key {
                // The parent has a key: the reaction annotates its event id.
                let Some(parent_event_id) = parent_key.into_event_id() else {
                    return Err(RoomSendQueueError::StorageError(
                        RoomSendQueueStorageError::InvalidParentKey,
                    ));
                };

                let react_event =
                    ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
                let serializable = SerializableEventContent::from_raw(
                    Raw::new(&react_event)
                        .map_err(RoomSendQueueStorageError::JsonSerialization)?,
                    react_event.event_type().to_string(),
                );

                // Queue the reaction as a new request of its own.
                store
                    .save_send_queue_request(
                        &self.room_id,
                        dependent_request.own_transaction_id.into(),
                        dependent_request.created_at,
                        serializable.into(),
                        Self::HIGH_PRIORITY,
                    )
                    .await
                    .map_err(RoomSendQueueStorageError::StateStoreError)?;
            } else {
                // A reaction needs the parent key; keep the request around.
                return Ok(false);
            }
        }

        DependentQueuedRequestKind::UploadFileOrThumbnail {
            content_type,
            cache_key,
            related_to,
            #[cfg(feature = "unstable-msc4274")]
            parent_is_thumbnail_upload,
        } => {
            // Without a parent key, keep the request in the store.
            let Some(parent_key) = parent_key else {
                return Ok(false);
            };
            // Without the `unstable-msc4274` feature the field does not
            // exist in the pattern above, and the flag is always `true`.
            let parent_is_thumbnail_upload = {
                cfg_if::cfg_if! {
                    if #[cfg(feature = "unstable-msc4274")] {
                        parent_is_thumbnail_upload
                    } else {
                        true
                    }
                }
            };
            self.handle_dependent_file_or_thumbnail_upload(
                client,
                dependent_request.own_transaction_id.into(),
                parent_key,
                content_type,
                cache_key,
                related_to,
                parent_is_thumbnail_upload,
            )
            .await?;
        }

        DependentQueuedRequestKind::FinishUpload {
            local_echo,
            file_upload,
            thumbnail_info,
        } => {
            // Without a parent key, keep the request in the store.
            let Some(parent_key) = parent_key else {
                return Ok(false);
            };
            self.handle_dependent_finish_upload(
                client,
                dependent_request.own_transaction_id.into(),
                parent_key,
                *local_echo,
                file_upload,
                thumbnail_info,
                new_updates,
            )
            .await?;
        }

        #[cfg(feature = "unstable-msc4274")]
        DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
            // Without a parent key, keep the request in the store.
            let Some(parent_key) = parent_key else {
                return Ok(false);
            };
            self.handle_dependent_finish_gallery_upload(
                client,
                dependent_request.own_transaction_id.into(),
                parent_key,
                *local_echo,
                item_infos,
                new_updates,
            )
            .await?;
        }
    }

    // Every arm that did not return early has handled the request.
    Ok(true)
}
1878
1879 #[instrument(skip(self))]
1880 async fn apply_dependent_requests(
1881 &self,
1882 new_updates: &mut Vec<RoomSendQueueUpdate>,
1883 ) -> Result<(), RoomSendQueueError> {
1884 let guard = self.store.lock().await;
1885
1886 let client = guard.client()?;
1887 let store = client.state_store();
1888
1889 let dependent_requests = store
1890 .load_dependent_queued_requests(&self.room_id)
1891 .await
1892 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1893
1894 let num_initial_dependent_requests = dependent_requests.len();
1895 if num_initial_dependent_requests == 0 {
1896 return Ok(());
1898 }
1899
1900 let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
1901
1902 for original in &dependent_requests {
1904 if !canonicalized_dependent_requests
1905 .iter()
1906 .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
1907 {
1908 store
1909 .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
1910 .await
1911 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1912 }
1913 }
1914
1915 let mut num_dependent_requests = canonicalized_dependent_requests.len();
1916
1917 debug!(
1918 num_dependent_requests,
1919 num_initial_dependent_requests, "starting handling of dependent requests"
1920 );
1921
1922 for dependent in canonicalized_dependent_requests {
1923 let dependent_id = dependent.own_transaction_id.clone();
1924
1925 match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
1926 Ok(should_remove) => {
1927 if should_remove {
1928 store
1930 .remove_dependent_queued_request(&self.room_id, &dependent_id)
1931 .await
1932 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1933
1934 num_dependent_requests -= 1;
1935 }
1936 }
1937
1938 Err(err) => {
1939 warn!("error when applying single dependent request: {err}");
1940 }
1941 }
1942 }
1943
1944 debug!(
1945 leftover_dependent_requests = num_dependent_requests,
1946 "stopped handling dependent request"
1947 );
1948
1949 Ok(())
1950 }
1951
1952 async fn remove_dependent_send_queue_request(
1954 &self,
1955 dependent_event_id: &ChildTransactionId,
1956 ) -> Result<bool, RoomSendQueueStorageError> {
1957 Ok(self
1958 .store
1959 .lock()
1960 .await
1961 .client()?
1962 .state_store()
1963 .remove_dependent_queued_request(&self.room_id, dependent_event_id)
1964 .await?)
1965 }
1966}
1967
/// Per-item data needed to queue one gallery item.
// NOTE(review): field meanings below are inferred from names and types only;
// the code using this struct is outside this part of the file — confirm.
#[cfg(feature = "unstable-msc4274")]
struct GalleryItemQueueInfo {
    /// MIME type of the item's file.
    content_type: Mime,
    /// Transaction id of the item's file upload.
    upload_file_txn: OwnedTransactionId,
    /// Media request parameters associated with the item's file.
    file_media_request: MediaRequestParameters,
    /// Thumbnail data, if the item has a thumbnail: upload info, media
    /// request parameters and MIME type.
    thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>,
}
1976
/// What a [`LocalEcho`] stands for: a pending event or a pending reaction.
#[derive(Clone, Debug)]
pub enum LocalEchoContent {
    /// The local echo of an event.
    Event {
        /// Serialized content of the event.
        serialized_event: SerializableEventContent,
        /// Handle to act on the pending event (abort, edit, react, unwedge).
        send_handle: SendHandle,
        /// Error attached to the pending request, if any.
        // NOTE(review): presumably the reason the request is wedged —
        // confirm.
        send_error: Option<QueueWedgeError>,
    },

    /// The local echo of a reaction.
    React {
        /// The reaction key.
        key: String,
        /// Handle to abort the pending reaction.
        send_handle: SendReactionHandle,
        /// Transaction id of the request the reaction applies to.
        applies_to: OwnedTransactionId,
    },
}
2002
/// A local representation of a request that is still pending in the send
/// queue.
#[derive(Clone, Debug)]
pub struct LocalEcho {
    /// Transaction id identifying this local echo.
    pub transaction_id: OwnedTransactionId,
    /// What the echo stands for (event or reaction), with its handle.
    pub content: LocalEchoContent,
}
2012
/// An update broadcast by a room's send queue to its observers.
#[derive(Clone, Debug)]
pub enum RoomSendQueueUpdate {
    /// A new local echo exists (e.g. a reaction has just been queued).
    NewLocalEvent(LocalEcho),

    /// A local echo has been cancelled before being sent.
    CancelledLocalEvent {
        /// Transaction id of the cancelled local echo.
        transaction_id: OwnedTransactionId,
    },

    /// The content of a local echo has been replaced (edited).
    ReplacedLocalEvent {
        /// Transaction id of the edited local echo.
        transaction_id: OwnedTransactionId,

        /// The content that replaces the previous one.
        new_content: SerializableEventContent,
    },

    /// An error happened while sending a queued request.
    SendError {
        /// Transaction id of the request that failed.
        transaction_id: OwnedTransactionId,
        /// The error that happened.
        error: Arc<crate::Error>,
        /// Whether the error is recoverable.
        // NOTE(review): exact consequences of a (non-)recoverable error are
        // decided outside this part of the file — confirm.
        is_recoverable: bool,
    },

    /// A request is being retried (emitted when a request is unwedged).
    RetryEvent {
        /// Transaction id of the retried local echo.
        transaction_id: OwnedTransactionId,
    },

    /// An event has been sent.
    SentEvent {
        /// Transaction id of the local echo that was sent.
        transaction_id: OwnedTransactionId,
        /// Event id attributed to the sent event.
        event_id: OwnedEventId,
    },

    /// A media has been uploaded.
    UploadedMedia {
        /// Transaction id of the request this upload relates to.
        related_to: OwnedTransactionId,

        /// Source of the uploaded media.
        file: MediaSource,
    },
}
2080
/// Errors returned by the operations of a room's send queue.
#[derive(Debug, thiserror::Error)]
pub enum RoomSendQueueError {
    /// The room is not in the joined state.
    #[error("the room isn't in the joined state")]
    RoomNotJoined,

    /// The room could not be found in the client anymore.
    #[error("the room is now missing from the client")]
    RoomDisappeared,

    /// A storage-related error; displayed as the wrapped error.
    #[error(transparent)]
    StorageError(#[from] RoomSendQueueStorageError),

    /// The event for an attachment could not be created.
    #[error("the attachment event could not be created")]
    FailedToCreateAttachment,

    /// A gallery without any item was provided.
    #[cfg(feature = "unstable-msc4274")]
    #[error("the gallery contains no items")]
    EmptyGallery,

    /// The event for a gallery could not be created.
    #[cfg(feature = "unstable-msc4274")]
    #[error("the gallery event could not be created")]
    FailedToCreateGallery,
}
2112
/// Errors related to the persistent storage backing the send queue.
#[derive(Debug, thiserror::Error)]
pub enum RoomSendQueueStorageError {
    /// An error coming from the state store.
    #[error(transparent)]
    StateStoreError(#[from] StoreError),

    /// An error coming from the event cache store.
    #[error(transparent)]
    EventCacheStoreError(#[from] EventCacheStoreError),

    /// An error coming from a store lock.
    #[error(transparent)]
    LockError(#[from] LockStoreError),

    /// A JSON (de)serialization error.
    #[error(transparent)]
    JsonSerialization(#[from] serde_json::Error),

    /// The parent key of a dependent request did not have the expected
    /// type (e.g. it could not be converted into an event id).
    #[error("a dependent event had an invalid parent key type")]
    InvalidParentKey,

    /// The client is shutting down.
    #[error("The client is shutting down.")]
    ClientShuttingDown,

    /// The operation is not supported on media uploads yet.
    #[error("This operation is not implemented for media uploads")]
    OperationNotImplementedYet,

    /// A media caption edit targeted an event that isn't a media.
    #[error("Can't edit a media caption when the underlying event isn't a media")]
    InvalidMediaCaptionEdit,
}
2149
/// Transaction ids of the upload requests behind one media item.
#[derive(Clone, Debug)]
struct MediaHandles {
    /// Transaction id of the thumbnail upload, when the media has a
    /// thumbnail.
    upload_thumbnail_txn: Option<OwnedTransactionId>,

    /// Transaction id of the file upload.
    upload_file_txn: OwnedTransactionId,
}
2161
/// A handle to act on a request pending in a room's send queue: abort it,
/// edit it, react to it or unwedge it.
#[derive(Clone, Debug)]
pub struct SendHandle {
    /// The send queue of the room the request belongs to.
    room: RoomSendQueue,

    /// Transaction id of the request this handle controls.
    transaction_id: OwnedTransactionId,

    /// Upload handles of the media attached to the request: empty for a
    /// plain event, one entry per media item otherwise.
    media_handles: Vec<MediaHandles>,

    /// Creation time recorded for the request.
    pub created_at: MilliSecondsSinceUnixEpoch,
}
2180
2181impl SendHandle {
2182 fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
2183 if !self.media_handles.is_empty() {
2184 Err(RoomSendQueueStorageError::OperationNotImplementedYet)
2185 } else {
2186 Ok(())
2187 }
2188 }
2189
2190 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2195 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2196 trace!("received an abort request");
2197
2198 let queue = &self.room.inner.queue;
2199
2200 for handles in &self.media_handles {
2201 if queue.abort_upload(&self.transaction_id, handles).await? {
2202 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
2204 transaction_id: self.transaction_id.clone(),
2205 });
2206
2207 return Ok(true);
2208 }
2209
2210 }
2214
2215 if queue.cancel_event(&self.transaction_id).await? {
2216 trace!("successful abort");
2217
2218 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
2220 transaction_id: self.transaction_id.clone(),
2221 });
2222
2223 Ok(true)
2224 } else {
2225 debug!("local echo didn't exist anymore, can't abort");
2226 Ok(false)
2227 }
2228 }
2229
2230 #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2235 pub async fn edit_raw(
2236 &self,
2237 new_content: Raw<AnyMessageLikeEventContent>,
2238 event_type: String,
2239 ) -> Result<bool, RoomSendQueueStorageError> {
2240 trace!("received an edit request");
2241 self.nyi_for_uploads()?;
2242
2243 let serializable = SerializableEventContent::from_raw(new_content, event_type);
2244
2245 if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
2246 trace!("successful edit");
2247
2248 self.room.inner.notifier.notify_one();
2250
2251 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
2253 transaction_id: self.transaction_id.clone(),
2254 new_content: serializable,
2255 });
2256
2257 Ok(true)
2258 } else {
2259 debug!("local echo doesn't exist anymore, can't edit");
2260 Ok(false)
2261 }
2262 }
2263
2264 pub async fn edit(
2269 &self,
2270 new_content: AnyMessageLikeEventContent,
2271 ) -> Result<bool, RoomSendQueueStorageError> {
2272 self.edit_raw(
2273 Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2274 new_content.event_type().to_string(),
2275 )
2276 .await
2277 }
2278
2279 pub async fn edit_media_caption(
2284 &self,
2285 caption: Option<String>,
2286 formatted_caption: Option<FormattedBody>,
2287 mentions: Option<Mentions>,
2288 ) -> Result<bool, RoomSendQueueStorageError> {
2289 if let Some(new_content) = self
2290 .room
2291 .inner
2292 .queue
2293 .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2294 .await?
2295 {
2296 trace!("successful edit of media caption");
2297
2298 self.room.inner.notifier.notify_one();
2300
2301 let new_content = SerializableEventContent::new(&new_content)
2302 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2303
2304 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
2306 transaction_id: self.transaction_id.clone(),
2307 new_content,
2308 });
2309
2310 Ok(true)
2311 } else {
2312 debug!("local echo doesn't exist anymore, can't edit media caption");
2313 Ok(false)
2314 }
2315 }
2316
2317 pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2320 let room = &self.room.inner;
2321 room.queue
2322 .mark_as_unwedged(&self.transaction_id)
2323 .await
2324 .map_err(RoomSendQueueError::StorageError)?;
2325
2326 for handles in &self.media_handles {
2334 room.queue
2335 .mark_as_unwedged(&handles.upload_file_txn)
2336 .await
2337 .map_err(RoomSendQueueError::StorageError)?;
2338
2339 if let Some(txn) = &handles.upload_thumbnail_txn {
2340 room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2341 }
2342 }
2343
2344 room.notifier.notify_one();
2346
2347 let _ = room
2348 .updates
2349 .send(RoomSendQueueUpdate::RetryEvent { transaction_id: self.transaction_id.clone() });
2350
2351 Ok(())
2352 }
2353
2354 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2359 pub async fn react(
2360 &self,
2361 key: String,
2362 ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2363 trace!("received an intent to react");
2364
2365 let created_at = MilliSecondsSinceUnixEpoch::now();
2366 if let Some(reaction_txn_id) =
2367 self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2368 {
2369 trace!("successfully queued react");
2370
2371 self.room.inner.notifier.notify_one();
2373
2374 let send_handle = SendReactionHandle {
2376 room: self.room.clone(),
2377 transaction_id: reaction_txn_id.clone(),
2378 };
2379
2380 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2381 transaction_id: reaction_txn_id.into(),
2384 content: LocalEchoContent::React {
2385 key,
2386 send_handle: send_handle.clone(),
2387 applies_to: self.transaction_id.clone(),
2388 },
2389 }));
2390
2391 Ok(Some(send_handle))
2392 } else {
2393 debug!("local echo doesn't exist anymore, can't react");
2394 Ok(None)
2395 }
2396 }
2397}
2398
/// A handle to act on a reaction pending in a room's send queue.
#[derive(Clone, Debug)]
pub struct SendReactionHandle {
    /// The send queue of the room the reaction belongs to.
    room: RoomSendQueue,
    /// The reaction's own transaction id (not the id of the request it
    /// reacts to).
    transaction_id: ChildTransactionId,
}
2407
2408impl SendReactionHandle {
2409 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2414 if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2415 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
2419 transaction_id: self.transaction_id.clone().into(),
2420 });
2421
2422 return Ok(true);
2423 }
2424
2425 let handle = SendHandle {
2428 room: self.room.clone(),
2429 transaction_id: self.transaction_id.clone().into(),
2430 media_handles: vec![],
2431 created_at: MilliSecondsSinceUnixEpoch::now(),
2432 };
2433
2434 handle.abort().await
2435 }
2436
2437 pub fn transaction_id(&self) -> &TransactionId {
2439 &self.transaction_id
2440 }
2441}
2442
2443fn canonicalize_dependent_requests(
2447 dependent: &[DependentQueuedRequest],
2448) -> Vec<DependentQueuedRequest> {
2449 let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
2450
2451 for d in dependent {
2452 let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
2453
2454 if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
2455 continue;
2458 }
2459
2460 match &d.kind {
2461 DependentQueuedRequestKind::EditEvent { .. } => {
2462 if let Some(prev_edit) = prevs
2464 .iter_mut()
2465 .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
2466 {
2467 *prev_edit = d;
2468 } else {
2469 prevs.insert(0, d);
2470 }
2471 }
2472
2473 DependentQueuedRequestKind::UploadFileOrThumbnail { .. }
2474 | DependentQueuedRequestKind::FinishUpload { .. }
2475 | DependentQueuedRequestKind::ReactEvent { .. } => {
2476 prevs.push(d);
2478 }
2479
2480 #[cfg(feature = "unstable-msc4274")]
2481 DependentQueuedRequestKind::FinishGallery { .. } => {
2482 prevs.push(d);
2484 }
2485
2486 DependentQueuedRequestKind::RedactEvent => {
2487 prevs.clear();
2489 prevs.push(d);
2490 }
2491 }
2492 }
2493
2494 by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
2495}
2496
#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::{sync::Arc, time::Duration};

    use assert_matches2::{assert_let, assert_matches};
    use matrix_sdk_base::store::{
        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
        SerializableEventContent,
    };
    use matrix_sdk_test::{async_test, JoinedRoomBuilder, SyncResponseBuilder};
    use ruma::{
        events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent},
        room_id, MilliSecondsSinceUnixEpoch, TransactionId,
    };

    use super::canonicalize_dependent_requests;
    use crate::{client::WeakClient, test_utils::logged_in_client};

    /// Builds a dependent request of the given kind for the given parent,
    /// with a fresh own transaction id, no parent key, and "now" as its
    /// creation time.
    fn dependent_request(
        parent_transaction_id: ruma::OwnedTransactionId,
        kind: DependentQueuedRequestKind,
    ) -> DependentQueuedRequest {
        DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id,
            kind,
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        }
    }

    /// Builds an edit kind whose new content is a plain text message.
    fn edit_kind(body: &str) -> DependentQueuedRequestKind {
        DependentQueuedRequestKind::EditEvent {
            new_content: SerializableEventContent::new(
                &RoomMessageEventContent::text_plain(body).into(),
            )
            .unwrap(),
        }
    }

    /// Canonicalization keeps the creation time of a request untouched.
    #[test]
    fn test_canonicalize_dependent_events_created_at() {
        let txn = TransactionId::new();
        let created_at = MilliSecondsSinceUnixEpoch::now();

        let edit =
            DependentQueuedRequest { created_at, ..dependent_request(txn.clone(), edit_kind("edit")) };

        let res = canonicalize_dependent_requests(&[edit]);

        assert_eq!(res.len(), 1);
        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
        assert_let!(
            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
        );
        assert_eq!(msg.body(), "edit");
        assert_eq!(res[0].parent_transaction_id, txn);
        assert_eq!(res[0].created_at, created_at);
    }

    /// Dropping the client must release it even when a send queue has been
    /// subscribed to, whether the send queue is enabled or not.
    #[async_test]
    async fn test_client_no_cycle_with_send_queue() {
        for enabled in [true, false] {
            let client = logged_in_client(None).await;
            let weak_client = WeakClient::from_client(&client);

            {
                let room_id = room_id!("!a:b.c");

                // Make the client aware of a joined room.
                let mut sync_response_builder = SyncResponseBuilder::new();
                let sync_response = sync_response_builder
                    .add_joined_room(JoinedRoomBuilder::new(room_id))
                    .build_sync_response();

                client.base_client().receive_sync_response(sync_response).await.unwrap();

                let room = client.get_room(room_id).unwrap();
                let q = room.send_queue();

                let _watcher = q.subscribe().await;

                client.send_queue().set_enabled(enabled).await;
            }

            drop(client);

            // Leave some time for the background work to wind down.
            tokio::time::sleep(Duration::from_millis(500)).await;

            if let Some(client) = weak_client.get() {
                panic!(
                    "too many strong references to the client: {}",
                    Arc::strong_count(&client.inner)
                );
            }
        }
    }

    /// A single edit comes out of the canonicalization unchanged.
    #[test]
    fn test_canonicalize_dependent_events_smoke_test() {
        let txn = TransactionId::new();

        let edit = dependent_request(txn.clone(), edit_kind("edit"));
        let res = canonicalize_dependent_requests(&[edit]);

        assert_eq!(res.len(), 1);
        assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
        assert_eq!(res[0].parent_transaction_id, txn);
        assert!(res[0].parent_key.is_none());
    }

    /// A redaction wins over all the edits of the same parent, whether they
    /// come before or after it.
    #[test]
    fn test_canonicalize_dependent_events_redaction_preferred() {
        let txn = TransactionId::new();

        // One edit, then the redaction, then 98 more edits.
        let mut inputs = Vec::with_capacity(100);
        inputs.push(dependent_request(txn.clone(), edit_kind("edit")));
        inputs.push(dependent_request(txn.clone(), DependentQueuedRequestKind::RedactEvent));
        inputs.extend((0..98).map(|_| dependent_request(txn.clone(), edit_kind("edit"))));

        let res = canonicalize_dependent_requests(&inputs);

        assert_eq!(res.len(), 1);
        assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
        assert_eq!(res[0].parent_transaction_id, txn);
    }

    /// Among several edits of the same parent, only the last one is kept.
    #[test]
    fn test_canonicalize_dependent_events_last_edit_preferred() {
        let parent_txn = TransactionId::new();

        let inputs: Vec<_> = (0..10)
            .map(|i| dependent_request(parent_txn.clone(), edit_kind(&format!("edit{i}"))))
            .collect();

        let txn = inputs[9].parent_transaction_id.clone();

        let res = canonicalize_dependent_requests(&inputs);

        assert_eq!(res.len(), 1);
        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
        assert_let!(
            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
        );
        assert_eq!(msg.body(), "edit9");
        assert_eq!(res[0].parent_transaction_id, txn);
    }

    /// Requests targeting different parents don't interfere with each other.
    #[test]
    fn test_canonicalize_multiple_local_echoes() {
        let txn1 = TransactionId::new();
        let txn2 = TransactionId::new();

        let child1 = ChildTransactionId::new();
        let child2 = ChildTransactionId::new();

        let redact_first_parent = DependentQueuedRequest {
            own_transaction_id: child1.clone(),
            ..dependent_request(txn1.clone(), DependentQueuedRequestKind::RedactEvent)
        };
        let edit_second_parent = DependentQueuedRequest {
            own_transaction_id: child2,
            ..dependent_request(txn2.clone(), edit_kind("edit"))
        };
        let inputs = vec![redact_first_parent, edit_second_parent];

        let res = canonicalize_dependent_requests(&inputs);

        assert_eq!(res.len(), 2);

        // The order between the two parents is unspecified: identify each
        // result by its own transaction id.
        for dependent in res {
            if dependent.own_transaction_id == child1 {
                assert_eq!(dependent.parent_transaction_id, txn1);
                assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
            } else {
                assert_eq!(dependent.parent_transaction_id, txn2);
                assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
            }
        }
    }

    /// An edit is moved before a reaction that was queued earlier.
    #[test]
    fn test_canonicalize_reactions_after_edits() {
        let txn = TransactionId::new();

        let react = dependent_request(
            txn.clone(),
            DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
        );
        let react_id = react.own_transaction_id.clone();

        let edit = dependent_request(txn, edit_kind("edit"));
        let edit_id = edit.own_transaction_id.clone();

        let res = canonicalize_dependent_requests(&[react, edit]);

        assert_eq!(res.len(), 2);
        assert_eq!(res[0].own_transaction_id, edit_id);
        assert_eq!(res[1].own_transaction_id, react_id);
    }
}