1use std::{
132 collections::{BTreeMap, HashMap},
133 str::FromStr as _,
134 sync::{
135 atomic::{AtomicBool, Ordering},
136 Arc, RwLock,
137 },
138};
139
140use as_variant::as_variant;
141use matrix_sdk_base::{
142 event_cache::store::EventCacheStoreError,
143 media::MediaRequestParameters,
144 store::{
145 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
146 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
147 SentMediaInfo, SentRequestKey, SerializableEventContent,
148 },
149 store_locks::LockStoreError,
150 RoomState, StoreError,
151};
152use matrix_sdk_common::executor::{spawn, JoinHandle};
153use mime::Mime;
154use ruma::{
155 events::{
156 reaction::ReactionEventContent,
157 relation::Annotation,
158 room::{
159 message::{FormattedBody, RoomMessageEventContent},
160 MediaSource,
161 },
162 AnyMessageLikeEventContent, EventContent as _, Mentions,
163 },
164 serde::Raw,
165 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
166};
167use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard};
168use tracing::{debug, error, info, instrument, trace, warn};
169
170#[cfg(feature = "e2e-encryption")]
171use crate::crypto::{OlmError, SessionRecipientCollectionError};
172use crate::{
173 client::WeakClient,
174 config::RequestConfig,
175 error::RetryKind,
176 room::{edit::EditedContent, WeakRoom},
177 Client, Media, Room,
178};
179
180mod upload;
181
182pub struct SendQueue {
184 client: Client,
185}
186
187#[cfg(not(tarpaulin_include))]
188impl std::fmt::Debug for SendQueue {
189 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190 f.debug_struct("SendQueue").finish_non_exhaustive()
191 }
192}
193
194impl SendQueue {
195 pub(super) fn new(client: Client) -> Self {
196 Self { client }
197 }
198
199 pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
202 if !self.is_enabled() {
203 return;
204 }
205
206 let room_ids =
207 self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
208 |err| {
209 warn!("error when loading rooms with unsent requests: {err}");
210 Vec::new()
211 },
212 );
213
214 for room_id in room_ids {
216 if let Some(room) = self.client.get_room(&room_id) {
217 let _ = self.for_room(room);
218 }
219 }
220 }
221
222 #[inline(always)]
224 fn data(&self) -> &SendQueueData {
225 &self.client.inner.send_queue_data
226 }
227
228 fn for_room(&self, room: Room) -> RoomSendQueue {
231 let data = self.data();
232
233 let mut map = data.rooms.write().unwrap();
234
235 let room_id = room.room_id();
236 if let Some(room_q) = map.get(room_id).cloned() {
237 return room_q;
238 }
239
240 let owned_room_id = room_id.to_owned();
241 let room_q = RoomSendQueue::new(
242 self.is_enabled(),
243 data.error_reporter.clone(),
244 data.is_dropping.clone(),
245 &self.client,
246 owned_room_id.clone(),
247 );
248
249 map.insert(owned_room_id, room_q.clone());
250
251 room_q
252 }
253
254 pub async fn set_enabled(&self, enabled: bool) {
264 debug!(?enabled, "setting global send queue enablement");
265
266 self.data().globally_enabled.store(enabled, Ordering::SeqCst);
267
268 for room in self.data().rooms.read().unwrap().values() {
270 room.set_enabled(enabled);
271 }
272
273 self.respawn_tasks_for_rooms_with_unsent_requests().await;
276 }
277
278 pub fn is_enabled(&self) -> bool {
281 self.data().globally_enabled.load(Ordering::SeqCst)
282 }
283
284 pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
287 self.data().error_reporter.subscribe()
288 }
289}
290
291#[derive(Clone, Debug)]
293pub struct SendQueueRoomError {
294 pub room_id: OwnedRoomId,
296
297 pub error: Arc<crate::Error>,
299
300 pub is_recoverable: bool,
306}
307
308impl Client {
309 pub fn send_queue(&self) -> SendQueue {
312 SendQueue::new(self.clone())
313 }
314}
315
316pub(super) struct SendQueueData {
317 rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,
319
320 globally_enabled: AtomicBool,
325
326 error_reporter: broadcast::Sender<SendQueueRoomError>,
328
329 is_dropping: Arc<AtomicBool>,
331}
332
333impl SendQueueData {
334 pub fn new(globally_enabled: bool) -> Self {
336 let (sender, _) = broadcast::channel(32);
337
338 Self {
339 rooms: Default::default(),
340 globally_enabled: AtomicBool::new(globally_enabled),
341 error_reporter: sender,
342 is_dropping: Arc::new(false.into()),
343 }
344 }
345}
346
347impl Drop for SendQueueData {
348 fn drop(&mut self) {
349 debug!("globally dropping the send queue");
352 self.is_dropping.store(true, Ordering::SeqCst);
353
354 let rooms = self.rooms.read().unwrap();
355 for room in rooms.values() {
356 room.inner.notifier.notify_one();
357 }
358 }
359}
360
361impl Room {
362 pub fn send_queue(&self) -> RoomSendQueue {
364 self.client.send_queue().for_room(self.clone())
365 }
366}
367
368#[derive(Clone)]
372pub struct RoomSendQueue {
373 inner: Arc<RoomSendQueueInner>,
374}
375
376#[cfg(not(tarpaulin_include))]
377impl std::fmt::Debug for RoomSendQueue {
378 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
379 f.debug_struct("RoomSendQueue").finish_non_exhaustive()
380 }
381}
382
383impl RoomSendQueue {
384 fn new(
385 globally_enabled: bool,
386 global_error_reporter: broadcast::Sender<SendQueueRoomError>,
387 is_dropping: Arc<AtomicBool>,
388 client: &Client,
389 room_id: OwnedRoomId,
390 ) -> Self {
391 let (updates_sender, _) = broadcast::channel(32);
392
393 let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
394 let notifier = Arc::new(Notify::new());
395
396 let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
397 let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));
398
399 let task = spawn(Self::sending_task(
400 weak_room.clone(),
401 queue.clone(),
402 notifier.clone(),
403 updates_sender.clone(),
404 locally_enabled.clone(),
405 global_error_reporter,
406 is_dropping,
407 ));
408
409 Self {
410 inner: Arc::new(RoomSendQueueInner {
411 room: weak_room,
412 updates: updates_sender,
413 _task: task,
414 queue,
415 notifier,
416 locally_enabled,
417 }),
418 }
419 }
420
421 pub async fn send_raw(
436 &self,
437 content: Raw<AnyMessageLikeEventContent>,
438 event_type: String,
439 ) -> Result<SendHandle, RoomSendQueueError> {
440 let Some(room) = self.inner.room.get() else {
441 return Err(RoomSendQueueError::RoomDisappeared);
442 };
443 if room.state() != RoomState::Joined {
444 return Err(RoomSendQueueError::RoomNotJoined);
445 }
446
447 let content = SerializableEventContent::from_raw(content, event_type);
448
449 let created_at = MilliSecondsSinceUnixEpoch::now();
450 let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
451 trace!(%transaction_id, "manager sends a raw event to the background task");
452
453 self.inner.notifier.notify_one();
454
455 let send_handle = SendHandle {
456 room: self.clone(),
457 transaction_id: transaction_id.clone(),
458 media_handles: None,
459 created_at,
460 };
461
462 let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
463 transaction_id,
464 content: LocalEchoContent::Event {
465 serialized_event: content,
466 send_handle: send_handle.clone(),
467 send_error: None,
468 },
469 }));
470
471 Ok(send_handle)
472 }
473
474 pub async fn send(
489 &self,
490 content: AnyMessageLikeEventContent,
491 ) -> Result<SendHandle, RoomSendQueueError> {
492 self.send_raw(
493 Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
494 content.event_type().to_string(),
495 )
496 .await
497 }
498
499 pub async fn subscribe(
502 &self,
503 ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
504 {
505 let local_echoes = self.inner.queue.local_echoes(self).await?;
506
507 Ok((local_echoes, self.inner.updates.subscribe()))
508 }
509
510 #[instrument(skip_all, fields(room_id = %room.room_id()))]
516 async fn sending_task(
517 room: WeakRoom,
518 queue: QueueStorage,
519 notifier: Arc<Notify>,
520 updates: broadcast::Sender<RoomSendQueueUpdate>,
521 locally_enabled: Arc<AtomicBool>,
522 global_error_reporter: broadcast::Sender<SendQueueRoomError>,
523 is_dropping: Arc<AtomicBool>,
524 ) {
525 trace!("spawned the sending task");
526
527 loop {
528 if is_dropping.load(Ordering::SeqCst) {
530 trace!("shutting down!");
531 break;
532 }
533
534 let mut new_updates = Vec::new();
537 if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
538 warn!("errors when applying dependent requests: {err}");
539 }
540
541 for up in new_updates {
542 let _ = updates.send(up);
543 }
544
545 if !locally_enabled.load(Ordering::SeqCst) {
546 trace!("not enabled, sleeping");
547 notifier.notified().await;
549 continue;
550 }
551
552 let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
553 Ok(Some(request)) => request,
554
555 Ok(None) => {
556 trace!("queue is empty, sleeping");
557 notifier.notified().await;
559 continue;
560 }
561
562 Err(err) => {
563 warn!("error when loading next request to send: {err}");
564 continue;
565 }
566 };
567
568 let txn_id = queued_request.transaction_id.clone();
569 trace!(txn_id = %txn_id, "received a request to send!");
570
571 let related_txn_id = as_variant!(&queued_request.kind, QueuedRequestKind::MediaUpload { related_to, .. } => related_to.clone());
572
573 let Some(room) = room.get() else {
574 if is_dropping.load(Ordering::SeqCst) {
575 break;
576 }
577 error!("the weak room couldn't be upgraded but we're not shutting down?");
578 continue;
579 };
580
581 match Self::handle_request(&room, queued_request, cancel_upload_rx).await {
582 Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
583 {
584 Ok(()) => match parent_key {
585 SentRequestKey::Event(event_id) => {
586 let _ = updates.send(RoomSendQueueUpdate::SentEvent {
587 transaction_id: txn_id,
588 event_id,
589 });
590 }
591
592 SentRequestKey::Media(media_info) => {
593 let _ = updates.send(RoomSendQueueUpdate::UploadedMedia {
594 related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
595 file: media_info.file,
596 });
597 }
598 },
599
600 Err(err) => {
601 warn!("unable to mark queued request as sent: {err}");
602 }
603 },
604
605 Ok(None) => {
606 debug!("Request has been aborted while running, continuing.");
607 }
608
609 Err(err) => {
610 let is_recoverable = match err {
611 crate::Error::Http(ref http_err) => {
612 matches!(
614 http_err.retry_kind(),
615 RetryKind::Transient { .. } | RetryKind::NetworkFailure
616 )
617 }
618
619 crate::Error::ConcurrentRequestFailed => true,
624
625 _ => false,
627 };
628
629 locally_enabled.store(false, Ordering::SeqCst);
631
632 if is_recoverable {
633 warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");
634
635 queue.mark_as_not_being_sent(&txn_id).await;
638
639 } else {
645 warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");
646
647 if let Err(storage_error) =
649 queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
650 {
651 warn!("unable to mark request as wedged: {storage_error}");
652 }
653 }
654
655 let error = Arc::new(err);
656
657 let _ = global_error_reporter.send(SendQueueRoomError {
658 room_id: room.room_id().to_owned(),
659 error: error.clone(),
660 is_recoverable,
661 });
662
663 let _ = updates.send(RoomSendQueueUpdate::SendError {
664 transaction_id: related_txn_id.unwrap_or(txn_id),
665 error,
666 is_recoverable,
667 });
668 }
669 }
670 }
671
672 info!("exited sending task");
673 }
674
675 async fn handle_request(
679 room: &Room,
680 request: QueuedRequest,
681 cancel_upload_rx: Option<oneshot::Receiver<()>>,
682 ) -> Result<Option<SentRequestKey>, crate::Error> {
683 match request.kind {
684 QueuedRequestKind::Event { content } => {
685 let (event, event_type) = content.raw();
686
687 let res = room
688 .send_raw(event_type, event)
689 .with_transaction_id(&request.transaction_id)
690 .with_request_config(RequestConfig::short_retry())
691 .await?;
692
693 trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent");
694 Ok(Some(SentRequestKey::Event(res.event_id)))
695 }
696
697 QueuedRequestKind::MediaUpload {
698 content_type,
699 cache_key,
700 thumbnail_source,
701 related_to: relates_to,
702 } => {
703 trace!(%relates_to, "uploading media related to event");
704
705 let fut = async move {
706 let mime = Mime::from_str(&content_type).map_err(|_| {
707 crate::Error::SendQueueWedgeError(QueueWedgeError::InvalidMimeType {
708 mime_type: content_type.clone(),
709 })
710 })?;
711
712 let data = room
713 .client()
714 .event_cache_store()
715 .lock()
716 .await?
717 .get_media_content(&cache_key)
718 .await?
719 .ok_or(crate::Error::SendQueueWedgeError(
720 QueueWedgeError::MissingMediaContent,
721 ))?;
722
723 #[cfg(feature = "e2e-encryption")]
724 let media_source = if room.latest_encryption_state().await?.is_encrypted() {
725 trace!("upload will be encrypted (encrypted room)");
726 let mut cursor = std::io::Cursor::new(data);
727 let encrypted_file = room
728 .client()
729 .upload_encrypted_file(&mime, &mut cursor)
730 .with_request_config(RequestConfig::short_retry())
731 .await?;
732 MediaSource::Encrypted(Box::new(encrypted_file))
733 } else {
734 trace!("upload will be in clear text (room without encryption)");
735 let request_config = RequestConfig::short_retry()
736 .timeout(Media::reasonable_upload_timeout(&data));
737 let res =
738 room.client().media().upload(&mime, data, Some(request_config)).await?;
739 MediaSource::Plain(res.content_uri)
740 };
741
742 #[cfg(not(feature = "e2e-encryption"))]
743 let media_source = {
744 let request_config = RequestConfig::short_retry()
745 .timeout(Media::reasonable_upload_timeout(&data));
746 let res =
747 room.client().media().upload(&mime, data, Some(request_config)).await?;
748 MediaSource::Plain(res.content_uri)
749 };
750
751 let uri = match &media_source {
752 MediaSource::Plain(uri) => uri,
753 MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
754 };
755 trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");
756
757 Ok(SentRequestKey::Media(SentMediaInfo {
758 file: media_source,
759 thumbnail: thumbnail_source,
760 }))
761 };
762
763 let wait_for_cancel = async move {
764 if let Some(rx) = cancel_upload_rx {
765 rx.await
766 } else {
767 std::future::pending().await
768 }
769 };
770
771 tokio::select! {
772 biased;
773
774 _ = wait_for_cancel => {
775 Ok(None)
776 }
777
778 res = fut => {
779 res.map(Some)
780 }
781 }
782 }
783 }
784 }
785
786 pub fn is_enabled(&self) -> bool {
788 self.inner.locally_enabled.load(Ordering::SeqCst)
789 }
790
791 pub fn set_enabled(&self, enabled: bool) {
793 self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
794
795 if enabled {
798 self.inner.notifier.notify_one();
799 }
800 }
801}
802
803impl From<&crate::Error> for QueueWedgeError {
804 fn from(value: &crate::Error) -> Self {
805 match value {
806 #[cfg(feature = "e2e-encryption")]
807 crate::Error::OlmError(OlmError::SessionRecipientCollectionError(error)) => match error
808 {
809 SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
810 QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
811 }
812
813 SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
814 QueueWedgeError::IdentityViolations { users: users.clone() }
815 }
816
817 SessionRecipientCollectionError::CrossSigningNotSetup
818 | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
819 QueueWedgeError::CrossVerificationRequired
820 }
821 },
822
823 crate::Error::SendQueueWedgeError(error) => error.clone(),
825
826 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
827 }
828 }
829}
830
831struct RoomSendQueueInner {
832 room: WeakRoom,
834
835 updates: broadcast::Sender<RoomSendQueueUpdate>,
839
840 queue: QueueStorage,
847
848 notifier: Arc<Notify>,
851
852 locally_enabled: Arc<AtomicBool>,
855
856 _task: JoinHandle<()>,
859}
860
861struct BeingSentInfo {
863 transaction_id: OwnedTransactionId,
865
866 cancel_upload: Option<oneshot::Sender<()>>,
869}
870
871impl BeingSentInfo {
872 fn cancel_upload(self) -> bool {
877 if let Some(cancel_upload) = self.cancel_upload {
878 let _ = cancel_upload.send(());
879 true
880 } else {
881 false
882 }
883 }
884}
885
886#[derive(Clone)]
889struct StoreLock {
890 client: WeakClient,
892
893 being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
898}
899
900impl StoreLock {
901 async fn lock(&self) -> StoreLockGuard {
903 StoreLockGuard {
904 client: self.client.clone(),
905 being_sent: self.being_sent.clone().lock_owned().await,
906 }
907 }
908}
909
910struct StoreLockGuard {
913 client: WeakClient,
915
916 being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
919}
920
921impl StoreLockGuard {
922 fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
924 self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
925 }
926}
927
928#[derive(Clone)]
929struct QueueStorage {
930 store: StoreLock,
933
934 room_id: OwnedRoomId,
936}
937
938impl QueueStorage {
939 const LOW_PRIORITY: usize = 0;
941
942 const HIGH_PRIORITY: usize = 10;
944
945 fn new(client: WeakClient, room: OwnedRoomId) -> Self {
947 Self { room_id: room, store: StoreLock { client, being_sent: Default::default() } }
948 }
949
950 async fn push(
954 &self,
955 request: QueuedRequestKind,
956 created_at: MilliSecondsSinceUnixEpoch,
957 ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
958 let transaction_id = TransactionId::new();
959
960 self.store
961 .lock()
962 .await
963 .client()?
964 .state_store()
965 .save_send_queue_request(
966 &self.room_id,
967 transaction_id.clone(),
968 created_at,
969 request,
970 Self::LOW_PRIORITY,
971 )
972 .await?;
973
974 Ok(transaction_id)
975 }
976
977 async fn peek_next_to_send(
982 &self,
983 ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
984 {
985 let mut guard = self.store.lock().await;
986 let queued_requests =
987 guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;
988
989 if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
990 let (cancel_upload_tx, cancel_upload_rx) =
991 if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
992 let (tx, rx) = oneshot::channel();
993 (Some(tx), Some(rx))
994 } else {
995 Default::default()
996 };
997
998 let prev = guard.being_sent.replace(BeingSentInfo {
999 transaction_id: request.transaction_id.clone(),
1000 cancel_upload: cancel_upload_tx,
1001 });
1002
1003 if let Some(prev) = prev {
1004 error!(
1005 prev_txn = ?prev.transaction_id,
1006 "a previous request was still active while picking a new one"
1007 );
1008 }
1009
1010 Ok(Some((request.clone(), cancel_upload_rx)))
1011 } else {
1012 Ok(None)
1013 }
1014 }
1015
1016 async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1020 let was_being_sent = self.store.lock().await.being_sent.take();
1021
1022 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1023 if prev_txn != Some(transaction_id) {
1024 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after transient error)");
1025 }
1026 }
1027
1028 async fn mark_as_wedged(
1032 &self,
1033 transaction_id: &TransactionId,
1034 reason: QueueWedgeError,
1035 ) -> Result<(), RoomSendQueueStorageError> {
1036 let mut guard = self.store.lock().await;
1038 let was_being_sent = guard.being_sent.take();
1039
1040 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1041 if prev_txn != Some(transaction_id) {
1042 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after permanent error)");
1043 }
1044
1045 Ok(guard
1046 .client()?
1047 .state_store()
1048 .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1049 .await?)
1050 }
1051
1052 async fn mark_as_unwedged(
1055 &self,
1056 transaction_id: &TransactionId,
1057 ) -> Result<(), RoomSendQueueStorageError> {
1058 Ok(self
1059 .store
1060 .lock()
1061 .await
1062 .client()?
1063 .state_store()
1064 .update_send_queue_request_status(&self.room_id, transaction_id, None)
1065 .await?)
1066 }
1067
1068 async fn mark_as_sent(
1071 &self,
1072 transaction_id: &TransactionId,
1073 parent_key: SentRequestKey,
1074 ) -> Result<(), RoomSendQueueStorageError> {
1075 let mut guard = self.store.lock().await;
1077 let was_being_sent = guard.being_sent.take();
1078
1079 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1080 if prev_txn != Some(transaction_id) {
1081 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after successful send");
1082 }
1083
1084 let client = guard.client()?;
1085 let store = client.state_store();
1086
1087 store
1089 .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1090 .await?;
1091
1092 let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1093
1094 if !removed {
1095 warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1096 }
1097
1098 Ok(())
1099 }
1100
1101 async fn cancel_event(
1108 &self,
1109 transaction_id: &TransactionId,
1110 ) -> Result<bool, RoomSendQueueStorageError> {
1111 let guard = self.store.lock().await;
1112
1113 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1114 == Some(transaction_id)
1115 {
1116 guard
1118 .client()?
1119 .state_store()
1120 .save_dependent_queued_request(
1121 &self.room_id,
1122 transaction_id,
1123 ChildTransactionId::new(),
1124 MilliSecondsSinceUnixEpoch::now(),
1125 DependentQueuedRequestKind::RedactEvent,
1126 )
1127 .await?;
1128
1129 return Ok(true);
1130 }
1131
1132 let removed = guard
1133 .client()?
1134 .state_store()
1135 .remove_send_queue_request(&self.room_id, transaction_id)
1136 .await?;
1137
1138 Ok(removed)
1139 }
1140
1141 async fn replace_event(
1148 &self,
1149 transaction_id: &TransactionId,
1150 serializable: SerializableEventContent,
1151 ) -> Result<bool, RoomSendQueueStorageError> {
1152 let guard = self.store.lock().await;
1153
1154 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1155 == Some(transaction_id)
1156 {
1157 guard
1159 .client()?
1160 .state_store()
1161 .save_dependent_queued_request(
1162 &self.room_id,
1163 transaction_id,
1164 ChildTransactionId::new(),
1165 MilliSecondsSinceUnixEpoch::now(),
1166 DependentQueuedRequestKind::EditEvent { new_content: serializable },
1167 )
1168 .await?;
1169
1170 return Ok(true);
1171 }
1172
1173 let edited = guard
1174 .client()?
1175 .state_store()
1176 .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1177 .await?;
1178
1179 Ok(edited)
1180 }
1181
1182 #[allow(clippy::too_many_arguments)]
1186 async fn push_media(
1187 &self,
1188 event: RoomMessageEventContent,
1189 content_type: Mime,
1190 send_event_txn: OwnedTransactionId,
1191 created_at: MilliSecondsSinceUnixEpoch,
1192 upload_file_txn: OwnedTransactionId,
1193 file_media_request: MediaRequestParameters,
1194 thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>,
1195 ) -> Result<(), RoomSendQueueStorageError> {
1196 let guard = self.store.lock().await;
1197 let client = guard.client()?;
1198 let store = client.state_store();
1199 let thumbnail_info =
1200 if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) =
1201 thumbnail
1202 {
1203 let upload_thumbnail_txn = thumbnail_info.txn.clone();
1204
1205 store
1207 .save_send_queue_request(
1208 &self.room_id,
1209 upload_thumbnail_txn.clone(),
1210 created_at,
1211 QueuedRequestKind::MediaUpload {
1212 content_type: thumbnail_content_type.to_string(),
1213 cache_key: thumbnail_media_request,
1214 thumbnail_source: None, related_to: send_event_txn.clone(),
1216 },
1217 Self::LOW_PRIORITY,
1218 )
1219 .await?;
1220
1221 store
1223 .save_dependent_queued_request(
1224 &self.room_id,
1225 &upload_thumbnail_txn,
1226 upload_file_txn.clone().into(),
1227 created_at,
1228 DependentQueuedRequestKind::UploadFileWithThumbnail {
1229 content_type: content_type.to_string(),
1230 cache_key: file_media_request,
1231 related_to: send_event_txn.clone(),
1232 },
1233 )
1234 .await?;
1235
1236 Some(thumbnail_info)
1237 } else {
1238 store
1240 .save_send_queue_request(
1241 &self.room_id,
1242 upload_file_txn.clone(),
1243 created_at,
1244 QueuedRequestKind::MediaUpload {
1245 content_type: content_type.to_string(),
1246 cache_key: file_media_request,
1247 thumbnail_source: None,
1248 related_to: send_event_txn.clone(),
1249 },
1250 Self::LOW_PRIORITY,
1251 )
1252 .await?;
1253
1254 None
1255 };
1256
1257 store
1259 .save_dependent_queued_request(
1260 &self.room_id,
1261 &upload_file_txn,
1262 send_event_txn.into(),
1263 created_at,
1264 DependentQueuedRequestKind::FinishUpload {
1265 local_echo: event,
1266 file_upload: upload_file_txn.clone(),
1267 thumbnail_info,
1268 },
1269 )
1270 .await?;
1271
1272 Ok(())
1273 }
1274
1275 #[instrument(skip(self))]
1277 async fn react(
1278 &self,
1279 transaction_id: &TransactionId,
1280 key: String,
1281 created_at: MilliSecondsSinceUnixEpoch,
1282 ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1283 let guard = self.store.lock().await;
1284 let client = guard.client()?;
1285 let store = client.state_store();
1286
1287 let requests = store.load_send_queue_requests(&self.room_id).await?;
1288
1289 if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1291 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1294 if !dependent_requests
1295 .into_iter()
1296 .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1297 .any(|child_txn| *child_txn == *transaction_id)
1298 {
1299 return Ok(None);
1301 }
1302 }
1303
1304 let reaction_txn_id = ChildTransactionId::new();
1306 store
1307 .save_dependent_queued_request(
1308 &self.room_id,
1309 transaction_id,
1310 reaction_txn_id.clone(),
1311 created_at,
1312 DependentQueuedRequestKind::ReactEvent { key },
1313 )
1314 .await?;
1315
1316 Ok(Some(reaction_txn_id))
1317 }
1318
1319 async fn local_echoes(
1322 &self,
1323 room: &RoomSendQueue,
1324 ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1325 let guard = self.store.lock().await;
1326 let client = guard.client()?;
1327 let store = client.state_store();
1328
1329 let local_requests =
1330 store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1331 Some(LocalEcho {
1332 transaction_id: queued.transaction_id.clone(),
1333 content: match queued.kind {
1334 QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1335 serialized_event: content,
1336 send_handle: SendHandle {
1337 room: room.clone(),
1338 transaction_id: queued.transaction_id,
1339 media_handles: None,
1340 created_at: queued.created_at,
1341 },
1342 send_error: queued.error,
1343 },
1344
1345 QueuedRequestKind::MediaUpload { .. } => {
1346 return None;
1349 }
1350 },
1351 })
1352 });
1353
1354 let reactions_and_medias = store
1355 .load_dependent_queued_requests(&self.room_id)
1356 .await?
1357 .into_iter()
1358 .filter_map(|dep| match dep.kind {
1359 DependentQueuedRequestKind::EditEvent { .. }
1360 | DependentQueuedRequestKind::RedactEvent => {
1361 None
1363 }
1364
1365 DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
1366 transaction_id: dep.own_transaction_id.clone().into(),
1367 content: LocalEchoContent::React {
1368 key,
1369 send_handle: SendReactionHandle {
1370 room: room.clone(),
1371 transaction_id: dep.own_transaction_id,
1372 },
1373 applies_to: dep.parent_transaction_id,
1374 },
1375 }),
1376
1377 DependentQueuedRequestKind::UploadFileWithThumbnail { .. } => {
1378 None
1380 }
1381
1382 DependentQueuedRequestKind::FinishUpload {
1383 local_echo,
1384 file_upload,
1385 thumbnail_info,
1386 } => {
1387 Some(LocalEcho {
1389 transaction_id: dep.own_transaction_id.clone().into(),
1390 content: LocalEchoContent::Event {
1391 serialized_event: SerializableEventContent::new(&local_echo.into())
1392 .ok()?,
1393 send_handle: SendHandle {
1394 room: room.clone(),
1395 transaction_id: dep.own_transaction_id.into(),
1396 media_handles: Some(MediaHandles {
1397 upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
1398 upload_file_txn: file_upload,
1399 }),
1400 created_at: dep.created_at,
1401 },
1402 send_error: None,
1403 },
1404 })
1405 }
1406 });
1407
1408 Ok(local_requests.chain(reactions_and_medias).collect())
1409 }
1410
1411 #[instrument(skip_all)]
1419 async fn try_apply_single_dependent_request(
1420 &self,
1421 client: &Client,
1422 dependent_request: DependentQueuedRequest,
1423 new_updates: &mut Vec<RoomSendQueueUpdate>,
1424 ) -> Result<bool, RoomSendQueueError> {
1425 let store = client.state_store();
1426
1427 let parent_key = dependent_request.parent_key;
1428
1429 match dependent_request.kind {
1430 DependentQueuedRequestKind::EditEvent { new_content } => {
1431 if let Some(parent_key) = parent_key {
1432 let Some(event_id) = parent_key.into_event_id() else {
1433 return Err(RoomSendQueueError::StorageError(
1434 RoomSendQueueStorageError::InvalidParentKey,
1435 ));
1436 };
1437
1438 let room = client
1440 .get_room(&self.room_id)
1441 .ok_or(RoomSendQueueError::RoomDisappeared)?;
1442
1443 let edited_content = match new_content.deserialize() {
1447 Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
1448 EditedContent::RoomMessage(c.into())
1450 }
1451
1452 Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
1453 let poll_start = c.poll_start().clone();
1454 EditedContent::PollStart {
1455 fallback_text: poll_start.question.text.clone(),
1456 new_content: poll_start,
1457 }
1458 }
1459
1460 Ok(c) => {
1461 warn!("Unsupported edit content type: {:?}", c.event_type());
1462 return Ok(true);
1463 }
1464
1465 Err(err) => {
1466 warn!("Unable to deserialize: {err}");
1467 return Ok(true);
1468 }
1469 };
1470
1471 let edit_event = match room.make_edit_event(&event_id, edited_content).await {
1472 Ok(e) => e,
1473 Err(err) => {
1474 warn!("couldn't create edited event: {err}");
1475 return Ok(true);
1476 }
1477 };
1478
1479 let serializable = SerializableEventContent::from_raw(
1481 Raw::new(&edit_event)
1482 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
1483 edit_event.event_type().to_string(),
1484 );
1485
1486 store
1487 .save_send_queue_request(
1488 &self.room_id,
1489 dependent_request.own_transaction_id.into(),
1490 dependent_request.created_at,
1491 serializable.into(),
1492 Self::HIGH_PRIORITY,
1493 )
1494 .await
1495 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1496 } else {
1497 let edited = store
1499 .update_send_queue_request(
1500 &self.room_id,
1501 &dependent_request.parent_transaction_id,
1502 new_content.into(),
1503 )
1504 .await
1505 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1506
1507 if !edited {
1508 warn!("missing local echo upon dependent edit");
1509 }
1510 }
1511 }
1512
1513 DependentQueuedRequestKind::RedactEvent => {
1514 if let Some(parent_key) = parent_key {
1515 let Some(event_id) = parent_key.into_event_id() else {
1516 return Err(RoomSendQueueError::StorageError(
1517 RoomSendQueueStorageError::InvalidParentKey,
1518 ));
1519 };
1520
1521 let room = client
1523 .get_room(&self.room_id)
1524 .ok_or(RoomSendQueueError::RoomDisappeared)?;
1525
1526 if let Err(err) = room
1534 .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
1535 .await
1536 {
1537 warn!("error when sending a redact for {event_id}: {err}");
1538 return Ok(false);
1539 }
1540 } else {
1541 let removed = store
1544 .remove_send_queue_request(
1545 &self.room_id,
1546 &dependent_request.parent_transaction_id,
1547 )
1548 .await
1549 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1550
1551 if !removed {
1552 warn!("missing local echo upon dependent redact");
1553 }
1554 }
1555 }
1556
1557 DependentQueuedRequestKind::ReactEvent { key } => {
1558 if let Some(parent_key) = parent_key {
1559 let Some(parent_event_id) = parent_key.into_event_id() else {
1560 return Err(RoomSendQueueError::StorageError(
1561 RoomSendQueueStorageError::InvalidParentKey,
1562 ));
1563 };
1564
1565 let react_event =
1567 ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
1568 let serializable = SerializableEventContent::from_raw(
1569 Raw::new(&react_event)
1570 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
1571 react_event.event_type().to_string(),
1572 );
1573
1574 store
1575 .save_send_queue_request(
1576 &self.room_id,
1577 dependent_request.own_transaction_id.into(),
1578 dependent_request.created_at,
1579 serializable.into(),
1580 Self::HIGH_PRIORITY,
1581 )
1582 .await
1583 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1584 } else {
1585 return Ok(false);
1587 }
1588 }
1589
1590 DependentQueuedRequestKind::UploadFileWithThumbnail {
1591 content_type,
1592 cache_key,
1593 related_to,
1594 } => {
1595 let Some(parent_key) = parent_key else {
1596 return Ok(false);
1598 };
1599 self.handle_dependent_file_upload_with_thumbnail(
1600 client,
1601 dependent_request.own_transaction_id.into(),
1602 parent_key,
1603 content_type,
1604 cache_key,
1605 related_to,
1606 )
1607 .await?;
1608 }
1609
1610 DependentQueuedRequestKind::FinishUpload {
1611 local_echo,
1612 file_upload,
1613 thumbnail_info,
1614 } => {
1615 let Some(parent_key) = parent_key else {
1616 return Ok(false);
1618 };
1619 self.handle_dependent_finish_upload(
1620 client,
1621 dependent_request.own_transaction_id.into(),
1622 parent_key,
1623 local_echo,
1624 file_upload,
1625 thumbnail_info,
1626 new_updates,
1627 )
1628 .await?;
1629 }
1630 }
1631
1632 Ok(true)
1633 }
1634
1635 #[instrument(skip(self))]
1636 async fn apply_dependent_requests(
1637 &self,
1638 new_updates: &mut Vec<RoomSendQueueUpdate>,
1639 ) -> Result<(), RoomSendQueueError> {
1640 let guard = self.store.lock().await;
1641
1642 let client = guard.client()?;
1643 let store = client.state_store();
1644
1645 let dependent_requests = store
1646 .load_dependent_queued_requests(&self.room_id)
1647 .await
1648 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1649
1650 let num_initial_dependent_requests = dependent_requests.len();
1651 if num_initial_dependent_requests == 0 {
1652 return Ok(());
1654 }
1655
1656 let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
1657
1658 for original in &dependent_requests {
1660 if !canonicalized_dependent_requests
1661 .iter()
1662 .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
1663 {
1664 store
1665 .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
1666 .await
1667 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1668 }
1669 }
1670
1671 let mut num_dependent_requests = canonicalized_dependent_requests.len();
1672
1673 debug!(
1674 num_dependent_requests,
1675 num_initial_dependent_requests, "starting handling of dependent requests"
1676 );
1677
1678 for dependent in canonicalized_dependent_requests {
1679 let dependent_id = dependent.own_transaction_id.clone();
1680
1681 match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
1682 Ok(should_remove) => {
1683 if should_remove {
1684 store
1686 .remove_dependent_queued_request(&self.room_id, &dependent_id)
1687 .await
1688 .map_err(RoomSendQueueStorageError::StateStoreError)?;
1689
1690 num_dependent_requests -= 1;
1691 }
1692 }
1693
1694 Err(err) => {
1695 warn!("error when applying single dependent request: {err}");
1696 }
1697 }
1698 }
1699
1700 debug!(
1701 leftover_dependent_requests = num_dependent_requests,
1702 "stopped handling dependent request"
1703 );
1704
1705 Ok(())
1706 }
1707
1708 async fn remove_dependent_send_queue_request(
1710 &self,
1711 dependent_event_id: &ChildTransactionId,
1712 ) -> Result<bool, RoomSendQueueStorageError> {
1713 Ok(self
1714 .store
1715 .lock()
1716 .await
1717 .client()?
1718 .state_store()
1719 .remove_dependent_queued_request(&self.room_id, dependent_event_id)
1720 .await?)
1721 }
1722}
1723
1724#[derive(Clone, Debug)]
1726pub enum LocalEchoContent {
1727 Event {
1729 serialized_event: SerializableEventContent,
1732 send_handle: SendHandle,
1734 send_error: Option<QueueWedgeError>,
1737 },
1738
1739 React {
1741 key: String,
1743 send_handle: SendReactionHandle,
1745 applies_to: OwnedTransactionId,
1747 },
1748}
1749
1750#[derive(Clone, Debug)]
1753pub struct LocalEcho {
1754 pub transaction_id: OwnedTransactionId,
1756 pub content: LocalEchoContent,
1758}
1759
1760#[derive(Clone, Debug)]
1763pub enum RoomSendQueueUpdate {
1764 NewLocalEvent(LocalEcho),
1769
1770 CancelledLocalEvent {
1773 transaction_id: OwnedTransactionId,
1775 },
1776
1777 ReplacedLocalEvent {
1779 transaction_id: OwnedTransactionId,
1781
1782 new_content: SerializableEventContent,
1784 },
1785
1786 SendError {
1791 transaction_id: OwnedTransactionId,
1793 error: Arc<crate::Error>,
1795 is_recoverable: bool,
1801 },
1802
1803 RetryEvent {
1805 transaction_id: OwnedTransactionId,
1807 },
1808
1809 SentEvent {
1812 transaction_id: OwnedTransactionId,
1814 event_id: OwnedEventId,
1816 },
1817
1818 UploadedMedia {
1820 related_to: OwnedTransactionId,
1822
1823 file: MediaSource,
1825 },
1826}
1827
1828#[derive(Debug, thiserror::Error)]
1830pub enum RoomSendQueueError {
1831 #[error("the room isn't in the joined state")]
1833 RoomNotJoined,
1834
1835 #[error("the room is now missing from the client")]
1839 RoomDisappeared,
1840
1841 #[error(transparent)]
1843 StorageError(#[from] RoomSendQueueStorageError),
1844}
1845
1846#[derive(Debug, thiserror::Error)]
1848pub enum RoomSendQueueStorageError {
1849 #[error(transparent)]
1851 StateStoreError(#[from] StoreError),
1852
1853 #[error(transparent)]
1855 EventCacheStoreError(#[from] EventCacheStoreError),
1856
1857 #[error(transparent)]
1859 LockError(#[from] LockStoreError),
1860
1861 #[error(transparent)]
1863 JsonSerialization(#[from] serde_json::Error),
1864
1865 #[error("a dependent event had an invalid parent key type")]
1868 InvalidParentKey,
1869
1870 #[error("The client is shutting down.")]
1872 ClientShuttingDown,
1873
1874 #[error("This operation is not implemented for media uploads")]
1876 OperationNotImplementedYet,
1877
1878 #[error("Can't edit a media caption when the underlying event isn't a media")]
1880 InvalidMediaCaptionEdit,
1881}
1882
1883#[derive(Clone, Debug)]
1885struct MediaHandles {
1886 upload_thumbnail_txn: Option<OwnedTransactionId>,
1890
1891 upload_file_txn: OwnedTransactionId,
1893}
1894
1895#[derive(Clone, Debug)]
1897pub struct SendHandle {
1898 room: RoomSendQueue,
1900
1901 transaction_id: OwnedTransactionId,
1906
1907 media_handles: Option<MediaHandles>,
1909
1910 pub created_at: MilliSecondsSinceUnixEpoch,
1912}
1913
1914impl SendHandle {
1915 fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
1916 if self.media_handles.is_some() {
1917 Err(RoomSendQueueStorageError::OperationNotImplementedYet)
1918 } else {
1919 Ok(())
1920 }
1921 }
1922
1923 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
1928 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
1929 trace!("received an abort request");
1930
1931 let queue = &self.room.inner.queue;
1932
1933 if let Some(handles) = &self.media_handles {
1934 if queue.abort_upload(&self.transaction_id, handles).await? {
1935 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
1937 transaction_id: self.transaction_id.clone(),
1938 });
1939
1940 return Ok(true);
1941 }
1942
1943 }
1947
1948 if queue.cancel_event(&self.transaction_id).await? {
1949 trace!("successful abort");
1950
1951 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
1953 transaction_id: self.transaction_id.clone(),
1954 });
1955
1956 Ok(true)
1957 } else {
1958 debug!("local echo didn't exist anymore, can't abort");
1959 Ok(false)
1960 }
1961 }
1962
1963 #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
1968 pub async fn edit_raw(
1969 &self,
1970 new_content: Raw<AnyMessageLikeEventContent>,
1971 event_type: String,
1972 ) -> Result<bool, RoomSendQueueStorageError> {
1973 trace!("received an edit request");
1974 self.nyi_for_uploads()?;
1975
1976 let serializable = SerializableEventContent::from_raw(new_content, event_type);
1977
1978 if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
1979 trace!("successful edit");
1980
1981 self.room.inner.notifier.notify_one();
1983
1984 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
1986 transaction_id: self.transaction_id.clone(),
1987 new_content: serializable,
1988 });
1989
1990 Ok(true)
1991 } else {
1992 debug!("local echo doesn't exist anymore, can't edit");
1993 Ok(false)
1994 }
1995 }
1996
1997 pub async fn edit(
2002 &self,
2003 new_content: AnyMessageLikeEventContent,
2004 ) -> Result<bool, RoomSendQueueStorageError> {
2005 self.edit_raw(
2006 Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2007 new_content.event_type().to_string(),
2008 )
2009 .await
2010 }
2011
2012 pub async fn edit_media_caption(
2017 &self,
2018 caption: Option<String>,
2019 formatted_caption: Option<FormattedBody>,
2020 mentions: Option<Mentions>,
2021 ) -> Result<bool, RoomSendQueueStorageError> {
2022 if let Some(new_content) = self
2023 .room
2024 .inner
2025 .queue
2026 .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2027 .await?
2028 {
2029 trace!("successful edit of media caption");
2030
2031 self.room.inner.notifier.notify_one();
2033
2034 let new_content = SerializableEventContent::new(&new_content)
2035 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2036
2037 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
2039 transaction_id: self.transaction_id.clone(),
2040 new_content,
2041 });
2042
2043 Ok(true)
2044 } else {
2045 debug!("local echo doesn't exist anymore, can't edit media caption");
2046 Ok(false)
2047 }
2048 }
2049
2050 pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2053 let room = &self.room.inner;
2054 room.queue
2055 .mark_as_unwedged(&self.transaction_id)
2056 .await
2057 .map_err(RoomSendQueueError::StorageError)?;
2058
2059 if let Some(handles) = &self.media_handles {
2067 room.queue
2068 .mark_as_unwedged(&handles.upload_file_txn)
2069 .await
2070 .map_err(RoomSendQueueError::StorageError)?;
2071
2072 if let Some(txn) = &handles.upload_thumbnail_txn {
2073 room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2074 }
2075 }
2076
2077 room.notifier.notify_one();
2079
2080 let _ = room
2081 .updates
2082 .send(RoomSendQueueUpdate::RetryEvent { transaction_id: self.transaction_id.clone() });
2083
2084 Ok(())
2085 }
2086
2087 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2092 pub async fn react(
2093 &self,
2094 key: String,
2095 ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2096 trace!("received an intent to react");
2097
2098 let created_at = MilliSecondsSinceUnixEpoch::now();
2099 if let Some(reaction_txn_id) =
2100 self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2101 {
2102 trace!("successfully queued react");
2103
2104 self.room.inner.notifier.notify_one();
2106
2107 let send_handle = SendReactionHandle {
2109 room: self.room.clone(),
2110 transaction_id: reaction_txn_id.clone(),
2111 };
2112
2113 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2114 transaction_id: reaction_txn_id.into(),
2117 content: LocalEchoContent::React {
2118 key,
2119 send_handle: send_handle.clone(),
2120 applies_to: self.transaction_id.clone(),
2121 },
2122 }));
2123
2124 Ok(Some(send_handle))
2125 } else {
2126 debug!("local echo doesn't exist anymore, can't react");
2127 Ok(None)
2128 }
2129 }
2130}
2131
2132#[derive(Clone, Debug)]
2134pub struct SendReactionHandle {
2135 room: RoomSendQueue,
2137 transaction_id: ChildTransactionId,
2139}
2140
2141impl SendReactionHandle {
2142 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2147 if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2148 let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
2152 transaction_id: self.transaction_id.clone().into(),
2153 });
2154
2155 return Ok(true);
2156 }
2157
2158 let handle = SendHandle {
2161 room: self.room.clone(),
2162 transaction_id: self.transaction_id.clone().into(),
2163 media_handles: None,
2164 created_at: MilliSecondsSinceUnixEpoch::now(),
2165 };
2166
2167 handle.abort().await
2168 }
2169
2170 pub fn transaction_id(&self) -> &TransactionId {
2172 &self.transaction_id
2173 }
2174}
2175
2176fn canonicalize_dependent_requests(
2180 dependent: &[DependentQueuedRequest],
2181) -> Vec<DependentQueuedRequest> {
2182 let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
2183
2184 for d in dependent {
2185 let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
2186
2187 if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
2188 continue;
2191 }
2192
2193 match &d.kind {
2194 DependentQueuedRequestKind::EditEvent { .. } => {
2195 if let Some(prev_edit) = prevs
2197 .iter_mut()
2198 .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
2199 {
2200 *prev_edit = d;
2201 } else {
2202 prevs.insert(0, d);
2203 }
2204 }
2205
2206 DependentQueuedRequestKind::UploadFileWithThumbnail { .. }
2207 | DependentQueuedRequestKind::FinishUpload { .. }
2208 | DependentQueuedRequestKind::ReactEvent { .. } => {
2209 prevs.push(d);
2211 }
2212
2213 DependentQueuedRequestKind::RedactEvent => {
2214 prevs.clear();
2216 prevs.push(d);
2217 }
2218 }
2219 }
2220
2221 by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
2222}
2223
2224#[cfg(all(test, not(target_arch = "wasm32")))]
2225mod tests {
2226 use std::{sync::Arc, time::Duration};
2227
2228 use assert_matches2::{assert_let, assert_matches};
2229 use matrix_sdk_base::store::{
2230 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
2231 SerializableEventContent,
2232 };
2233 use matrix_sdk_test::{async_test, JoinedRoomBuilder, SyncResponseBuilder};
2234 use ruma::{
2235 events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent},
2236 room_id, MilliSecondsSinceUnixEpoch, TransactionId,
2237 };
2238
2239 use super::canonicalize_dependent_requests;
2240 use crate::{client::WeakClient, test_utils::logged_in_client};
2241
2242 #[test]
2243 fn test_canonicalize_dependent_events_created_at() {
2244 let txn = TransactionId::new();
2247 let created_at = MilliSecondsSinceUnixEpoch::now();
2248
2249 let edit = DependentQueuedRequest {
2250 own_transaction_id: ChildTransactionId::new(),
2251 parent_transaction_id: txn.clone(),
2252 kind: DependentQueuedRequestKind::EditEvent {
2253 new_content: SerializableEventContent::new(
2254 &RoomMessageEventContent::text_plain("edit").into(),
2255 )
2256 .unwrap(),
2257 },
2258 parent_key: None,
2259 created_at,
2260 };
2261
2262 let res = canonicalize_dependent_requests(&[edit]);
2263
2264 assert_eq!(res.len(), 1);
2265 assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2266 assert_let!(
2267 AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2268 );
2269 assert_eq!(msg.body(), "edit");
2270 assert_eq!(res[0].parent_transaction_id, txn);
2271 assert_eq!(res[0].created_at, created_at);
2272 }
2273
2274 #[async_test]
2275 async fn test_client_no_cycle_with_send_queue() {
2276 for enabled in [true, false] {
2277 let client = logged_in_client(None).await;
2278 let weak_client = WeakClient::from_client(&client);
2279
2280 {
2281 let mut sync_response_builder = SyncResponseBuilder::new();
2282
2283 let room_id = room_id!("!a:b.c");
2284
2285 client
2287 .base_client()
2288 .receive_sync_response(
2289 sync_response_builder
2290 .add_joined_room(JoinedRoomBuilder::new(room_id))
2291 .build_sync_response(),
2292 )
2293 .await
2294 .unwrap();
2295
2296 let room = client.get_room(room_id).unwrap();
2297 let q = room.send_queue();
2298
2299 let _watcher = q.subscribe().await;
2300
2301 client.send_queue().set_enabled(enabled).await;
2302 }
2303
2304 drop(client);
2305
2306 tokio::time::sleep(Duration::from_millis(500)).await;
2308
2309 let client = weak_client.get();
2311 assert!(
2312 client.is_none(),
2313 "too many strong references to the client: {}",
2314 Arc::strong_count(&client.unwrap().inner)
2315 );
2316 }
2317 }
2318
2319 #[test]
2320 fn test_canonicalize_dependent_events_smoke_test() {
2321 let txn = TransactionId::new();
2323
2324 let edit = DependentQueuedRequest {
2325 own_transaction_id: ChildTransactionId::new(),
2326 parent_transaction_id: txn.clone(),
2327 kind: DependentQueuedRequestKind::EditEvent {
2328 new_content: SerializableEventContent::new(
2329 &RoomMessageEventContent::text_plain("edit").into(),
2330 )
2331 .unwrap(),
2332 },
2333 parent_key: None,
2334 created_at: MilliSecondsSinceUnixEpoch::now(),
2335 };
2336 let res = canonicalize_dependent_requests(&[edit]);
2337
2338 assert_eq!(res.len(), 1);
2339 assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
2340 assert_eq!(res[0].parent_transaction_id, txn);
2341 assert!(res[0].parent_key.is_none());
2342 }
2343
2344 #[test]
2345 fn test_canonicalize_dependent_events_redaction_preferred() {
2346 let txn = TransactionId::new();
2348
2349 let mut inputs = Vec::with_capacity(100);
2350 let redact = DependentQueuedRequest {
2351 own_transaction_id: ChildTransactionId::new(),
2352 parent_transaction_id: txn.clone(),
2353 kind: DependentQueuedRequestKind::RedactEvent,
2354 parent_key: None,
2355 created_at: MilliSecondsSinceUnixEpoch::now(),
2356 };
2357
2358 let edit = DependentQueuedRequest {
2359 own_transaction_id: ChildTransactionId::new(),
2360 parent_transaction_id: txn.clone(),
2361 kind: DependentQueuedRequestKind::EditEvent {
2362 new_content: SerializableEventContent::new(
2363 &RoomMessageEventContent::text_plain("edit").into(),
2364 )
2365 .unwrap(),
2366 },
2367 parent_key: None,
2368 created_at: MilliSecondsSinceUnixEpoch::now(),
2369 };
2370
2371 inputs.push({
2372 let mut edit = edit.clone();
2373 edit.own_transaction_id = ChildTransactionId::new();
2374 edit
2375 });
2376
2377 inputs.push(redact);
2378
2379 for _ in 0..98 {
2380 let mut edit = edit.clone();
2381 edit.own_transaction_id = ChildTransactionId::new();
2382 inputs.push(edit);
2383 }
2384
2385 let res = canonicalize_dependent_requests(&inputs);
2386
2387 assert_eq!(res.len(), 1);
2388 assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
2389 assert_eq!(res[0].parent_transaction_id, txn);
2390 }
2391
2392 #[test]
2393 fn test_canonicalize_dependent_events_last_edit_preferred() {
2394 let parent_txn = TransactionId::new();
2395
2396 let inputs = (0..10)
2398 .map(|i| DependentQueuedRequest {
2399 own_transaction_id: ChildTransactionId::new(),
2400 parent_transaction_id: parent_txn.clone(),
2401 kind: DependentQueuedRequestKind::EditEvent {
2402 new_content: SerializableEventContent::new(
2403 &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
2404 )
2405 .unwrap(),
2406 },
2407 parent_key: None,
2408 created_at: MilliSecondsSinceUnixEpoch::now(),
2409 })
2410 .collect::<Vec<_>>();
2411
2412 let txn = inputs[9].parent_transaction_id.clone();
2413
2414 let res = canonicalize_dependent_requests(&inputs);
2415
2416 assert_eq!(res.len(), 1);
2417 assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2418 assert_let!(
2419 AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2420 );
2421 assert_eq!(msg.body(), "edit9");
2422 assert_eq!(res[0].parent_transaction_id, txn);
2423 }
2424
2425 #[test]
2426 fn test_canonicalize_multiple_local_echoes() {
2427 let txn1 = TransactionId::new();
2428 let txn2 = TransactionId::new();
2429
2430 let child1 = ChildTransactionId::new();
2431 let child2 = ChildTransactionId::new();
2432
2433 let inputs = vec![
2434 DependentQueuedRequest {
2436 own_transaction_id: child1.clone(),
2437 kind: DependentQueuedRequestKind::RedactEvent,
2438 parent_transaction_id: txn1.clone(),
2439 parent_key: None,
2440 created_at: MilliSecondsSinceUnixEpoch::now(),
2441 },
2442 DependentQueuedRequest {
2444 own_transaction_id: child2,
2445 kind: DependentQueuedRequestKind::EditEvent {
2446 new_content: SerializableEventContent::new(
2447 &RoomMessageEventContent::text_plain("edit").into(),
2448 )
2449 .unwrap(),
2450 },
2451 parent_transaction_id: txn2.clone(),
2452 parent_key: None,
2453 created_at: MilliSecondsSinceUnixEpoch::now(),
2454 },
2455 ];
2456
2457 let res = canonicalize_dependent_requests(&inputs);
2458
2459 assert_eq!(res.len(), 2);
2461
2462 for dependent in res {
2463 if dependent.own_transaction_id == child1 {
2464 assert_eq!(dependent.parent_transaction_id, txn1);
2465 assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
2466 } else {
2467 assert_eq!(dependent.parent_transaction_id, txn2);
2468 assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
2469 }
2470 }
2471 }
2472
2473 #[test]
2474 fn test_canonicalize_reactions_after_edits() {
2475 let txn = TransactionId::new();
2477
2478 let react_id = ChildTransactionId::new();
2479 let react = DependentQueuedRequest {
2480 own_transaction_id: react_id.clone(),
2481 kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
2482 parent_transaction_id: txn.clone(),
2483 parent_key: None,
2484 created_at: MilliSecondsSinceUnixEpoch::now(),
2485 };
2486
2487 let edit_id = ChildTransactionId::new();
2488 let edit = DependentQueuedRequest {
2489 own_transaction_id: edit_id.clone(),
2490 kind: DependentQueuedRequestKind::EditEvent {
2491 new_content: SerializableEventContent::new(
2492 &RoomMessageEventContent::text_plain("edit").into(),
2493 )
2494 .unwrap(),
2495 },
2496 parent_transaction_id: txn,
2497 parent_key: None,
2498 created_at: MilliSecondsSinceUnixEpoch::now(),
2499 };
2500
2501 let res = canonicalize_dependent_requests(&[react, edit]);
2502
2503 assert_eq!(res.len(), 2);
2504 assert_eq!(res[0].own_transaction_id, edit_id);
2505 assert_eq!(res[1].own_transaction_id, react_id);
2506 }
2507}