1use std::{
132 collections::{BTreeMap, HashMap},
133 ops::Not,
134 str::FromStr as _,
135 sync::{
136 Arc, RwLock,
137 atomic::{AtomicBool, Ordering},
138 },
139};
140
141use eyeball::SharedObservable;
142#[cfg(feature = "e2e-encryption")]
143use matrix_sdk_base::crypto::{OlmError, SessionRecipientCollectionError};
144#[cfg(feature = "unstable-msc4274")]
145use matrix_sdk_base::store::FinishGalleryItemInfo;
146use matrix_sdk_base::{
147 RoomState, StoreError,
148 cross_process_lock::CrossProcessLockError,
149 deserialized_responses::{EncryptionInfo, TimelineEvent},
150 event_cache::store::EventCacheStoreError,
151 media::{MediaRequestParameters, store::MediaStoreError},
152 store::{
153 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore,
154 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
155 SentMediaInfo, SentRequestKey, SerializableEventContent,
156 },
157 task_monitor::BackgroundTaskHandle,
158};
159use matrix_sdk_common::locks::Mutex as SyncMutex;
160use mime::Mime;
161use ruma::{
162 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
163 TransactionId,
164 events::{
165 AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _, TimelineEventType,
166 reaction::ReactionEventContent,
167 relation::Annotation,
168 room::{
169 MediaSource,
170 message::{FormattedBody, RoomMessageEventContent},
171 },
172 },
173 serde::Raw,
174};
175use tokio::sync::{Mutex, Notify, OwnedMutexGuard, broadcast, oneshot};
176use tracing::{debug, error, info, instrument, trace, warn};
177
178use crate::{
179 Client, Media, Room, TransmissionProgress,
180 client::WeakClient,
181 config::RequestConfig,
182 error::RetryKind,
183 room::{WeakRoom, edit::EditedContent},
184};
185
186mod progress;
187mod upload;
188
189pub use progress::AbstractProgress;
190
191pub struct SendQueue {
193 client: Client,
194}
195
196#[cfg(not(tarpaulin_include))]
197impl std::fmt::Debug for SendQueue {
198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 f.debug_struct("SendQueue").finish_non_exhaustive()
200 }
201}
202
203impl SendQueue {
204 pub(super) fn new(client: Client) -> Self {
205 Self { client }
206 }
207
208 pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
211 if !self.is_enabled() {
212 return;
213 }
214
215 let room_ids =
216 self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
217 |err| {
218 warn!("error when loading rooms with unsent requests: {err}");
219 Vec::new()
220 },
221 );
222
223 for room_id in room_ids {
225 if let Some(room) = self.client.get_room(&room_id) {
226 let _ = self.for_room(room);
227 }
228 }
229 }
230
231 #[inline(always)]
233 fn data(&self) -> &SendQueueData {
234 &self.client.inner.send_queue_data
235 }
236
237 pub(crate) fn for_room(&self, room: Room) -> RoomSendQueue {
240 let data = self.data();
241
242 let mut map = data.rooms.write().unwrap();
243
244 let room_id = room.room_id();
245 if let Some(room_q) = map.get(room_id).cloned() {
246 return room_q;
247 }
248
249 let owned_room_id = room_id.to_owned();
250 let room_q = RoomSendQueue::new(
251 self.is_enabled(),
252 data.global_update_sender.clone(),
253 data.error_sender.clone(),
254 data.is_dropping.clone(),
255 &self.client,
256 owned_room_id.clone(),
257 data.report_media_upload_progress.clone(),
258 );
259
260 map.insert(owned_room_id, room_q.clone());
261
262 room_q
263 }
264
265 pub async fn set_enabled(&self, enabled: bool) {
275 debug!(?enabled, "setting global send queue enablement");
276
277 self.data().globally_enabled.store(enabled, Ordering::SeqCst);
278
279 for room in self.data().rooms.read().unwrap().values() {
281 room.set_enabled(enabled);
282 }
283
284 self.respawn_tasks_for_rooms_with_unsent_requests().await;
287 }
288
289 pub fn is_enabled(&self) -> bool {
292 self.data().globally_enabled.load(Ordering::SeqCst)
293 }
294
295 pub fn enable_upload_progress(&self, enabled: bool) {
297 self.data().report_media_upload_progress.store(enabled, Ordering::SeqCst);
298 }
299
300 pub fn subscribe(&self) -> broadcast::Receiver<SendQueueUpdate> {
305 self.data().global_update_sender.subscribe()
306 }
307
308 pub async fn local_echoes(
310 &self,
311 ) -> Result<BTreeMap<OwnedRoomId, Vec<LocalEcho>>, RoomSendQueueError> {
312 let room_ids =
313 self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
314 |err| {
315 warn!("error when loading rooms with unsent requests: {err}");
316 Vec::new()
317 },
318 );
319
320 let mut local_echoes: BTreeMap<OwnedRoomId, Vec<LocalEcho>> = BTreeMap::new();
321
322 for room_id in room_ids {
323 if let Some(room) = self.client.get_room(&room_id) {
324 let queue = self.for_room(room);
325 local_echoes
326 .insert(room_id.to_owned(), queue.inner.queue.local_echoes(&queue).await?);
327 }
328 }
329
330 Ok(local_echoes)
331 }
332
333 pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
336 self.data().error_sender.subscribe()
337 }
338}
339
340#[derive(Clone, Debug)]
343struct QueueThumbnailInfo {
344 finish_upload_thumbnail_info: FinishUploadThumbnailInfo,
346
347 media_request_parameters: MediaRequestParameters,
349
350 content_type: Mime,
352
353 file_size: usize,
355}
356
357#[derive(Clone, Debug)]
359pub struct SendQueueRoomError {
360 pub room_id: OwnedRoomId,
362
363 pub error: Arc<crate::Error>,
365
366 pub is_recoverable: bool,
372}
373
374impl Client {
375 pub fn send_queue(&self) -> SendQueue {
378 SendQueue::new(self.clone())
379 }
380}
381
382pub(super) struct SendQueueData {
383 rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,
385
386 globally_enabled: AtomicBool,
391
392 global_update_sender: broadcast::Sender<SendQueueUpdate>,
396
397 error_sender: broadcast::Sender<SendQueueRoomError>,
399
400 is_dropping: Arc<AtomicBool>,
402
403 report_media_upload_progress: Arc<AtomicBool>,
405}
406
407impl SendQueueData {
408 pub fn new(globally_enabled: bool) -> Self {
410 let (global_update_sender, _) = broadcast::channel(32);
411 let (error_sender, _) = broadcast::channel(32);
412
413 Self {
414 rooms: Default::default(),
415 globally_enabled: AtomicBool::new(globally_enabled),
416 global_update_sender,
417 error_sender,
418 is_dropping: Arc::new(false.into()),
419 report_media_upload_progress: Arc::new(false.into()),
420 }
421 }
422}
423
424impl Drop for SendQueueData {
425 fn drop(&mut self) {
426 debug!("globally dropping the send queue");
429 self.is_dropping.store(true, Ordering::SeqCst);
430
431 let rooms = self.rooms.read().unwrap();
432 for room in rooms.values() {
433 room.inner.notifier.notify_one();
434 }
435 }
436}
437
438impl Room {
439 pub fn send_queue(&self) -> RoomSendQueue {
441 self.client.send_queue().for_room(self.clone())
442 }
443}
444
445#[derive(Clone)]
449pub struct RoomSendQueue {
450 inner: Arc<RoomSendQueueInner>,
451}
452
453#[cfg(not(tarpaulin_include))]
454impl std::fmt::Debug for RoomSendQueue {
455 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
456 f.debug_struct("RoomSendQueue").finish_non_exhaustive()
457 }
458}
459
460impl RoomSendQueue {
461 fn new(
462 globally_enabled: bool,
463 global_update_sender: broadcast::Sender<SendQueueUpdate>,
464 global_error_sender: broadcast::Sender<SendQueueRoomError>,
465 is_dropping: Arc<AtomicBool>,
466 client: &Client,
467 room_id: OwnedRoomId,
468 report_media_upload_progress: Arc<AtomicBool>,
469 ) -> Self {
470 let (update_sender, _) = broadcast::channel(32);
471
472 let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
473 let notifier = Arc::new(Notify::new());
474
475 let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
476 let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));
477
478 let task = client.task_monitor().spawn_infinite_task(
479 "send_queue",
480 Self::sending_task(
481 weak_room.clone(),
482 queue.clone(),
483 notifier.clone(),
484 global_update_sender.clone(),
485 update_sender.clone(),
486 locally_enabled.clone(),
487 global_error_sender,
488 is_dropping,
489 report_media_upload_progress,
490 ),
491 );
492
493 Self {
494 inner: Arc::new(RoomSendQueueInner {
495 room: weak_room,
496 global_update_sender,
497 update_sender,
498 _task: task,
499 queue,
500 notifier,
501 locally_enabled,
502 }),
503 }
504 }
505
506 pub async fn send_raw(
521 &self,
522 content: Raw<AnyMessageLikeEventContent>,
523 event_type: String,
524 ) -> Result<SendHandle, RoomSendQueueError> {
525 let Some(room) = self.inner.room.get() else {
526 return Err(RoomSendQueueError::RoomDisappeared);
527 };
528 if room.state() != RoomState::Joined {
529 return Err(RoomSendQueueError::RoomNotJoined);
530 }
531
532 let content = SerializableEventContent::from_raw(content, event_type);
533
534 let created_at = MilliSecondsSinceUnixEpoch::now();
535 let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
536 trace!(%transaction_id, "manager sends a raw event to the background task");
537
538 self.inner.notifier.notify_one();
539
540 let send_handle = SendHandle {
541 room: self.clone(),
542 transaction_id: transaction_id.clone(),
543 media_handles: vec![],
544 created_at,
545 };
546
547 self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
548 transaction_id,
549 content: LocalEchoContent::Event {
550 serialized_event: content,
551 send_handle: send_handle.clone(),
552 send_error: None,
553 },
554 }));
555
556 Ok(send_handle)
557 }
558
559 pub async fn send(
574 &self,
575 content: AnyMessageLikeEventContent,
576 ) -> Result<SendHandle, RoomSendQueueError> {
577 self.send_raw(
578 Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
579 content.event_type().to_string(),
580 )
581 .await
582 }
583
584 pub async fn redact(
599 &self,
600 redacts: OwnedEventId,
601 reason: Option<&str>,
602 ) -> Result<SendRedactionHandle, RoomSendQueueError> {
603 let Some(room) = self.inner.room.get() else {
604 return Err(RoomSendQueueError::RoomDisappeared);
605 };
606 if room.state() != RoomState::Joined {
607 return Err(RoomSendQueueError::RoomNotJoined);
608 }
609
610 let request = QueuedRequestKind::Redaction {
611 redacts: redacts.clone(),
612 reason: reason.map(str::to_owned),
613 };
614
615 let created_at = MilliSecondsSinceUnixEpoch::now();
616 let transaction_id = self.inner.queue.push(request, created_at).await?;
617 trace!(%transaction_id, "manager sends a redaction event to the background task");
618
619 self.inner.notifier.notify_one();
620
621 let send_handle =
622 SendRedactionHandle { room: self.clone(), transaction_id: transaction_id.clone() };
623
624 self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
625 transaction_id,
626 content: LocalEchoContent::Redaction {
627 redacts,
628 reason: reason.map(str::to_owned),
629 send_handle: send_handle.clone(),
630 send_error: None,
631 },
632 }));
633
634 Ok(send_handle)
635 }
636
637 pub async fn subscribe(
643 &self,
644 ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
645 {
646 let local_echoes = self.inner.queue.local_echoes(self).await?;
647
648 Ok((local_echoes, self.inner.update_sender.subscribe()))
649 }
650
651 #[allow(clippy::too_many_arguments)]
657 #[instrument(skip_all, fields(room_id = %room.room_id()))]
658 async fn sending_task(
659 room: WeakRoom,
660 queue: QueueStorage,
661 notifier: Arc<Notify>,
662 global_update_sender: broadcast::Sender<SendQueueUpdate>,
663 update_sender: broadcast::Sender<RoomSendQueueUpdate>,
664 locally_enabled: Arc<AtomicBool>,
665 global_error_sender: broadcast::Sender<SendQueueRoomError>,
666 is_dropping: Arc<AtomicBool>,
667 report_media_upload_progress: Arc<AtomicBool>,
668 ) {
669 trace!("spawned the sending task");
670
671 let room_id = room.room_id();
672
673 loop {
674 if is_dropping.load(Ordering::SeqCst) {
676 trace!("shutting down!");
677 break;
678 }
679
680 let mut new_updates = Vec::new();
683 if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
684 warn!("errors when applying dependent requests: {err}");
685 }
686
687 for up in new_updates {
688 send_update(&global_update_sender, &update_sender, room_id, up);
689 }
690
691 if !locally_enabled.load(Ordering::SeqCst) {
692 trace!("not enabled, sleeping");
693 notifier.notified().await;
695 continue;
696 }
697
698 let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
699 Ok(Some(request)) => request,
700
701 Ok(None) => {
702 trace!("queue is empty, sleeping");
703 notifier.notified().await;
705 continue;
706 }
707
708 Err(err) => {
709 warn!("error when loading next request to send: {err}");
710 continue;
711 }
712 };
713
714 let txn_id = queued_request.transaction_id.clone();
715 trace!(txn_id = %txn_id, "received a request to send!");
716
717 let Some(room) = room.get() else {
718 if is_dropping.load(Ordering::SeqCst) {
719 break;
720 }
721 error!("the weak room couldn't be upgraded but we're not shutting down?");
722 continue;
723 };
724
725 let (related_txn_id, media_upload_progress_info, http_progress) =
730 if let QueuedRequestKind::MediaUpload {
731 cache_key,
732 thumbnail_source,
733 #[cfg(feature = "unstable-msc4274")]
734 accumulated,
735 related_to,
736 ..
737 } = &queued_request.kind
738 {
739 let (media_upload_progress_info, http_progress) =
742 if report_media_upload_progress.load(Ordering::SeqCst) {
743 let media_upload_progress_info =
744 RoomSendQueue::create_media_upload_progress_info(
745 &queued_request.transaction_id,
746 related_to,
747 cache_key,
748 thumbnail_source.as_ref(),
749 #[cfg(feature = "unstable-msc4274")]
750 accumulated,
751 &room,
752 &queue,
753 )
754 .await;
755
756 let progress = RoomSendQueue::create_media_upload_progress_observable(
757 &media_upload_progress_info,
758 related_to,
759 &update_sender,
760 );
761
762 (Some(media_upload_progress_info), Some(progress))
763 } else {
764 Default::default()
765 };
766
767 (Some(related_to.clone()), media_upload_progress_info, http_progress)
768 } else {
769 Default::default()
770 };
771
772 match Self::handle_request(&room, queued_request, cancel_upload_rx, http_progress).await
773 {
774 Ok((Some(parent_key), encryption_info)) => match queue
775 .mark_as_sent(&txn_id, parent_key.clone())
776 .await
777 {
778 Ok(()) => match parent_key {
779 SentRequestKey::Event { event_id, event, event_type } => {
780 send_update(
781 &global_update_sender,
782 &update_sender,
783 room_id,
784 RoomSendQueueUpdate::SentEvent {
785 transaction_id: txn_id,
786 event_id: event_id.clone(),
787 },
788 );
789
790 if let Ok((room_event_cache, _drop_handles)) = room.event_cache().await
810 {
811 let timeline_event = match Raw::from_json_string(
812 format!(
814 "{{\
815 \"event_id\":\"{event_id}\",\
816 \"origin_server_ts\":{ts},\
817 \"sender\":\"{sender}\",\
818 \"type\":\"{type}\",\
819 \"content\":{content}\
820 }}",
821 event_id = event_id,
822 ts = MilliSecondsSinceUnixEpoch::now().get(),
823 sender = room.client().user_id().expect("Client must be logged-in"),
824 type = event_type,
825 content = event.into_json(),
826 ),
827 ) {
828 Ok(event) => match encryption_info {
829 #[cfg(feature = "e2e-encryption")]
830 Some(encryption_info) => {
831 use matrix_sdk_base::deserialized_responses::DecryptedRoomEvent;
832 let decrypted_event = DecryptedRoomEvent {
833 event: event.cast_unchecked(),
834 encryption_info: Arc::new(encryption_info),
835 unsigned_encryption_info: None,
836 };
837 Some(TimelineEvent::from_decrypted(
838 decrypted_event,
839 None,
840 ))
841 }
842 _ => Some(TimelineEvent::from_plaintext(event)),
843 },
844 Err(err) => {
845 error!(
846 ?err,
847 "Failed to build the (sync) event before the saving in the Event Cache"
848 );
849 None
850 }
851 };
852
853 if let Some(timeline_event) = timeline_event
856 && let Err(err) = room_event_cache
857 .insert_sent_event_from_send_queue(timeline_event)
858 .await
859 {
860 error!(
861 ?err,
862 "Failed to save the sent event in the Event Cache"
863 );
864 }
865 } else {
866 info!(
867 "Cannot insert the sent event in the Event Cache because \
868 either the room no longer exists, or the Room Event Cache cannot be retrieved"
869 );
870 }
871 }
872
873 SentRequestKey::Media(sent_media_info) => {
874 let index =
877 media_upload_progress_info.as_ref().map_or(0, |info| info.index);
878 let progress = media_upload_progress_info
879 .as_ref()
880 .map(|info| {
881 AbstractProgress { current: info.bytes, total: info.bytes }
882 + info.offsets
883 })
884 .unwrap_or(AbstractProgress { current: 1, total: 1 });
885
886 let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
889 related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
890 file: Some(sent_media_info.file),
891 index,
892 progress,
893 });
894 }
895
896 SentRequestKey::Redaction { event_id, redacts, reason } => {
897 send_update(
898 &global_update_sender,
899 &update_sender,
900 room_id,
901 RoomSendQueueUpdate::SentEvent {
902 transaction_id: txn_id,
903 event_id: event_id.clone(),
904 },
905 );
906
907 if let Ok((room_event_cache, _drop_handles)) = room.event_cache().await
912 {
913 let content_field_redacts = room.version().is_some_and(|id| {
914 id.rules()
915 .is_some_and(|rules| rules.redaction.content_field_redacts)
916 });
917 let redacts = if content_field_redacts.not() {
918 format!("\"redacts\":\"{redacts}\",")
919 } else {
920 "".to_owned()
921 };
922 let reason = reason.map_or_else(
923 || "".to_owned(),
924 |r| format!("\"reason\": \"{r}\""),
925 );
926 let content = if content_field_redacts {
927 format!("\"redacts\":\"{redacts}\",{reason}")
928 } else {
929 reason
930 };
931
932 let timeline_event = match Raw::from_json_string(
933 format!(
935 "{{\
936 {redacts}\
937 \"event_id\":\"{event_id}\",\
938 \"origin_server_ts\":{ts},\
939 \"sender\":\"{sender}\",\
940 \"type\":\"{type}\",\
941 \"content\":{{{content}}}\
942 }}",
943 redacts = redacts,
944 event_id = event_id,
945 ts = MilliSecondsSinceUnixEpoch::now().get(),
946 sender = room.client().user_id().expect("Client must be logged-in"),
947 type = TimelineEventType::RoomRedaction,
948 content = content
949 ),
950 ) {
951 Ok(event) => Some(TimelineEvent::from_plaintext(event)),
952 Err(err) => {
953 error!(
954 ?err,
955 "Failed to build the (sync) redaction event before the saving in the Event Cache"
956 );
957 None
958 }
959 };
960
961 if let Some(timeline_event) = timeline_event
964 && let Err(err) = room_event_cache
965 .insert_sent_event_from_send_queue(timeline_event)
966 .await
967 {
968 error!(
969 ?err,
970 "Failed to save the sent redaction event in the Event Cache"
971 );
972 }
973 } else {
974 info!(
975 "Cannot insert the sent redaction event in the Event Cache because \
976 either the room no longer exists, or the Room Event Cache cannot be retrieved"
977 );
978 }
979 }
980 },
981
982 Err(err) => {
983 warn!("unable to mark queued request as sent: {err}");
984 }
985 },
986
987 Ok((None, _)) => {
988 debug!("Request has been aborted while running, continuing.");
989 }
990
991 Err(err) => {
992 let is_recoverable = match err {
993 crate::Error::Http(ref http_err) => {
994 matches!(
996 http_err.retry_kind(),
997 RetryKind::Transient { .. } | RetryKind::NetworkFailure
998 )
999 }
1000
1001 crate::Error::ConcurrentRequestFailed => true,
1006
1007 _ => false,
1009 };
1010
1011 locally_enabled.store(false, Ordering::SeqCst);
1013
1014 if is_recoverable {
1015 warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");
1016
1017 queue.mark_as_not_being_sent(&txn_id).await;
1020
1021 } else {
1027 warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");
1028
1029 if let Err(storage_error) =
1031 queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
1032 {
1033 warn!("unable to mark request as wedged: {storage_error}");
1034 }
1035 }
1036
1037 let error = Arc::new(err);
1038
1039 let _ = global_error_sender.send(SendQueueRoomError {
1040 room_id: room_id.to_owned(),
1041 error: error.clone(),
1042 is_recoverable,
1043 });
1044
1045 send_update(
1046 &global_update_sender,
1047 &update_sender,
1048 room_id,
1049 RoomSendQueueUpdate::SendError {
1050 transaction_id: related_txn_id.unwrap_or(txn_id),
1051 error,
1052 is_recoverable,
1053 },
1054 );
1055 }
1056 }
1057 }
1058
1059 info!("exited sending task");
1060 }
1061
1062 async fn handle_request(
1066 room: &Room,
1067 request: QueuedRequest,
1068 cancel_upload_rx: Option<oneshot::Receiver<()>>,
1069 progress: Option<SharedObservable<TransmissionProgress>>,
1070 ) -> Result<(Option<SentRequestKey>, Option<EncryptionInfo>), crate::Error> {
1071 match request.kind {
1072 QueuedRequestKind::Event { content } => {
1073 let (event, event_type) = content.into_raw();
1074
1075 let result = room
1076 .send_raw(&event_type, &event)
1077 .with_transaction_id(&request.transaction_id)
1078 .with_request_config(RequestConfig::short_retry())
1079 .await?;
1080
1081 trace!(txn_id = %request.transaction_id, event_id = %result.response.event_id, "event successfully sent");
1082
1083 Ok((
1084 Some(SentRequestKey::Event {
1085 event_id: result.response.event_id,
1086 event,
1087 event_type,
1088 }),
1089 result.encryption_info,
1090 ))
1091 }
1092
1093 QueuedRequestKind::MediaUpload {
1094 content_type,
1095 cache_key,
1096 thumbnail_source,
1097 related_to: relates_to,
1098 #[cfg(feature = "unstable-msc4274")]
1099 accumulated,
1100 } => {
1101 trace!(%relates_to, "uploading media related to event");
1102
1103 let fut = async move {
1104 let data = room
1105 .client()
1106 .media_store()
1107 .lock()
1108 .await?
1109 .get_media_content(&cache_key)
1110 .await?
1111 .ok_or(crate::Error::SendQueueWedgeError(Box::new(
1112 QueueWedgeError::MissingMediaContent,
1113 )))?;
1114
1115 let mime = Mime::from_str(&content_type).map_err(|_| {
1116 crate::Error::SendQueueWedgeError(Box::new(
1117 QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
1118 ))
1119 })?;
1120
1121 #[cfg(feature = "e2e-encryption")]
1122 let media_source = if room.latest_encryption_state().await?.is_encrypted() {
1123 trace!("upload will be encrypted (encrypted room)");
1124
1125 let mut cursor = std::io::Cursor::new(data);
1126 let mut req = room
1127 .client
1128 .upload_encrypted_file(&mut cursor)
1129 .with_request_config(RequestConfig::short_retry());
1130 if let Some(progress) = progress {
1131 req = req.with_send_progress_observable(progress);
1132 }
1133 let encrypted_file = req.await?;
1134
1135 MediaSource::Encrypted(Box::new(encrypted_file))
1136 } else {
1137 trace!("upload will be in clear text (room without encryption)");
1138
1139 let request_config = RequestConfig::short_retry()
1140 .timeout(Media::reasonable_upload_timeout(&data));
1141 let mut req =
1142 room.client().media().upload(&mime, data, Some(request_config));
1143 if let Some(progress) = progress {
1144 req = req.with_send_progress_observable(progress);
1145 }
1146 let res = req.await?;
1147
1148 MediaSource::Plain(res.content_uri)
1149 };
1150
1151 #[cfg(not(feature = "e2e-encryption"))]
1152 let media_source = {
1153 let request_config = RequestConfig::short_retry()
1154 .timeout(Media::reasonable_upload_timeout(&data));
1155 let mut req =
1156 room.client().media().upload(&mime, data, Some(request_config));
1157 if let Some(progress) = progress {
1158 req = req.with_send_progress_observable(progress);
1159 }
1160 let res = req.await?;
1161 MediaSource::Plain(res.content_uri)
1162 };
1163
1164 let uri = match &media_source {
1165 MediaSource::Plain(uri) => uri,
1166 MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
1167 };
1168 trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");
1169
1170 Ok((
1171 Some(SentRequestKey::Media(SentMediaInfo {
1172 file: media_source,
1173 thumbnail: thumbnail_source,
1174 #[cfg(feature = "unstable-msc4274")]
1175 accumulated,
1176 })),
1177 None,
1178 ))
1179 };
1180
1181 let wait_for_cancel = async move {
1182 if let Some(rx) = cancel_upload_rx {
1183 rx.await
1184 } else {
1185 std::future::pending().await
1186 }
1187 };
1188
1189 tokio::select! {
1190 biased;
1191
1192 _ = wait_for_cancel => {
1193 Ok((None, None))
1194 }
1195
1196 res = fut => {
1197 res
1198 }
1199 }
1200 }
1201
1202 QueuedRequestKind::Redaction { redacts, reason } => {
1203 let result = room
1204 .redact(&redacts, reason.as_deref(), Some(request.transaction_id.clone()))
1205 .await?;
1206
1207 trace!(txn_id = %request.transaction_id, event_id = %result.event_id, "redaction successfully sent");
1208
1209 Ok((
1210 Some(SentRequestKey::Redaction { event_id: result.event_id, redacts, reason }),
1211 None,
1212 ))
1213 }
1214 }
1215 }
1216
1217 pub fn is_enabled(&self) -> bool {
1219 self.inner.locally_enabled.load(Ordering::SeqCst)
1220 }
1221
1222 pub fn set_enabled(&self, enabled: bool) {
1224 self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
1225
1226 if enabled {
1229 self.inner.notifier.notify_one();
1230 }
1231 }
1232
1233 fn send_update(&self, update: RoomSendQueueUpdate) {
1237 let _ = self.inner.update_sender.send(update.clone());
1238 let _ = self
1239 .inner
1240 .global_update_sender
1241 .send(SendQueueUpdate { room_id: self.inner.room.room_id().to_owned(), update });
1242 }
1243}
1244
1245fn send_update(
1246 global_update_sender: &broadcast::Sender<SendQueueUpdate>,
1247 update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
1248 room_id: &RoomId,
1249 update: RoomSendQueueUpdate,
1250) {
1251 let _ = update_sender.send(update.clone());
1252 let _ = global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update });
1253}
1254
1255impl From<&crate::Error> for QueueWedgeError {
1256 fn from(value: &crate::Error) -> Self {
1257 match value {
1258 #[cfg(feature = "e2e-encryption")]
1259 crate::Error::OlmError(error) => match &**error {
1260 OlmError::SessionRecipientCollectionError(error) => match error {
1261 SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
1262 QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
1263 }
1264
1265 SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
1266 QueueWedgeError::IdentityViolations { users: users.clone() }
1267 }
1268
1269 SessionRecipientCollectionError::CrossSigningNotSetup
1270 | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
1271 QueueWedgeError::CrossVerificationRequired
1272 }
1273 },
1274 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1275 },
1276
1277 crate::Error::SendQueueWedgeError(error) => *error.clone(),
1279
1280 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1281 }
1282 }
1283}
1284
1285struct RoomSendQueueInner {
1286 room: WeakRoom,
1288
1289 global_update_sender: broadcast::Sender<SendQueueUpdate>,
1293
1294 update_sender: broadcast::Sender<RoomSendQueueUpdate>,
1300
1301 queue: QueueStorage,
1308
1309 notifier: Arc<Notify>,
1312
1313 locally_enabled: Arc<AtomicBool>,
1316
1317 _task: BackgroundTaskHandle,
1320}
1321
1322struct BeingSentInfo {
1324 transaction_id: OwnedTransactionId,
1326
1327 cancel_upload: Option<oneshot::Sender<()>>,
1330}
1331
1332impl BeingSentInfo {
1333 fn cancel_upload(self) -> bool {
1338 if let Some(cancel_upload) = self.cancel_upload {
1339 let _ = cancel_upload.send(());
1340 true
1341 } else {
1342 false
1343 }
1344 }
1345}
1346
1347#[derive(Clone)]
1350struct StoreLock {
1351 client: WeakClient,
1353
1354 being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
1359}
1360
1361impl StoreLock {
1362 async fn lock(&self) -> StoreLockGuard {
1364 StoreLockGuard {
1365 client: self.client.clone(),
1366 being_sent: self.being_sent.clone().lock_owned().await,
1367 }
1368 }
1369}
1370
1371struct StoreLockGuard {
1374 client: WeakClient,
1376
1377 being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
1380}
1381
1382impl StoreLockGuard {
1383 fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
1385 self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
1386 }
1387}
1388
1389#[derive(Clone)]
1390struct QueueStorage {
1391 store: StoreLock,
1394
1395 room_id: OwnedRoomId,
1397
1398 thumbnail_file_sizes: Arc<SyncMutex<HashMap<OwnedTransactionId, Vec<Option<usize>>>>>,
1411}
1412
1413impl QueueStorage {
1414 const LOW_PRIORITY: usize = 0;
1416
1417 const HIGH_PRIORITY: usize = 10;
1419
1420 fn new(client: WeakClient, room: OwnedRoomId) -> Self {
1422 Self {
1423 room_id: room,
1424 store: StoreLock { client, being_sent: Default::default() },
1425 thumbnail_file_sizes: Default::default(),
1426 }
1427 }
1428
1429 async fn push(
1433 &self,
1434 request: QueuedRequestKind,
1435 created_at: MilliSecondsSinceUnixEpoch,
1436 ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
1437 let transaction_id = TransactionId::new();
1438
1439 self.store
1440 .lock()
1441 .await
1442 .client()?
1443 .state_store()
1444 .save_send_queue_request(
1445 &self.room_id,
1446 transaction_id.clone(),
1447 created_at,
1448 request,
1449 Self::LOW_PRIORITY,
1450 )
1451 .await?;
1452
1453 Ok(transaction_id)
1454 }
1455
1456 async fn peek_next_to_send(
1461 &self,
1462 ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
1463 {
1464 let mut guard = self.store.lock().await;
1465 let queued_requests =
1466 guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;
1467
1468 if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
1469 let (cancel_upload_tx, cancel_upload_rx) =
1470 if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
1471 let (tx, rx) = oneshot::channel();
1472 (Some(tx), Some(rx))
1473 } else {
1474 Default::default()
1475 };
1476
1477 let prev = guard.being_sent.replace(BeingSentInfo {
1478 transaction_id: request.transaction_id.clone(),
1479 cancel_upload: cancel_upload_tx,
1480 });
1481
1482 if let Some(prev) = prev {
1483 error!(
1484 prev_txn = ?prev.transaction_id,
1485 "a previous request was still active while picking a new one"
1486 );
1487 }
1488
1489 Ok(Some((request.clone(), cancel_upload_rx)))
1490 } else {
1491 Ok(None)
1492 }
1493 }
1494
1495 async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1499 let was_being_sent = self.store.lock().await.being_sent.take();
1500
1501 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1502 if prev_txn != Some(transaction_id) {
1503 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after transient error)");
1504 }
1505 }
1506
1507 async fn mark_as_wedged(
1511 &self,
1512 transaction_id: &TransactionId,
1513 reason: QueueWedgeError,
1514 ) -> Result<(), RoomSendQueueStorageError> {
1515 let mut guard = self.store.lock().await;
1517 let was_being_sent = guard.being_sent.take();
1518
1519 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1520 if prev_txn != Some(transaction_id) {
1521 error!(
1522 ?prev_txn,
1523 "previous active request didn't match that we expect (after permanent error)",
1524 );
1525 }
1526
1527 Ok(guard
1528 .client()?
1529 .state_store()
1530 .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1531 .await?)
1532 }
1533
1534 async fn mark_as_unwedged(
1537 &self,
1538 transaction_id: &TransactionId,
1539 ) -> Result<(), RoomSendQueueStorageError> {
1540 Ok(self
1541 .store
1542 .lock()
1543 .await
1544 .client()?
1545 .state_store()
1546 .update_send_queue_request_status(&self.room_id, transaction_id, None)
1547 .await?)
1548 }
1549
1550 async fn mark_as_sent(
1553 &self,
1554 transaction_id: &TransactionId,
1555 parent_key: SentRequestKey,
1556 ) -> Result<(), RoomSendQueueStorageError> {
1557 let mut guard = self.store.lock().await;
1559 let was_being_sent = guard.being_sent.take();
1560
1561 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1562 if prev_txn != Some(transaction_id) {
1563 error!(
1564 ?prev_txn,
1565 "previous active request didn't match that we expect (after successful send)",
1566 );
1567 }
1568
1569 let client = guard.client()?;
1570 let store = client.state_store();
1571
1572 store
1574 .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1575 .await?;
1576
1577 let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1578
1579 if !removed {
1580 warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1581 }
1582
1583 self.thumbnail_file_sizes.lock().remove(transaction_id);
1584
1585 Ok(())
1586 }
1587
1588 async fn cancel_event(
1595 &self,
1596 transaction_id: &TransactionId,
1597 ) -> Result<bool, RoomSendQueueStorageError> {
1598 let guard = self.store.lock().await;
1599
1600 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1601 == Some(transaction_id)
1602 {
1603 guard
1605 .client()?
1606 .state_store()
1607 .save_dependent_queued_request(
1608 &self.room_id,
1609 transaction_id,
1610 ChildTransactionId::new(),
1611 MilliSecondsSinceUnixEpoch::now(),
1612 DependentQueuedRequestKind::RedactEvent,
1613 )
1614 .await?;
1615
1616 return Ok(true);
1617 }
1618
1619 let removed = guard
1620 .client()?
1621 .state_store()
1622 .remove_send_queue_request(&self.room_id, transaction_id)
1623 .await?;
1624
1625 self.thumbnail_file_sizes.lock().remove(transaction_id);
1626
1627 Ok(removed)
1628 }
1629
1630 async fn replace_event(
1637 &self,
1638 transaction_id: &TransactionId,
1639 serializable: SerializableEventContent,
1640 ) -> Result<bool, RoomSendQueueStorageError> {
1641 let guard = self.store.lock().await;
1642
1643 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1644 == Some(transaction_id)
1645 {
1646 guard
1648 .client()?
1649 .state_store()
1650 .save_dependent_queued_request(
1651 &self.room_id,
1652 transaction_id,
1653 ChildTransactionId::new(),
1654 MilliSecondsSinceUnixEpoch::now(),
1655 DependentQueuedRequestKind::EditEvent { new_content: serializable },
1656 )
1657 .await?;
1658
1659 return Ok(true);
1660 }
1661
1662 let edited = guard
1663 .client()?
1664 .state_store()
1665 .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1666 .await?;
1667
1668 Ok(edited)
1669 }
1670
1671 #[allow(clippy::too_many_arguments)]
1675 async fn push_media(
1676 &self,
1677 event: RoomMessageEventContent,
1678 content_type: Mime,
1679 send_event_txn: OwnedTransactionId,
1680 created_at: MilliSecondsSinceUnixEpoch,
1681 upload_file_txn: OwnedTransactionId,
1682 file_media_request: MediaRequestParameters,
1683 thumbnail: Option<QueueThumbnailInfo>,
1684 ) -> Result<(), RoomSendQueueStorageError> {
1685 let guard = self.store.lock().await;
1686 let client = guard.client()?;
1687 let store = client.state_store();
1688
1689 let thumbnail_file_sizes = vec![thumbnail.as_ref().map(|t| t.file_size)];
1691
1692 let thumbnail_info = self
1693 .push_thumbnail_and_media_uploads(
1694 store,
1695 &content_type,
1696 send_event_txn.clone(),
1697 created_at,
1698 upload_file_txn.clone(),
1699 file_media_request,
1700 thumbnail,
1701 )
1702 .await?;
1703
1704 store
1706 .save_dependent_queued_request(
1707 &self.room_id,
1708 &upload_file_txn,
1709 send_event_txn.clone().into(),
1710 created_at,
1711 DependentQueuedRequestKind::FinishUpload {
1712 local_echo: Box::new(event),
1713 file_upload: upload_file_txn.clone(),
1714 thumbnail_info,
1715 },
1716 )
1717 .await?;
1718
1719 self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1720
1721 Ok(())
1722 }
1723
1724 #[cfg(feature = "unstable-msc4274")]
1728 #[allow(clippy::too_many_arguments)]
1729 async fn push_gallery(
1730 &self,
1731 event: RoomMessageEventContent,
1732 send_event_txn: OwnedTransactionId,
1733 created_at: MilliSecondsSinceUnixEpoch,
1734 item_queue_infos: Vec<GalleryItemQueueInfo>,
1735 ) -> Result<(), RoomSendQueueStorageError> {
1736 let guard = self.store.lock().await;
1737 let client = guard.client()?;
1738 let store = client.state_store();
1739
1740 let mut finish_item_infos = Vec::with_capacity(item_queue_infos.len());
1741 let mut thumbnail_file_sizes = Vec::with_capacity(item_queue_infos.len());
1742
1743 let Some((first, rest)) = item_queue_infos.split_first() else {
1744 return Ok(());
1745 };
1746
1747 let GalleryItemQueueInfo { content_type, upload_file_txn, file_media_request, thumbnail } =
1748 first;
1749
1750 let thumbnail_info = self
1751 .push_thumbnail_and_media_uploads(
1752 store,
1753 content_type,
1754 send_event_txn.clone(),
1755 created_at,
1756 upload_file_txn.clone(),
1757 file_media_request.clone(),
1758 thumbnail.clone(),
1759 )
1760 .await?;
1761
1762 finish_item_infos
1763 .push(FinishGalleryItemInfo { file_upload: upload_file_txn.clone(), thumbnail_info });
1764 thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));
1765
1766 let mut last_upload_file_txn = upload_file_txn.clone();
1767
1768 for item_queue_info in rest {
1769 let GalleryItemQueueInfo {
1770 content_type,
1771 upload_file_txn,
1772 file_media_request,
1773 thumbnail,
1774 } = item_queue_info;
1775
1776 let thumbnail_info = if let Some(QueueThumbnailInfo {
1777 finish_upload_thumbnail_info: thumbnail_info,
1778 media_request_parameters: thumbnail_media_request,
1779 content_type: thumbnail_content_type,
1780 ..
1781 }) = thumbnail
1782 {
1783 let upload_thumbnail_txn = thumbnail_info.txn.clone();
1784
1785 store
1788 .save_dependent_queued_request(
1789 &self.room_id,
1790 &last_upload_file_txn,
1791 upload_thumbnail_txn.clone().into(),
1792 created_at,
1793 DependentQueuedRequestKind::UploadFileOrThumbnail {
1794 content_type: thumbnail_content_type.to_string(),
1795 cache_key: thumbnail_media_request.clone(),
1796 related_to: send_event_txn.clone(),
1797 parent_is_thumbnail_upload: false,
1798 },
1799 )
1800 .await?;
1801
1802 last_upload_file_txn = upload_thumbnail_txn;
1803
1804 Some(thumbnail_info)
1805 } else {
1806 None
1807 };
1808
1809 store
1811 .save_dependent_queued_request(
1812 &self.room_id,
1813 &last_upload_file_txn,
1814 upload_file_txn.clone().into(),
1815 created_at,
1816 DependentQueuedRequestKind::UploadFileOrThumbnail {
1817 content_type: content_type.to_string(),
1818 cache_key: file_media_request.clone(),
1819 related_to: send_event_txn.clone(),
1820 parent_is_thumbnail_upload: thumbnail.is_some(),
1821 },
1822 )
1823 .await?;
1824
1825 finish_item_infos.push(FinishGalleryItemInfo {
1826 file_upload: upload_file_txn.clone(),
1827 thumbnail_info: thumbnail_info.cloned(),
1828 });
1829 thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));
1830
1831 last_upload_file_txn = upload_file_txn.clone();
1832 }
1833
1834 store
1837 .save_dependent_queued_request(
1838 &self.room_id,
1839 &last_upload_file_txn,
1840 send_event_txn.clone().into(),
1841 created_at,
1842 DependentQueuedRequestKind::FinishGallery {
1843 local_echo: Box::new(event),
1844 item_infos: finish_item_infos,
1845 },
1846 )
1847 .await?;
1848
1849 self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1850
1851 Ok(())
1852 }
1853
1854 #[allow(clippy::too_many_arguments)]
1860 async fn push_thumbnail_and_media_uploads(
1861 &self,
1862 store: &DynStateStore,
1863 content_type: &Mime,
1864 send_event_txn: OwnedTransactionId,
1865 created_at: MilliSecondsSinceUnixEpoch,
1866 upload_file_txn: OwnedTransactionId,
1867 file_media_request: MediaRequestParameters,
1868 thumbnail: Option<QueueThumbnailInfo>,
1869 ) -> Result<Option<FinishUploadThumbnailInfo>, RoomSendQueueStorageError> {
1870 if let Some(QueueThumbnailInfo {
1871 finish_upload_thumbnail_info: thumbnail_info,
1872 media_request_parameters: thumbnail_media_request,
1873 content_type: thumbnail_content_type,
1874 ..
1875 }) = thumbnail
1876 {
1877 let upload_thumbnail_txn = thumbnail_info.txn.clone();
1878
1879 store
1881 .save_send_queue_request(
1882 &self.room_id,
1883 upload_thumbnail_txn.clone(),
1884 created_at,
1885 QueuedRequestKind::MediaUpload {
1886 content_type: thumbnail_content_type.to_string(),
1887 cache_key: thumbnail_media_request,
1888 thumbnail_source: None, related_to: send_event_txn.clone(),
1890 #[cfg(feature = "unstable-msc4274")]
1891 accumulated: vec![],
1892 },
1893 Self::LOW_PRIORITY,
1894 )
1895 .await?;
1896
1897 store
1899 .save_dependent_queued_request(
1900 &self.room_id,
1901 &upload_thumbnail_txn,
1902 upload_file_txn.into(),
1903 created_at,
1904 DependentQueuedRequestKind::UploadFileOrThumbnail {
1905 content_type: content_type.to_string(),
1906 cache_key: file_media_request,
1907 related_to: send_event_txn,
1908 parent_is_thumbnail_upload: true,
1909 },
1910 )
1911 .await?;
1912
1913 Ok(Some(thumbnail_info))
1914 } else {
1915 store
1917 .save_send_queue_request(
1918 &self.room_id,
1919 upload_file_txn,
1920 created_at,
1921 QueuedRequestKind::MediaUpload {
1922 content_type: content_type.to_string(),
1923 cache_key: file_media_request,
1924 thumbnail_source: None,
1925 related_to: send_event_txn,
1926 #[cfg(feature = "unstable-msc4274")]
1927 accumulated: vec![],
1928 },
1929 Self::LOW_PRIORITY,
1930 )
1931 .await?;
1932
1933 Ok(None)
1934 }
1935 }
1936
1937 #[instrument(skip(self))]
1939 async fn react(
1940 &self,
1941 transaction_id: &TransactionId,
1942 key: String,
1943 created_at: MilliSecondsSinceUnixEpoch,
1944 ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1945 let guard = self.store.lock().await;
1946 let client = guard.client()?;
1947 let store = client.state_store();
1948
1949 let requests = store.load_send_queue_requests(&self.room_id).await?;
1950
1951 if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1953 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1956 if !dependent_requests
1957 .into_iter()
1958 .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1959 .any(|child_txn| *child_txn == *transaction_id)
1960 {
1961 return Ok(None);
1963 }
1964 }
1965
1966 let reaction_txn_id = ChildTransactionId::new();
1968 store
1969 .save_dependent_queued_request(
1970 &self.room_id,
1971 transaction_id,
1972 reaction_txn_id.clone(),
1973 created_at,
1974 DependentQueuedRequestKind::ReactEvent { key },
1975 )
1976 .await?;
1977
1978 Ok(Some(reaction_txn_id))
1979 }
1980
1981 async fn local_echoes(
1984 &self,
1985 room: &RoomSendQueue,
1986 ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1987 let guard = self.store.lock().await;
1988 let client = guard.client()?;
1989 let store = client.state_store();
1990
1991 let local_requests =
1992 store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1993 Some(LocalEcho {
1994 transaction_id: queued.transaction_id.clone(),
1995 content: match queued.kind {
1996 QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1997 serialized_event: content,
1998 send_handle: SendHandle {
1999 room: room.clone(),
2000 transaction_id: queued.transaction_id,
2001 media_handles: vec![],
2002 created_at: queued.created_at,
2003 },
2004 send_error: queued.error,
2005 },
2006
2007 QueuedRequestKind::MediaUpload { .. } => {
2008 return None;
2011 }
2012
2013 QueuedRequestKind::Redaction { redacts, reason } => {
2014 LocalEchoContent::Redaction {
2015 redacts,
2016 reason,
2017 send_handle: SendRedactionHandle {
2018 room: room.clone(),
2019 transaction_id: queued.transaction_id,
2020 },
2021 send_error: queued.error,
2022 }
2023 }
2024 },
2025 })
2026 });
2027
2028 let reactions_and_medias = store
2029 .load_dependent_queued_requests(&self.room_id)
2030 .await?
2031 .into_iter()
2032 .filter_map(|dep| match dep.kind {
2033 DependentQueuedRequestKind::EditEvent { .. }
2034 | DependentQueuedRequestKind::RedactEvent => {
2035 None
2037 }
2038
2039 DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
2040 transaction_id: dep.own_transaction_id.clone().into(),
2041 content: LocalEchoContent::React {
2042 key,
2043 send_handle: SendReactionHandle {
2044 room: room.clone(),
2045 transaction_id: dep.own_transaction_id,
2046 },
2047 applies_to: dep.parent_transaction_id,
2048 },
2049 }),
2050
2051 DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
2052 None
2054 }
2055
2056 DependentQueuedRequestKind::FinishUpload {
2057 local_echo,
2058 file_upload,
2059 thumbnail_info,
2060 } => {
2061 Some(LocalEcho {
2063 transaction_id: dep.own_transaction_id.clone().into(),
2064 content: LocalEchoContent::Event {
2065 serialized_event: SerializableEventContent::new(&(*local_echo).into())
2066 .ok()?,
2067 send_handle: SendHandle {
2068 room: room.clone(),
2069 transaction_id: dep.own_transaction_id.into(),
2070 media_handles: vec![MediaHandles {
2071 upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
2072 upload_file_txn: file_upload,
2073 }],
2074 created_at: dep.created_at,
2075 },
2076 send_error: None,
2077 },
2078 })
2079 }
2080
2081 #[cfg(feature = "unstable-msc4274")]
2082 DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
2083 self.create_gallery_local_echo(
2085 dep.own_transaction_id,
2086 room,
2087 dep.created_at,
2088 local_echo,
2089 item_infos,
2090 )
2091 }
2092 });
2093
2094 Ok(local_requests.chain(reactions_and_medias).collect())
2095 }
2096
2097 #[cfg(feature = "unstable-msc4274")]
2099 fn create_gallery_local_echo(
2100 &self,
2101 transaction_id: ChildTransactionId,
2102 room: &RoomSendQueue,
2103 created_at: MilliSecondsSinceUnixEpoch,
2104 local_echo: Box<RoomMessageEventContent>,
2105 item_infos: Vec<FinishGalleryItemInfo>,
2106 ) -> Option<LocalEcho> {
2107 Some(LocalEcho {
2108 transaction_id: transaction_id.clone().into(),
2109 content: LocalEchoContent::Event {
2110 serialized_event: SerializableEventContent::new(&(*local_echo).into()).ok()?,
2111 send_handle: SendHandle {
2112 room: room.clone(),
2113 transaction_id: transaction_id.into(),
2114 media_handles: item_infos
2115 .into_iter()
2116 .map(|i| MediaHandles {
2117 upload_thumbnail_txn: i.thumbnail_info.map(|info| info.txn),
2118 upload_file_txn: i.file_upload,
2119 })
2120 .collect(),
2121 created_at,
2122 },
2123 send_error: None,
2124 },
2125 })
2126 }
2127
2128 #[instrument(skip_all)]
2136 async fn try_apply_single_dependent_request(
2137 &self,
2138 client: &Client,
2139 dependent_request: DependentQueuedRequest,
2140 new_updates: &mut Vec<RoomSendQueueUpdate>,
2141 ) -> Result<bool, RoomSendQueueError> {
2142 let store = client.state_store();
2143
2144 let parent_key = dependent_request.parent_key;
2145
2146 match dependent_request.kind {
2147 DependentQueuedRequestKind::EditEvent { new_content } => {
2148 if let Some(parent_key) = parent_key {
2149 let Some(event_id) = parent_key.into_event_id() else {
2150 return Err(RoomSendQueueError::StorageError(
2151 RoomSendQueueStorageError::InvalidParentKey,
2152 ));
2153 };
2154
2155 let room = client
2157 .get_room(&self.room_id)
2158 .ok_or(RoomSendQueueError::RoomDisappeared)?;
2159
2160 let edited_content = match new_content.deserialize() {
2164 Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
2165 EditedContent::RoomMessage(c.into())
2167 }
2168
2169 Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
2170 let poll_start = c.poll_start().clone();
2171 EditedContent::PollStart {
2172 fallback_text: poll_start.question.text.clone(),
2173 new_content: poll_start,
2174 }
2175 }
2176
2177 Ok(c) => {
2178 warn!("Unsupported edit content type: {:?}", c.event_type());
2179 return Ok(true);
2180 }
2181
2182 Err(err) => {
2183 warn!("Unable to deserialize: {err}");
2184 return Ok(true);
2185 }
2186 };
2187
2188 let edit_event = match room.make_edit_event(&event_id, edited_content).await {
2189 Ok(e) => e,
2190 Err(err) => {
2191 warn!("couldn't create edited event: {err}");
2192 return Ok(true);
2193 }
2194 };
2195
2196 let serializable = SerializableEventContent::from_raw(
2198 Raw::new(&edit_event)
2199 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
2200 edit_event.event_type().to_string(),
2201 );
2202
2203 store
2204 .save_send_queue_request(
2205 &self.room_id,
2206 dependent_request.own_transaction_id.into(),
2207 dependent_request.created_at,
2208 serializable.into(),
2209 Self::HIGH_PRIORITY,
2210 )
2211 .await
2212 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2213 } else {
2214 let edited = store
2216 .update_send_queue_request(
2217 &self.room_id,
2218 &dependent_request.parent_transaction_id,
2219 new_content.into(),
2220 )
2221 .await
2222 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2223
2224 if !edited {
2225 warn!("missing local echo upon dependent edit");
2226 }
2227 }
2228 }
2229
2230 DependentQueuedRequestKind::RedactEvent => {
2231 if let Some(parent_key) = parent_key {
2232 let Some(event_id) = parent_key.into_event_id() else {
2233 return Err(RoomSendQueueError::StorageError(
2234 RoomSendQueueStorageError::InvalidParentKey,
2235 ));
2236 };
2237
2238 let room = client
2240 .get_room(&self.room_id)
2241 .ok_or(RoomSendQueueError::RoomDisappeared)?;
2242
2243 if let Err(err) = room
2251 .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
2252 .await
2253 {
2254 warn!("error when sending a redact for {event_id}: {err}");
2255 return Ok(false);
2256 }
2257 } else {
2258 let removed = store
2261 .remove_send_queue_request(
2262 &self.room_id,
2263 &dependent_request.parent_transaction_id,
2264 )
2265 .await
2266 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2267
2268 if !removed {
2269 warn!("missing local echo upon dependent redact");
2270 }
2271 }
2272 }
2273
2274 DependentQueuedRequestKind::ReactEvent { key } => {
2275 if let Some(parent_key) = parent_key {
2276 let Some(parent_event_id) = parent_key.into_event_id() else {
2277 return Err(RoomSendQueueError::StorageError(
2278 RoomSendQueueStorageError::InvalidParentKey,
2279 ));
2280 };
2281
2282 let react_event =
2284 ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
2285 let serializable = SerializableEventContent::from_raw(
2286 Raw::new(&react_event)
2287 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
2288 react_event.event_type().to_string(),
2289 );
2290
2291 store
2292 .save_send_queue_request(
2293 &self.room_id,
2294 dependent_request.own_transaction_id.into(),
2295 dependent_request.created_at,
2296 serializable.into(),
2297 Self::HIGH_PRIORITY,
2298 )
2299 .await
2300 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2301 } else {
2302 return Ok(false);
2304 }
2305 }
2306
2307 DependentQueuedRequestKind::UploadFileOrThumbnail {
2308 content_type,
2309 cache_key,
2310 related_to,
2311 parent_is_thumbnail_upload,
2312 } => {
2313 let Some(parent_key) = parent_key else {
2314 return Ok(false);
2316 };
2317 self.handle_dependent_file_or_thumbnail_upload(
2318 client,
2319 dependent_request.own_transaction_id.into(),
2320 parent_key,
2321 content_type,
2322 cache_key,
2323 related_to,
2324 parent_is_thumbnail_upload,
2325 )
2326 .await?;
2327 }
2328
2329 DependentQueuedRequestKind::FinishUpload {
2330 local_echo,
2331 file_upload,
2332 thumbnail_info,
2333 } => {
2334 let Some(parent_key) = parent_key else {
2335 return Ok(false);
2337 };
2338 self.handle_dependent_finish_upload(
2339 client,
2340 dependent_request.own_transaction_id.into(),
2341 parent_key,
2342 *local_echo,
2343 file_upload,
2344 thumbnail_info,
2345 new_updates,
2346 )
2347 .await?;
2348 }
2349
2350 #[cfg(feature = "unstable-msc4274")]
2351 DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
2352 let Some(parent_key) = parent_key else {
2353 return Ok(false);
2355 };
2356 self.handle_dependent_finish_gallery_upload(
2357 client,
2358 dependent_request.own_transaction_id.into(),
2359 parent_key,
2360 *local_echo,
2361 item_infos,
2362 new_updates,
2363 )
2364 .await?;
2365 }
2366 }
2367
2368 Ok(true)
2369 }
2370
2371 #[instrument(skip(self))]
2372 async fn apply_dependent_requests(
2373 &self,
2374 new_updates: &mut Vec<RoomSendQueueUpdate>,
2375 ) -> Result<(), RoomSendQueueError> {
2376 let guard = self.store.lock().await;
2377
2378 let client = guard.client()?;
2379 let store = client.state_store();
2380
2381 let dependent_requests = store
2382 .load_dependent_queued_requests(&self.room_id)
2383 .await
2384 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2385
2386 let num_initial_dependent_requests = dependent_requests.len();
2387 if num_initial_dependent_requests == 0 {
2388 return Ok(());
2390 }
2391
2392 let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
2393
2394 for original in &dependent_requests {
2396 if !canonicalized_dependent_requests
2397 .iter()
2398 .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
2399 {
2400 store
2401 .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
2402 .await
2403 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2404 }
2405 }
2406
2407 let mut num_dependent_requests = canonicalized_dependent_requests.len();
2408
2409 debug!(
2410 num_dependent_requests,
2411 num_initial_dependent_requests, "starting handling of dependent requests"
2412 );
2413
2414 for dependent in canonicalized_dependent_requests {
2415 let dependent_id = dependent.own_transaction_id.clone();
2416
2417 match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
2418 Ok(should_remove) => {
2419 if should_remove {
2420 store
2422 .remove_dependent_queued_request(&self.room_id, &dependent_id)
2423 .await
2424 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2425
2426 num_dependent_requests -= 1;
2427 }
2428 }
2429
2430 Err(err) => {
2431 warn!("error when applying single dependent request: {err}");
2432 }
2433 }
2434 }
2435
2436 debug!(
2437 leftover_dependent_requests = num_dependent_requests,
2438 "stopped handling dependent request"
2439 );
2440
2441 Ok(())
2442 }
2443
2444 async fn remove_dependent_send_queue_request(
2446 &self,
2447 dependent_event_id: &ChildTransactionId,
2448 ) -> Result<bool, RoomSendQueueStorageError> {
2449 Ok(self
2450 .store
2451 .lock()
2452 .await
2453 .client()?
2454 .state_store()
2455 .remove_dependent_queued_request(&self.room_id, dependent_event_id)
2456 .await?)
2457 }
2458}
2459
2460#[cfg(feature = "unstable-msc4274")]
2461struct GalleryItemQueueInfo {
2463 content_type: Mime,
2464 upload_file_txn: OwnedTransactionId,
2465 file_media_request: MediaRequestParameters,
2466 thumbnail: Option<QueueThumbnailInfo>,
2467}
2468
2469#[derive(Clone, Debug)]
2471pub enum LocalEchoContent {
2472 Event {
2474 serialized_event: SerializableEventContent,
2477 send_handle: SendHandle,
2479 send_error: Option<QueueWedgeError>,
2482 },
2483
2484 React {
2486 key: String,
2488 send_handle: SendReactionHandle,
2490 applies_to: OwnedTransactionId,
2492 },
2493
2494 Redaction {
2496 redacts: OwnedEventId,
2498 reason: Option<String>,
2500 send_handle: SendRedactionHandle,
2502 send_error: Option<QueueWedgeError>,
2505 },
2506}
2507
2508#[derive(Clone, Debug)]
2511pub struct LocalEcho {
2512 pub transaction_id: OwnedTransactionId,
2514 pub content: LocalEchoContent,
2516}
2517
2518#[derive(Clone, Debug)]
2521pub enum RoomSendQueueUpdate {
2522 NewLocalEvent(LocalEcho),
2527
2528 CancelledLocalEvent {
2531 transaction_id: OwnedTransactionId,
2533 },
2534
2535 ReplacedLocalEvent {
2537 transaction_id: OwnedTransactionId,
2539
2540 new_content: SerializableEventContent,
2542 },
2543
2544 SendError {
2549 transaction_id: OwnedTransactionId,
2551 error: Arc<crate::Error>,
2553 is_recoverable: bool,
2559 },
2560
2561 RetryEvent {
2563 transaction_id: OwnedTransactionId,
2565 },
2566
2567 SentEvent {
2570 transaction_id: OwnedTransactionId,
2572 event_id: OwnedEventId,
2574 },
2575
2576 MediaUpload {
2579 related_to: OwnedTransactionId,
2581
2582 file: Option<MediaSource>,
2584
2585 index: u64,
2589
2590 progress: AbstractProgress,
2594 },
2595}
2596
2597#[derive(Clone, Debug)]
2602pub struct SendQueueUpdate {
2603 pub room_id: OwnedRoomId,
2605
2606 pub update: RoomSendQueueUpdate,
2608}
2609
2610#[derive(Debug, thiserror::Error)]
2612pub enum RoomSendQueueError {
2613 #[error("the room isn't in the joined state")]
2615 RoomNotJoined,
2616
2617 #[error("the room is now missing from the client")]
2621 RoomDisappeared,
2622
2623 #[error(transparent)]
2625 StorageError(#[from] RoomSendQueueStorageError),
2626
2627 #[error("the attachment event could not be created")]
2629 FailedToCreateAttachment,
2630
2631 #[cfg(feature = "unstable-msc4274")]
2633 #[error("the gallery contains no items")]
2634 EmptyGallery,
2635
2636 #[cfg(feature = "unstable-msc4274")]
2638 #[error("the gallery event could not be created")]
2639 FailedToCreateGallery,
2640}
2641
2642#[derive(Debug, thiserror::Error)]
2644pub enum RoomSendQueueStorageError {
2645 #[error(transparent)]
2647 StateStoreError(#[from] StoreError),
2648
2649 #[error(transparent)]
2651 EventCacheStoreError(#[from] EventCacheStoreError),
2652
2653 #[error(transparent)]
2655 MediaStoreError(#[from] MediaStoreError),
2656
2657 #[error(transparent)]
2659 LockError(#[from] CrossProcessLockError),
2660
2661 #[error(transparent)]
2663 JsonSerialization(#[from] serde_json::Error),
2664
2665 #[error("a dependent event had an invalid parent key type")]
2668 InvalidParentKey,
2669
2670 #[error("The client is shutting down.")]
2672 ClientShuttingDown,
2673
2674 #[error("This operation is not implemented for media uploads")]
2676 OperationNotImplementedYet,
2677
2678 #[error("Can't edit a media caption when the underlying event isn't a media")]
2680 InvalidMediaCaptionEdit,
2681}
2682
2683#[derive(Clone, Debug)]
2685struct MediaHandles {
2686 upload_thumbnail_txn: Option<OwnedTransactionId>,
2690
2691 upload_file_txn: OwnedTransactionId,
2693}
2694
2695#[derive(Clone, Debug)]
2697pub struct SendHandle {
2698 room: RoomSendQueue,
2700
2701 transaction_id: OwnedTransactionId,
2706
2707 media_handles: Vec<MediaHandles>,
2709
2710 pub created_at: MilliSecondsSinceUnixEpoch,
2712}
2713
2714impl SendHandle {
2715 #[cfg(test)]
2717 pub(crate) fn new(
2718 room: RoomSendQueue,
2719 transaction_id: OwnedTransactionId,
2720 created_at: MilliSecondsSinceUnixEpoch,
2721 ) -> Self {
2722 Self { room, transaction_id, media_handles: vec![], created_at }
2723 }
2724
2725 fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
2726 if !self.media_handles.is_empty() {
2727 Err(RoomSendQueueStorageError::OperationNotImplementedYet)
2728 } else {
2729 Ok(())
2730 }
2731 }
2732
2733 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2738 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2739 trace!("received an abort request");
2740
2741 let queue = &self.room.inner.queue;
2742
2743 for handles in &self.media_handles {
2744 if queue.abort_upload(&self.transaction_id, handles).await? {
2745 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2747 transaction_id: self.transaction_id.clone(),
2748 });
2749
2750 return Ok(true);
2751 }
2752
2753 }
2757
2758 if queue.cancel_event(&self.transaction_id).await? {
2759 trace!("successful abort");
2760
2761 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2763 transaction_id: self.transaction_id.clone(),
2764 });
2765
2766 Ok(true)
2767 } else {
2768 debug!("local echo didn't exist anymore, can't abort");
2769 Ok(false)
2770 }
2771 }
2772
2773 #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2778 pub async fn edit_raw(
2779 &self,
2780 new_content: Raw<AnyMessageLikeEventContent>,
2781 event_type: String,
2782 ) -> Result<bool, RoomSendQueueStorageError> {
2783 trace!("received an edit request");
2784 self.nyi_for_uploads()?;
2785
2786 let serializable = SerializableEventContent::from_raw(new_content, event_type);
2787
2788 if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
2789 trace!("successful edit");
2790
2791 self.room.inner.notifier.notify_one();
2793
2794 self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2796 transaction_id: self.transaction_id.clone(),
2797 new_content: serializable,
2798 });
2799
2800 Ok(true)
2801 } else {
2802 debug!("local echo doesn't exist anymore, can't edit");
2803 Ok(false)
2804 }
2805 }
2806
2807 pub async fn edit(
2812 &self,
2813 new_content: AnyMessageLikeEventContent,
2814 ) -> Result<bool, RoomSendQueueStorageError> {
2815 self.edit_raw(
2816 Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2817 new_content.event_type().to_string(),
2818 )
2819 .await
2820 }
2821
2822 pub async fn edit_media_caption(
2827 &self,
2828 caption: Option<String>,
2829 formatted_caption: Option<FormattedBody>,
2830 mentions: Option<Mentions>,
2831 ) -> Result<bool, RoomSendQueueStorageError> {
2832 if let Some(new_content) = self
2833 .room
2834 .inner
2835 .queue
2836 .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2837 .await?
2838 {
2839 trace!("successful edit of media caption");
2840
2841 self.room.inner.notifier.notify_one();
2843
2844 let new_content = SerializableEventContent::new(&new_content)
2845 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2846
2847 self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2849 transaction_id: self.transaction_id.clone(),
2850 new_content,
2851 });
2852
2853 Ok(true)
2854 } else {
2855 debug!("local echo doesn't exist anymore, can't edit media caption");
2856 Ok(false)
2857 }
2858 }
2859
2860 pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2863 let room = &self.room.inner;
2864 room.queue
2865 .mark_as_unwedged(&self.transaction_id)
2866 .await
2867 .map_err(RoomSendQueueError::StorageError)?;
2868
2869 for handles in &self.media_handles {
2877 room.queue
2878 .mark_as_unwedged(&handles.upload_file_txn)
2879 .await
2880 .map_err(RoomSendQueueError::StorageError)?;
2881
2882 if let Some(txn) = &handles.upload_thumbnail_txn {
2883 room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2884 }
2885 }
2886
2887 room.notifier.notify_one();
2889
2890 self.room.send_update(RoomSendQueueUpdate::RetryEvent {
2891 transaction_id: self.transaction_id.clone(),
2892 });
2893
2894 Ok(())
2895 }
2896
2897 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2902 pub async fn react(
2903 &self,
2904 key: String,
2905 ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2906 trace!("received an intent to react");
2907
2908 let created_at = MilliSecondsSinceUnixEpoch::now();
2909 if let Some(reaction_txn_id) =
2910 self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2911 {
2912 trace!("successfully queued react");
2913
2914 self.room.inner.notifier.notify_one();
2916
2917 let send_handle = SendReactionHandle {
2919 room: self.room.clone(),
2920 transaction_id: reaction_txn_id.clone(),
2921 };
2922
2923 self.room.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2924 transaction_id: reaction_txn_id.into(),
2927 content: LocalEchoContent::React {
2928 key,
2929 send_handle: send_handle.clone(),
2930 applies_to: self.transaction_id.clone(),
2931 },
2932 }));
2933
2934 Ok(Some(send_handle))
2935 } else {
2936 debug!("local echo doesn't exist anymore, can't react");
2937 Ok(None)
2938 }
2939 }
2940}
2941
2942#[derive(Clone, Debug)]
2944pub struct SendReactionHandle {
2945 room: RoomSendQueue,
2947 transaction_id: ChildTransactionId,
2949}
2950
2951impl SendReactionHandle {
2952 #[cfg(test)]
2954 pub(crate) fn new(room: RoomSendQueue, transaction_id: ChildTransactionId) -> Self {
2955 Self { room, transaction_id }
2956 }
2957
2958 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2963 if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2964 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2968 transaction_id: self.transaction_id.clone().into(),
2969 });
2970
2971 return Ok(true);
2972 }
2973
2974 let handle = SendHandle {
2977 room: self.room.clone(),
2978 transaction_id: self.transaction_id.clone().into(),
2979 media_handles: vec![],
2980 created_at: MilliSecondsSinceUnixEpoch::now(),
2981 };
2982
2983 handle.abort().await
2984 }
2985
2986 pub fn transaction_id(&self) -> &TransactionId {
2988 &self.transaction_id
2989 }
2990}
2991
2992#[derive(Clone, Debug)]
2995pub struct SendRedactionHandle {
2996 room: RoomSendQueue,
2998
2999 transaction_id: OwnedTransactionId,
3001}
3002
3003impl SendRedactionHandle {
3004 #[cfg(test)]
3006 pub(crate) fn new(room: RoomSendQueue, transaction_id: OwnedTransactionId) -> Self {
3007 Self { room, transaction_id }
3008 }
3009
3010 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
3015 trace!("received a redaction abort request");
3016
3017 let queue = &self.room.inner.queue;
3018
3019 if queue.cancel_event(&self.transaction_id).await? {
3020 trace!("successful redaction abort");
3021
3022 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
3024 transaction_id: self.transaction_id.clone(),
3025 });
3026
3027 Ok(true)
3028 } else {
3029 debug!("local echo of redaction didn't exist anymore, can't abort");
3030 Ok(false)
3031 }
3032 }
3033}
3034
3035fn canonicalize_dependent_requests(
3039 dependent: &[DependentQueuedRequest],
3040) -> Vec<DependentQueuedRequest> {
3041 let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
3042
3043 for d in dependent {
3044 let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
3045
3046 if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
3047 continue;
3050 }
3051
3052 match &d.kind {
3053 DependentQueuedRequestKind::EditEvent { .. } => {
3054 if let Some(prev_edit) = prevs
3056 .iter_mut()
3057 .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
3058 {
3059 *prev_edit = d;
3060 } else {
3061 prevs.insert(0, d);
3062 }
3063 }
3064
3065 DependentQueuedRequestKind::UploadFileOrThumbnail { .. }
3066 | DependentQueuedRequestKind::FinishUpload { .. }
3067 | DependentQueuedRequestKind::ReactEvent { .. } => {
3068 prevs.push(d);
3070 }
3071
3072 #[cfg(feature = "unstable-msc4274")]
3073 DependentQueuedRequestKind::FinishGallery { .. } => {
3074 prevs.push(d);
3076 }
3077
3078 DependentQueuedRequestKind::RedactEvent => {
3079 prevs.clear();
3081 prevs.push(d);
3082 }
3083 }
3084 }
3085
3086 by_txn.into_values().flat_map(|entries| entries.into_iter().cloned()).collect()
3087}
3088
3089#[cfg(all(test, not(target_family = "wasm")))]
3090mod tests {
3091 use std::{sync::Arc, time::Duration};
3092
3093 use assert_matches2::{assert_let, assert_matches};
3094 use matrix_sdk_base::store::{
3095 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
3096 SerializableEventContent,
3097 };
3098 use matrix_sdk_test::{JoinedRoomBuilder, SyncResponseBuilder, async_test};
3099 use ruma::{
3100 MilliSecondsSinceUnixEpoch, TransactionId,
3101 events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
3102 room_id,
3103 };
3104
3105 use super::canonicalize_dependent_requests;
3106 use crate::{client::WeakClient, test_utils::logged_in_client};
3107
3108 #[test]
3109 fn test_canonicalize_dependent_events_created_at() {
3110 let txn = TransactionId::new();
3113 let created_at = MilliSecondsSinceUnixEpoch::now();
3114
3115 let edit = DependentQueuedRequest {
3116 own_transaction_id: ChildTransactionId::new(),
3117 parent_transaction_id: txn.clone(),
3118 kind: DependentQueuedRequestKind::EditEvent {
3119 new_content: SerializableEventContent::new(
3120 &RoomMessageEventContent::text_plain("edit").into(),
3121 )
3122 .unwrap(),
3123 },
3124 parent_key: None,
3125 created_at,
3126 };
3127
3128 let res = canonicalize_dependent_requests(&[edit]);
3129
3130 assert_eq!(res.len(), 1);
3131 assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
3132 assert_let!(
3133 AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
3134 );
3135 assert_eq!(msg.body(), "edit");
3136 assert_eq!(res[0].parent_transaction_id, txn);
3137 assert_eq!(res[0].created_at, created_at);
3138 }
3139
3140 #[async_test]
3141 async fn test_client_no_cycle_with_send_queue() {
3142 for enabled in [true, false] {
3143 let client = logged_in_client(None).await;
3144 let weak_client = WeakClient::from_client(&client);
3145
3146 {
3147 let mut sync_response_builder = SyncResponseBuilder::new();
3148
3149 let room_id = room_id!("!a:b.c");
3150
3151 client
3153 .base_client()
3154 .receive_sync_response(
3155 sync_response_builder
3156 .add_joined_room(JoinedRoomBuilder::new(room_id))
3157 .build_sync_response(),
3158 )
3159 .await
3160 .unwrap();
3161
3162 let room = client.get_room(room_id).unwrap();
3163 let q = room.send_queue();
3164
3165 let _watcher = q.subscribe().await;
3166
3167 client.send_queue().set_enabled(enabled).await;
3168 }
3169
3170 drop(client);
3171
3172 tokio::time::sleep(Duration::from_millis(500)).await;
3174
3175 let client = weak_client.get();
3177 assert!(
3178 client.is_none(),
3179 "too many strong references to the client: {}",
3180 Arc::strong_count(&client.unwrap().inner)
3181 );
3182 }
3183 }
3184
3185 #[test]
3186 fn test_canonicalize_dependent_events_smoke_test() {
3187 let txn = TransactionId::new();
3189
3190 let edit = DependentQueuedRequest {
3191 own_transaction_id: ChildTransactionId::new(),
3192 parent_transaction_id: txn.clone(),
3193 kind: DependentQueuedRequestKind::EditEvent {
3194 new_content: SerializableEventContent::new(
3195 &RoomMessageEventContent::text_plain("edit").into(),
3196 )
3197 .unwrap(),
3198 },
3199 parent_key: None,
3200 created_at: MilliSecondsSinceUnixEpoch::now(),
3201 };
3202 let res = canonicalize_dependent_requests(&[edit]);
3203
3204 assert_eq!(res.len(), 1);
3205 assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
3206 assert_eq!(res[0].parent_transaction_id, txn);
3207 assert!(res[0].parent_key.is_none());
3208 }
3209
3210 #[test]
3211 fn test_canonicalize_dependent_events_redaction_preferred() {
3212 let txn = TransactionId::new();
3214
3215 let mut inputs = Vec::with_capacity(100);
3216 let redact = DependentQueuedRequest {
3217 own_transaction_id: ChildTransactionId::new(),
3218 parent_transaction_id: txn.clone(),
3219 kind: DependentQueuedRequestKind::RedactEvent,
3220 parent_key: None,
3221 created_at: MilliSecondsSinceUnixEpoch::now(),
3222 };
3223
3224 let edit = DependentQueuedRequest {
3225 own_transaction_id: ChildTransactionId::new(),
3226 parent_transaction_id: txn.clone(),
3227 kind: DependentQueuedRequestKind::EditEvent {
3228 new_content: SerializableEventContent::new(
3229 &RoomMessageEventContent::text_plain("edit").into(),
3230 )
3231 .unwrap(),
3232 },
3233 parent_key: None,
3234 created_at: MilliSecondsSinceUnixEpoch::now(),
3235 };
3236
3237 inputs.push({
3238 let mut edit = edit.clone();
3239 edit.own_transaction_id = ChildTransactionId::new();
3240 edit
3241 });
3242
3243 inputs.push(redact);
3244
3245 for _ in 0..98 {
3246 let mut edit = edit.clone();
3247 edit.own_transaction_id = ChildTransactionId::new();
3248 inputs.push(edit);
3249 }
3250
3251 let res = canonicalize_dependent_requests(&inputs);
3252
3253 assert_eq!(res.len(), 1);
3254 assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
3255 assert_eq!(res[0].parent_transaction_id, txn);
3256 }
3257
3258 #[test]
3259 fn test_canonicalize_dependent_events_last_edit_preferred() {
3260 let parent_txn = TransactionId::new();
3261
3262 let inputs = (0..10)
3264 .map(|i| DependentQueuedRequest {
3265 own_transaction_id: ChildTransactionId::new(),
3266 parent_transaction_id: parent_txn.clone(),
3267 kind: DependentQueuedRequestKind::EditEvent {
3268 new_content: SerializableEventContent::new(
3269 &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
3270 )
3271 .unwrap(),
3272 },
3273 parent_key: None,
3274 created_at: MilliSecondsSinceUnixEpoch::now(),
3275 })
3276 .collect::<Vec<_>>();
3277
3278 let txn = inputs[9].parent_transaction_id.clone();
3279
3280 let res = canonicalize_dependent_requests(&inputs);
3281
3282 assert_eq!(res.len(), 1);
3283 assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
3284 assert_let!(
3285 AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
3286 );
3287 assert_eq!(msg.body(), "edit9");
3288 assert_eq!(res[0].parent_transaction_id, txn);
3289 }
3290
3291 #[test]
3292 fn test_canonicalize_multiple_local_echoes() {
3293 let txn1 = TransactionId::new();
3294 let txn2 = TransactionId::new();
3295
3296 let child1 = ChildTransactionId::new();
3297 let child2 = ChildTransactionId::new();
3298
3299 let inputs = vec![
3300 DependentQueuedRequest {
3302 own_transaction_id: child1.clone(),
3303 kind: DependentQueuedRequestKind::RedactEvent,
3304 parent_transaction_id: txn1.clone(),
3305 parent_key: None,
3306 created_at: MilliSecondsSinceUnixEpoch::now(),
3307 },
3308 DependentQueuedRequest {
3310 own_transaction_id: child2,
3311 kind: DependentQueuedRequestKind::EditEvent {
3312 new_content: SerializableEventContent::new(
3313 &RoomMessageEventContent::text_plain("edit").into(),
3314 )
3315 .unwrap(),
3316 },
3317 parent_transaction_id: txn2.clone(),
3318 parent_key: None,
3319 created_at: MilliSecondsSinceUnixEpoch::now(),
3320 },
3321 ];
3322
3323 let res = canonicalize_dependent_requests(&inputs);
3324
3325 assert_eq!(res.len(), 2);
3327
3328 for dependent in res {
3329 if dependent.own_transaction_id == child1 {
3330 assert_eq!(dependent.parent_transaction_id, txn1);
3331 assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
3332 } else {
3333 assert_eq!(dependent.parent_transaction_id, txn2);
3334 assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
3335 }
3336 }
3337 }
3338
3339 #[test]
3340 fn test_canonicalize_reactions_after_edits() {
3341 let txn = TransactionId::new();
3343
3344 let react_id = ChildTransactionId::new();
3345 let react = DependentQueuedRequest {
3346 own_transaction_id: react_id.clone(),
3347 kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
3348 parent_transaction_id: txn.clone(),
3349 parent_key: None,
3350 created_at: MilliSecondsSinceUnixEpoch::now(),
3351 };
3352
3353 let edit_id = ChildTransactionId::new();
3354 let edit = DependentQueuedRequest {
3355 own_transaction_id: edit_id.clone(),
3356 kind: DependentQueuedRequestKind::EditEvent {
3357 new_content: SerializableEventContent::new(
3358 &RoomMessageEventContent::text_plain("edit").into(),
3359 )
3360 .unwrap(),
3361 },
3362 parent_transaction_id: txn,
3363 parent_key: None,
3364 created_at: MilliSecondsSinceUnixEpoch::now(),
3365 };
3366
3367 let res = canonicalize_dependent_requests(&[react, edit]);
3368
3369 assert_eq!(res.len(), 2);
3370 assert_eq!(res[0].own_transaction_id, edit_id);
3371 assert_eq!(res[1].own_transaction_id, react_id);
3372 }
3373}