1use std::{
132 collections::{BTreeMap, HashMap},
133 str::FromStr as _,
134 sync::{
135 Arc, RwLock,
136 atomic::{AtomicBool, Ordering},
137 },
138};
139
140use eyeball::SharedObservable;
141#[cfg(feature = "e2e-encryption")]
142use matrix_sdk_base::crypto::{OlmError, SessionRecipientCollectionError};
143#[cfg(feature = "unstable-msc4274")]
144use matrix_sdk_base::store::FinishGalleryItemInfo;
145use matrix_sdk_base::{
146 RoomState, StoreError,
147 cross_process_lock::CrossProcessLockError,
148 deserialized_responses::{EncryptionInfo, TimelineEvent},
149 event_cache::store::EventCacheStoreError,
150 media::{MediaRequestParameters, store::MediaStoreError},
151 store::{
152 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore,
153 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
154 SentMediaInfo, SentRequestKey, SerializableEventContent,
155 },
156 task_monitor::BackgroundTaskHandle,
157};
158use matrix_sdk_common::locks::Mutex as SyncMutex;
159use mime::Mime;
160use ruma::{
161 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
162 TransactionId,
163 events::{
164 AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _,
165 reaction::ReactionEventContent,
166 relation::Annotation,
167 room::{
168 MediaSource,
169 message::{FormattedBody, RoomMessageEventContent},
170 },
171 },
172 serde::Raw,
173};
174use tokio::sync::{Mutex, Notify, OwnedMutexGuard, broadcast, oneshot};
175use tracing::{debug, error, info, instrument, trace, warn};
176
177use crate::{
178 Client, Media, Room, TransmissionProgress,
179 client::WeakClient,
180 config::RequestConfig,
181 error::RetryKind,
182 room::{WeakRoom, edit::EditedContent},
183};
184
185mod progress;
186mod upload;
187
188pub use progress::AbstractProgress;
189
190pub struct SendQueue {
192 client: Client,
193}
194
195#[cfg(not(tarpaulin_include))]
196impl std::fmt::Debug for SendQueue {
197 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198 f.debug_struct("SendQueue").finish_non_exhaustive()
199 }
200}
201
202impl SendQueue {
203 pub(super) fn new(client: Client) -> Self {
204 Self { client }
205 }
206
207 pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
210 if !self.is_enabled() {
211 return;
212 }
213
214 let room_ids =
215 self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
216 |err| {
217 warn!("error when loading rooms with unsent requests: {err}");
218 Vec::new()
219 },
220 );
221
222 for room_id in room_ids {
224 if let Some(room) = self.client.get_room(&room_id) {
225 let _ = self.for_room(room);
226 }
227 }
228 }
229
230 #[inline(always)]
232 fn data(&self) -> &SendQueueData {
233 &self.client.inner.send_queue_data
234 }
235
236 pub(crate) fn for_room(&self, room: Room) -> RoomSendQueue {
239 let data = self.data();
240
241 let mut map = data.rooms.write().unwrap();
242
243 let room_id = room.room_id();
244 if let Some(room_q) = map.get(room_id).cloned() {
245 return room_q;
246 }
247
248 let owned_room_id = room_id.to_owned();
249 let room_q = RoomSendQueue::new(
250 self.is_enabled(),
251 data.global_update_sender.clone(),
252 data.error_sender.clone(),
253 data.is_dropping.clone(),
254 &self.client,
255 owned_room_id.clone(),
256 data.report_media_upload_progress.clone(),
257 );
258
259 map.insert(owned_room_id, room_q.clone());
260
261 room_q
262 }
263
264 pub async fn set_enabled(&self, enabled: bool) {
274 debug!(?enabled, "setting global send queue enablement");
275
276 self.data().globally_enabled.store(enabled, Ordering::SeqCst);
277
278 for room in self.data().rooms.read().unwrap().values() {
280 room.set_enabled(enabled);
281 }
282
283 self.respawn_tasks_for_rooms_with_unsent_requests().await;
286 }
287
288 pub fn is_enabled(&self) -> bool {
291 self.data().globally_enabled.load(Ordering::SeqCst)
292 }
293
294 pub fn enable_upload_progress(&self, enabled: bool) {
296 self.data().report_media_upload_progress.store(enabled, Ordering::SeqCst);
297 }
298
299 pub fn subscribe(&self) -> broadcast::Receiver<SendQueueUpdate> {
304 self.data().global_update_sender.subscribe()
305 }
306
307 pub async fn local_echoes(
309 &self,
310 ) -> Result<BTreeMap<OwnedRoomId, Vec<LocalEcho>>, RoomSendQueueError> {
311 let room_ids =
312 self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
313 |err| {
314 warn!("error when loading rooms with unsent requests: {err}");
315 Vec::new()
316 },
317 );
318
319 let mut local_echoes: BTreeMap<OwnedRoomId, Vec<LocalEcho>> = BTreeMap::new();
320
321 for room_id in room_ids {
322 if let Some(room) = self.client.get_room(&room_id) {
323 let queue = self.for_room(room);
324 local_echoes
325 .insert(room_id.to_owned(), queue.inner.queue.local_echoes(&queue).await?);
326 }
327 }
328
329 Ok(local_echoes)
330 }
331
332 pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
335 self.data().error_sender.subscribe()
336 }
337}
338
339#[derive(Clone, Debug)]
342struct QueueThumbnailInfo {
343 finish_upload_thumbnail_info: FinishUploadThumbnailInfo,
345
346 media_request_parameters: MediaRequestParameters,
348
349 content_type: Mime,
351
352 file_size: usize,
354}
355
356#[derive(Clone, Debug)]
358pub struct SendQueueRoomError {
359 pub room_id: OwnedRoomId,
361
362 pub error: Arc<crate::Error>,
364
365 pub is_recoverable: bool,
371}
372
373impl Client {
374 pub fn send_queue(&self) -> SendQueue {
377 SendQueue::new(self.clone())
378 }
379}
380
381pub(super) struct SendQueueData {
382 rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,
384
385 globally_enabled: AtomicBool,
390
391 global_update_sender: broadcast::Sender<SendQueueUpdate>,
395
396 error_sender: broadcast::Sender<SendQueueRoomError>,
398
399 is_dropping: Arc<AtomicBool>,
401
402 report_media_upload_progress: Arc<AtomicBool>,
404}
405
406impl SendQueueData {
407 pub fn new(globally_enabled: bool) -> Self {
409 let (global_update_sender, _) = broadcast::channel(32);
410 let (error_sender, _) = broadcast::channel(32);
411
412 Self {
413 rooms: Default::default(),
414 globally_enabled: AtomicBool::new(globally_enabled),
415 global_update_sender,
416 error_sender,
417 is_dropping: Arc::new(false.into()),
418 report_media_upload_progress: Arc::new(false.into()),
419 }
420 }
421}
422
423impl Drop for SendQueueData {
424 fn drop(&mut self) {
425 debug!("globally dropping the send queue");
428 self.is_dropping.store(true, Ordering::SeqCst);
429
430 let rooms = self.rooms.read().unwrap();
431 for room in rooms.values() {
432 room.inner.notifier.notify_one();
433 }
434 }
435}
436
437impl Room {
438 pub fn send_queue(&self) -> RoomSendQueue {
440 self.client.send_queue().for_room(self.clone())
441 }
442}
443
444#[derive(Clone)]
448pub struct RoomSendQueue {
449 inner: Arc<RoomSendQueueInner>,
450}
451
452#[cfg(not(tarpaulin_include))]
453impl std::fmt::Debug for RoomSendQueue {
454 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
455 f.debug_struct("RoomSendQueue").finish_non_exhaustive()
456 }
457}
458
459impl RoomSendQueue {
460 fn new(
461 globally_enabled: bool,
462 global_update_sender: broadcast::Sender<SendQueueUpdate>,
463 global_error_sender: broadcast::Sender<SendQueueRoomError>,
464 is_dropping: Arc<AtomicBool>,
465 client: &Client,
466 room_id: OwnedRoomId,
467 report_media_upload_progress: Arc<AtomicBool>,
468 ) -> Self {
469 let (update_sender, _) = broadcast::channel(32);
470
471 let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
472 let notifier = Arc::new(Notify::new());
473
474 let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
475 let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));
476
477 let task = client.task_monitor().spawn_background_task(
478 "send_queue",
479 Self::sending_task(
480 weak_room.clone(),
481 queue.clone(),
482 notifier.clone(),
483 global_update_sender.clone(),
484 update_sender.clone(),
485 locally_enabled.clone(),
486 global_error_sender,
487 is_dropping,
488 report_media_upload_progress,
489 ),
490 );
491
492 Self {
493 inner: Arc::new(RoomSendQueueInner {
494 room: weak_room,
495 global_update_sender,
496 update_sender,
497 _task: task,
498 queue,
499 notifier,
500 locally_enabled,
501 }),
502 }
503 }
504
505 pub async fn send_raw(
520 &self,
521 content: Raw<AnyMessageLikeEventContent>,
522 event_type: String,
523 ) -> Result<SendHandle, RoomSendQueueError> {
524 let Some(room) = self.inner.room.get() else {
525 return Err(RoomSendQueueError::RoomDisappeared);
526 };
527 if room.state() != RoomState::Joined {
528 return Err(RoomSendQueueError::RoomNotJoined);
529 }
530
531 let content = SerializableEventContent::from_raw(content, event_type);
532
533 let created_at = MilliSecondsSinceUnixEpoch::now();
534 let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
535 trace!(%transaction_id, "manager sends a raw event to the background task");
536
537 self.inner.notifier.notify_one();
538
539 let send_handle = SendHandle {
540 room: self.clone(),
541 transaction_id: transaction_id.clone(),
542 media_handles: vec![],
543 created_at,
544 };
545
546 self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
547 transaction_id,
548 content: LocalEchoContent::Event {
549 serialized_event: content,
550 send_handle: send_handle.clone(),
551 send_error: None,
552 },
553 }));
554
555 Ok(send_handle)
556 }
557
558 pub async fn send(
573 &self,
574 content: AnyMessageLikeEventContent,
575 ) -> Result<SendHandle, RoomSendQueueError> {
576 self.send_raw(
577 Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
578 content.event_type().to_string(),
579 )
580 .await
581 }
582
583 pub async fn subscribe(
589 &self,
590 ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
591 {
592 let local_echoes = self.inner.queue.local_echoes(self).await?;
593
594 Ok((local_echoes, self.inner.update_sender.subscribe()))
595 }
596
597 #[allow(clippy::too_many_arguments)]
603 #[instrument(skip_all, fields(room_id = %room.room_id()))]
604 async fn sending_task(
605 room: WeakRoom,
606 queue: QueueStorage,
607 notifier: Arc<Notify>,
608 global_update_sender: broadcast::Sender<SendQueueUpdate>,
609 update_sender: broadcast::Sender<RoomSendQueueUpdate>,
610 locally_enabled: Arc<AtomicBool>,
611 global_error_sender: broadcast::Sender<SendQueueRoomError>,
612 is_dropping: Arc<AtomicBool>,
613 report_media_upload_progress: Arc<AtomicBool>,
614 ) {
615 trace!("spawned the sending task");
616
617 let room_id = room.room_id();
618
619 loop {
620 if is_dropping.load(Ordering::SeqCst) {
622 trace!("shutting down!");
623 break;
624 }
625
626 let mut new_updates = Vec::new();
629 if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
630 warn!("errors when applying dependent requests: {err}");
631 }
632
633 for up in new_updates {
634 send_update(&global_update_sender, &update_sender, room_id, up);
635 }
636
637 if !locally_enabled.load(Ordering::SeqCst) {
638 trace!("not enabled, sleeping");
639 notifier.notified().await;
641 continue;
642 }
643
644 let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
645 Ok(Some(request)) => request,
646
647 Ok(None) => {
648 trace!("queue is empty, sleeping");
649 notifier.notified().await;
651 continue;
652 }
653
654 Err(err) => {
655 warn!("error when loading next request to send: {err}");
656 continue;
657 }
658 };
659
660 let txn_id = queued_request.transaction_id.clone();
661 trace!(txn_id = %txn_id, "received a request to send!");
662
663 let Some(room) = room.get() else {
664 if is_dropping.load(Ordering::SeqCst) {
665 break;
666 }
667 error!("the weak room couldn't be upgraded but we're not shutting down?");
668 continue;
669 };
670
671 let (related_txn_id, media_upload_progress_info, http_progress) =
676 if let QueuedRequestKind::MediaUpload {
677 cache_key,
678 thumbnail_source,
679 #[cfg(feature = "unstable-msc4274")]
680 accumulated,
681 related_to,
682 ..
683 } = &queued_request.kind
684 {
685 let (media_upload_progress_info, http_progress) =
688 if report_media_upload_progress.load(Ordering::SeqCst) {
689 let media_upload_progress_info =
690 RoomSendQueue::create_media_upload_progress_info(
691 &queued_request.transaction_id,
692 related_to,
693 cache_key,
694 thumbnail_source.as_ref(),
695 #[cfg(feature = "unstable-msc4274")]
696 accumulated,
697 &room,
698 &queue,
699 )
700 .await;
701
702 let progress = RoomSendQueue::create_media_upload_progress_observable(
703 &media_upload_progress_info,
704 related_to,
705 &update_sender,
706 );
707
708 (Some(media_upload_progress_info), Some(progress))
709 } else {
710 Default::default()
711 };
712
713 (Some(related_to.clone()), media_upload_progress_info, http_progress)
714 } else {
715 Default::default()
716 };
717
718 match Self::handle_request(&room, queued_request, cancel_upload_rx, http_progress).await
719 {
720 Ok((Some(parent_key), encryption_info)) => match queue
721 .mark_as_sent(&txn_id, parent_key.clone())
722 .await
723 {
724 Ok(()) => match parent_key {
725 SentRequestKey::Event { event_id, event, event_type } => {
726 send_update(
727 &global_update_sender,
728 &update_sender,
729 room_id,
730 RoomSendQueueUpdate::SentEvent {
731 transaction_id: txn_id,
732 event_id: event_id.clone(),
733 },
734 );
735
736 if let Ok((room_event_cache, _drop_handles)) = room.event_cache().await
756 {
757 let timeline_event = match Raw::from_json_string(
758 format!(
760 "{{\
761 \"event_id\":\"{event_id}\",\
762 \"origin_server_ts\":{ts},\
763 \"sender\":\"{sender}\",\
764 \"type\":\"{type}\",\
765 \"content\":{content}\
766 }}",
767 event_id = event_id,
768 ts = MilliSecondsSinceUnixEpoch::now().get(),
769 sender = room.client().user_id().expect("Client must be logged-in"),
770 type = event_type,
771 content = event.into_json(),
772 ),
773 ) {
774 Ok(event) => match encryption_info {
775 #[cfg(feature = "e2e-encryption")]
776 Some(encryption_info) => {
777 use matrix_sdk_base::deserialized_responses::DecryptedRoomEvent;
778 let decrypted_event = DecryptedRoomEvent {
779 event: event.cast_unchecked(),
780 encryption_info: Arc::new(encryption_info),
781 unsigned_encryption_info: None,
782 };
783 Some(TimelineEvent::from_decrypted(
784 decrypted_event,
785 None,
786 ))
787 }
788 _ => Some(TimelineEvent::from_plaintext(event)),
789 },
790 Err(err) => {
791 error!(
792 ?err,
793 "Failed to build the (sync) event before the saving in the Event Cache"
794 );
795 None
796 }
797 };
798
799 if let Some(timeline_event) = timeline_event
803 && let Err(err) = room_event_cache
804 .insert_sent_event_from_send_queue(timeline_event)
805 .await
806 {
807 error!(
808 ?err,
809 "Failed to save the sent event in the Event Cache"
810 );
811 }
812 } else {
813 info!(
814 "Cannot insert the sent event in the Event Cache because \
815 either the room no longer exists, or the Room Event Cache cannot be retrieved"
816 );
817 }
818 }
819
820 SentRequestKey::Media(sent_media_info) => {
821 let index =
824 media_upload_progress_info.as_ref().map_or(0, |info| info.index);
825 let progress = media_upload_progress_info
826 .as_ref()
827 .map(|info| {
828 AbstractProgress { current: info.bytes, total: info.bytes }
829 + info.offsets
830 })
831 .unwrap_or(AbstractProgress { current: 1, total: 1 });
832
833 let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
836 related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
837 file: Some(sent_media_info.file),
838 index,
839 progress,
840 });
841 }
842 },
843
844 Err(err) => {
845 warn!("unable to mark queued request as sent: {err}");
846 }
847 },
848
849 Ok((None, _)) => {
850 debug!("Request has been aborted while running, continuing.");
851 }
852
853 Err(err) => {
854 let is_recoverable = match err {
855 crate::Error::Http(ref http_err) => {
856 matches!(
858 http_err.retry_kind(),
859 RetryKind::Transient { .. } | RetryKind::NetworkFailure
860 )
861 }
862
863 crate::Error::ConcurrentRequestFailed => true,
868
869 _ => false,
871 };
872
873 locally_enabled.store(false, Ordering::SeqCst);
875
876 if is_recoverable {
877 warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");
878
879 queue.mark_as_not_being_sent(&txn_id).await;
882
883 } else {
889 warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");
890
891 if let Err(storage_error) =
893 queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
894 {
895 warn!("unable to mark request as wedged: {storage_error}");
896 }
897 }
898
899 let error = Arc::new(err);
900
901 let _ = global_error_sender.send(SendQueueRoomError {
902 room_id: room_id.to_owned(),
903 error: error.clone(),
904 is_recoverable,
905 });
906
907 send_update(
908 &global_update_sender,
909 &update_sender,
910 room_id,
911 RoomSendQueueUpdate::SendError {
912 transaction_id: related_txn_id.unwrap_or(txn_id),
913 error,
914 is_recoverable,
915 },
916 );
917 }
918 }
919 }
920
921 info!("exited sending task");
922 }
923
924 async fn handle_request(
928 room: &Room,
929 request: QueuedRequest,
930 cancel_upload_rx: Option<oneshot::Receiver<()>>,
931 progress: Option<SharedObservable<TransmissionProgress>>,
932 ) -> Result<(Option<SentRequestKey>, Option<EncryptionInfo>), crate::Error> {
933 match request.kind {
934 QueuedRequestKind::Event { content } => {
935 let (event, event_type) = content.into_raw();
936
937 let result = room
938 .send_raw(&event_type, &event)
939 .with_transaction_id(&request.transaction_id)
940 .with_request_config(RequestConfig::short_retry())
941 .await?;
942
943 trace!(txn_id = %request.transaction_id, event_id = %result.response.event_id, "event successfully sent");
944
945 Ok((
946 Some(SentRequestKey::Event {
947 event_id: result.response.event_id,
948 event,
949 event_type,
950 }),
951 result.encryption_info,
952 ))
953 }
954
955 QueuedRequestKind::MediaUpload {
956 content_type,
957 cache_key,
958 thumbnail_source,
959 related_to: relates_to,
960 #[cfg(feature = "unstable-msc4274")]
961 accumulated,
962 } => {
963 trace!(%relates_to, "uploading media related to event");
964
965 let fut = async move {
966 let data = room
967 .client()
968 .media_store()
969 .lock()
970 .await?
971 .get_media_content(&cache_key)
972 .await?
973 .ok_or(crate::Error::SendQueueWedgeError(Box::new(
974 QueueWedgeError::MissingMediaContent,
975 )))?;
976
977 let mime = Mime::from_str(&content_type).map_err(|_| {
978 crate::Error::SendQueueWedgeError(Box::new(
979 QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
980 ))
981 })?;
982
983 #[cfg(feature = "e2e-encryption")]
984 let media_source = if room.latest_encryption_state().await?.is_encrypted() {
985 trace!("upload will be encrypted (encrypted room)");
986
987 let mut cursor = std::io::Cursor::new(data);
988 let mut req = room
989 .client
990 .upload_encrypted_file(&mut cursor)
991 .with_request_config(RequestConfig::short_retry());
992 if let Some(progress) = progress {
993 req = req.with_send_progress_observable(progress);
994 }
995 let encrypted_file = req.await?;
996
997 MediaSource::Encrypted(Box::new(encrypted_file))
998 } else {
999 trace!("upload will be in clear text (room without encryption)");
1000
1001 let request_config = RequestConfig::short_retry()
1002 .timeout(Media::reasonable_upload_timeout(&data));
1003 let mut req =
1004 room.client().media().upload(&mime, data, Some(request_config));
1005 if let Some(progress) = progress {
1006 req = req.with_send_progress_observable(progress);
1007 }
1008 let res = req.await?;
1009
1010 MediaSource::Plain(res.content_uri)
1011 };
1012
1013 #[cfg(not(feature = "e2e-encryption"))]
1014 let media_source = {
1015 let request_config = RequestConfig::short_retry()
1016 .timeout(Media::reasonable_upload_timeout(&data));
1017 let mut req =
1018 room.client().media().upload(&mime, data, Some(request_config));
1019 if let Some(progress) = progress {
1020 req = req.with_send_progress_observable(progress);
1021 }
1022 let res = req.await?;
1023 MediaSource::Plain(res.content_uri)
1024 };
1025
1026 let uri = match &media_source {
1027 MediaSource::Plain(uri) => uri,
1028 MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
1029 };
1030 trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");
1031
1032 Ok((
1033 Some(SentRequestKey::Media(SentMediaInfo {
1034 file: media_source,
1035 thumbnail: thumbnail_source,
1036 #[cfg(feature = "unstable-msc4274")]
1037 accumulated,
1038 })),
1039 None,
1040 ))
1041 };
1042
1043 let wait_for_cancel = async move {
1044 if let Some(rx) = cancel_upload_rx {
1045 rx.await
1046 } else {
1047 std::future::pending().await
1048 }
1049 };
1050
1051 tokio::select! {
1052 biased;
1053
1054 _ = wait_for_cancel => {
1055 Ok((None, None))
1056 }
1057
1058 res = fut => {
1059 res
1060 }
1061 }
1062 }
1063 }
1064 }
1065
1066 pub fn is_enabled(&self) -> bool {
1068 self.inner.locally_enabled.load(Ordering::SeqCst)
1069 }
1070
1071 pub fn set_enabled(&self, enabled: bool) {
1073 self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
1074
1075 if enabled {
1078 self.inner.notifier.notify_one();
1079 }
1080 }
1081
1082 fn send_update(&self, update: RoomSendQueueUpdate) {
1086 let _ = self.inner.update_sender.send(update.clone());
1087 let _ = self
1088 .inner
1089 .global_update_sender
1090 .send(SendQueueUpdate { room_id: self.inner.room.room_id().to_owned(), update });
1091 }
1092}
1093
1094fn send_update(
1095 global_update_sender: &broadcast::Sender<SendQueueUpdate>,
1096 update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
1097 room_id: &RoomId,
1098 update: RoomSendQueueUpdate,
1099) {
1100 let _ = update_sender.send(update.clone());
1101 let _ = global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update });
1102}
1103
1104impl From<&crate::Error> for QueueWedgeError {
1105 fn from(value: &crate::Error) -> Self {
1106 match value {
1107 #[cfg(feature = "e2e-encryption")]
1108 crate::Error::OlmError(error) => match &**error {
1109 OlmError::SessionRecipientCollectionError(error) => match error {
1110 SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
1111 QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
1112 }
1113
1114 SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
1115 QueueWedgeError::IdentityViolations { users: users.clone() }
1116 }
1117
1118 SessionRecipientCollectionError::CrossSigningNotSetup
1119 | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
1120 QueueWedgeError::CrossVerificationRequired
1121 }
1122 },
1123 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1124 },
1125
1126 crate::Error::SendQueueWedgeError(error) => *error.clone(),
1128
1129 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1130 }
1131 }
1132}
1133
1134struct RoomSendQueueInner {
1135 room: WeakRoom,
1137
1138 global_update_sender: broadcast::Sender<SendQueueUpdate>,
1142
1143 update_sender: broadcast::Sender<RoomSendQueueUpdate>,
1149
1150 queue: QueueStorage,
1157
1158 notifier: Arc<Notify>,
1161
1162 locally_enabled: Arc<AtomicBool>,
1165
1166 _task: BackgroundTaskHandle,
1169}
1170
1171struct BeingSentInfo {
1173 transaction_id: OwnedTransactionId,
1175
1176 cancel_upload: Option<oneshot::Sender<()>>,
1179}
1180
1181impl BeingSentInfo {
1182 fn cancel_upload(self) -> bool {
1187 if let Some(cancel_upload) = self.cancel_upload {
1188 let _ = cancel_upload.send(());
1189 true
1190 } else {
1191 false
1192 }
1193 }
1194}
1195
1196#[derive(Clone)]
1199struct StoreLock {
1200 client: WeakClient,
1202
1203 being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
1208}
1209
1210impl StoreLock {
1211 async fn lock(&self) -> StoreLockGuard {
1213 StoreLockGuard {
1214 client: self.client.clone(),
1215 being_sent: self.being_sent.clone().lock_owned().await,
1216 }
1217 }
1218}
1219
1220struct StoreLockGuard {
1223 client: WeakClient,
1225
1226 being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
1229}
1230
1231impl StoreLockGuard {
1232 fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
1234 self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
1235 }
1236}
1237
1238#[derive(Clone)]
1239struct QueueStorage {
1240 store: StoreLock,
1243
1244 room_id: OwnedRoomId,
1246
1247 thumbnail_file_sizes: Arc<SyncMutex<HashMap<OwnedTransactionId, Vec<Option<usize>>>>>,
1260}
1261
1262impl QueueStorage {
1263 const LOW_PRIORITY: usize = 0;
1265
1266 const HIGH_PRIORITY: usize = 10;
1268
1269 fn new(client: WeakClient, room: OwnedRoomId) -> Self {
1271 Self {
1272 room_id: room,
1273 store: StoreLock { client, being_sent: Default::default() },
1274 thumbnail_file_sizes: Default::default(),
1275 }
1276 }
1277
1278 async fn push(
1282 &self,
1283 request: QueuedRequestKind,
1284 created_at: MilliSecondsSinceUnixEpoch,
1285 ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
1286 let transaction_id = TransactionId::new();
1287
1288 self.store
1289 .lock()
1290 .await
1291 .client()?
1292 .state_store()
1293 .save_send_queue_request(
1294 &self.room_id,
1295 transaction_id.clone(),
1296 created_at,
1297 request,
1298 Self::LOW_PRIORITY,
1299 )
1300 .await?;
1301
1302 Ok(transaction_id)
1303 }
1304
1305 async fn peek_next_to_send(
1310 &self,
1311 ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
1312 {
1313 let mut guard = self.store.lock().await;
1314 let queued_requests =
1315 guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;
1316
1317 if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
1318 let (cancel_upload_tx, cancel_upload_rx) =
1319 if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
1320 let (tx, rx) = oneshot::channel();
1321 (Some(tx), Some(rx))
1322 } else {
1323 Default::default()
1324 };
1325
1326 let prev = guard.being_sent.replace(BeingSentInfo {
1327 transaction_id: request.transaction_id.clone(),
1328 cancel_upload: cancel_upload_tx,
1329 });
1330
1331 if let Some(prev) = prev {
1332 error!(
1333 prev_txn = ?prev.transaction_id,
1334 "a previous request was still active while picking a new one"
1335 );
1336 }
1337
1338 Ok(Some((request.clone(), cancel_upload_rx)))
1339 } else {
1340 Ok(None)
1341 }
1342 }
1343
1344 async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1348 let was_being_sent = self.store.lock().await.being_sent.take();
1349
1350 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1351 if prev_txn != Some(transaction_id) {
1352 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after transient error)");
1353 }
1354 }
1355
1356 async fn mark_as_wedged(
1360 &self,
1361 transaction_id: &TransactionId,
1362 reason: QueueWedgeError,
1363 ) -> Result<(), RoomSendQueueStorageError> {
1364 let mut guard = self.store.lock().await;
1366 let was_being_sent = guard.being_sent.take();
1367
1368 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1369 if prev_txn != Some(transaction_id) {
1370 error!(
1371 ?prev_txn,
1372 "previous active request didn't match that we expect (after permanent error)",
1373 );
1374 }
1375
1376 Ok(guard
1377 .client()?
1378 .state_store()
1379 .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1380 .await?)
1381 }
1382
1383 async fn mark_as_unwedged(
1386 &self,
1387 transaction_id: &TransactionId,
1388 ) -> Result<(), RoomSendQueueStorageError> {
1389 Ok(self
1390 .store
1391 .lock()
1392 .await
1393 .client()?
1394 .state_store()
1395 .update_send_queue_request_status(&self.room_id, transaction_id, None)
1396 .await?)
1397 }
1398
1399 async fn mark_as_sent(
1402 &self,
1403 transaction_id: &TransactionId,
1404 parent_key: SentRequestKey,
1405 ) -> Result<(), RoomSendQueueStorageError> {
1406 let mut guard = self.store.lock().await;
1408 let was_being_sent = guard.being_sent.take();
1409
1410 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1411 if prev_txn != Some(transaction_id) {
1412 error!(
1413 ?prev_txn,
1414 "previous active request didn't match that we expect (after successful send)",
1415 );
1416 }
1417
1418 let client = guard.client()?;
1419 let store = client.state_store();
1420
1421 store
1423 .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1424 .await?;
1425
1426 let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1427
1428 if !removed {
1429 warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1430 }
1431
1432 self.thumbnail_file_sizes.lock().remove(transaction_id);
1433
1434 Ok(())
1435 }
1436
1437 async fn cancel_event(
1444 &self,
1445 transaction_id: &TransactionId,
1446 ) -> Result<bool, RoomSendQueueStorageError> {
1447 let guard = self.store.lock().await;
1448
1449 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1450 == Some(transaction_id)
1451 {
1452 guard
1454 .client()?
1455 .state_store()
1456 .save_dependent_queued_request(
1457 &self.room_id,
1458 transaction_id,
1459 ChildTransactionId::new(),
1460 MilliSecondsSinceUnixEpoch::now(),
1461 DependentQueuedRequestKind::RedactEvent,
1462 )
1463 .await?;
1464
1465 return Ok(true);
1466 }
1467
1468 let removed = guard
1469 .client()?
1470 .state_store()
1471 .remove_send_queue_request(&self.room_id, transaction_id)
1472 .await?;
1473
1474 self.thumbnail_file_sizes.lock().remove(transaction_id);
1475
1476 Ok(removed)
1477 }
1478
1479 async fn replace_event(
1486 &self,
1487 transaction_id: &TransactionId,
1488 serializable: SerializableEventContent,
1489 ) -> Result<bool, RoomSendQueueStorageError> {
1490 let guard = self.store.lock().await;
1491
1492 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1493 == Some(transaction_id)
1494 {
1495 guard
1497 .client()?
1498 .state_store()
1499 .save_dependent_queued_request(
1500 &self.room_id,
1501 transaction_id,
1502 ChildTransactionId::new(),
1503 MilliSecondsSinceUnixEpoch::now(),
1504 DependentQueuedRequestKind::EditEvent { new_content: serializable },
1505 )
1506 .await?;
1507
1508 return Ok(true);
1509 }
1510
1511 let edited = guard
1512 .client()?
1513 .state_store()
1514 .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1515 .await?;
1516
1517 Ok(edited)
1518 }
1519
1520 #[allow(clippy::too_many_arguments)]
1524 async fn push_media(
1525 &self,
1526 event: RoomMessageEventContent,
1527 content_type: Mime,
1528 send_event_txn: OwnedTransactionId,
1529 created_at: MilliSecondsSinceUnixEpoch,
1530 upload_file_txn: OwnedTransactionId,
1531 file_media_request: MediaRequestParameters,
1532 thumbnail: Option<QueueThumbnailInfo>,
1533 ) -> Result<(), RoomSendQueueStorageError> {
1534 let guard = self.store.lock().await;
1535 let client = guard.client()?;
1536 let store = client.state_store();
1537
1538 let thumbnail_file_sizes = vec![thumbnail.as_ref().map(|t| t.file_size)];
1540
1541 let thumbnail_info = self
1542 .push_thumbnail_and_media_uploads(
1543 store,
1544 &content_type,
1545 send_event_txn.clone(),
1546 created_at,
1547 upload_file_txn.clone(),
1548 file_media_request,
1549 thumbnail,
1550 )
1551 .await?;
1552
1553 store
1555 .save_dependent_queued_request(
1556 &self.room_id,
1557 &upload_file_txn,
1558 send_event_txn.clone().into(),
1559 created_at,
1560 DependentQueuedRequestKind::FinishUpload {
1561 local_echo: Box::new(event),
1562 file_upload: upload_file_txn.clone(),
1563 thumbnail_info,
1564 },
1565 )
1566 .await?;
1567
1568 self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1569
1570 Ok(())
1571 }
1572
1573 #[cfg(feature = "unstable-msc4274")]
1577 #[allow(clippy::too_many_arguments)]
1578 async fn push_gallery(
1579 &self,
1580 event: RoomMessageEventContent,
1581 send_event_txn: OwnedTransactionId,
1582 created_at: MilliSecondsSinceUnixEpoch,
1583 item_queue_infos: Vec<GalleryItemQueueInfo>,
1584 ) -> Result<(), RoomSendQueueStorageError> {
1585 let guard = self.store.lock().await;
1586 let client = guard.client()?;
1587 let store = client.state_store();
1588
1589 let mut finish_item_infos = Vec::with_capacity(item_queue_infos.len());
1590 let mut thumbnail_file_sizes = Vec::with_capacity(item_queue_infos.len());
1591
1592 let Some((first, rest)) = item_queue_infos.split_first() else {
1593 return Ok(());
1594 };
1595
1596 let GalleryItemQueueInfo { content_type, upload_file_txn, file_media_request, thumbnail } =
1597 first;
1598
1599 let thumbnail_info = self
1600 .push_thumbnail_and_media_uploads(
1601 store,
1602 content_type,
1603 send_event_txn.clone(),
1604 created_at,
1605 upload_file_txn.clone(),
1606 file_media_request.clone(),
1607 thumbnail.clone(),
1608 )
1609 .await?;
1610
1611 finish_item_infos
1612 .push(FinishGalleryItemInfo { file_upload: upload_file_txn.clone(), thumbnail_info });
1613 thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));
1614
1615 let mut last_upload_file_txn = upload_file_txn.clone();
1616
1617 for item_queue_info in rest {
1618 let GalleryItemQueueInfo {
1619 content_type,
1620 upload_file_txn,
1621 file_media_request,
1622 thumbnail,
1623 } = item_queue_info;
1624
1625 let thumbnail_info = if let Some(QueueThumbnailInfo {
1626 finish_upload_thumbnail_info: thumbnail_info,
1627 media_request_parameters: thumbnail_media_request,
1628 content_type: thumbnail_content_type,
1629 ..
1630 }) = thumbnail
1631 {
1632 let upload_thumbnail_txn = thumbnail_info.txn.clone();
1633
1634 store
1637 .save_dependent_queued_request(
1638 &self.room_id,
1639 &last_upload_file_txn,
1640 upload_thumbnail_txn.clone().into(),
1641 created_at,
1642 DependentQueuedRequestKind::UploadFileOrThumbnail {
1643 content_type: thumbnail_content_type.to_string(),
1644 cache_key: thumbnail_media_request.clone(),
1645 related_to: send_event_txn.clone(),
1646 parent_is_thumbnail_upload: false,
1647 },
1648 )
1649 .await?;
1650
1651 last_upload_file_txn = upload_thumbnail_txn;
1652
1653 Some(thumbnail_info)
1654 } else {
1655 None
1656 };
1657
1658 store
1660 .save_dependent_queued_request(
1661 &self.room_id,
1662 &last_upload_file_txn,
1663 upload_file_txn.clone().into(),
1664 created_at,
1665 DependentQueuedRequestKind::UploadFileOrThumbnail {
1666 content_type: content_type.to_string(),
1667 cache_key: file_media_request.clone(),
1668 related_to: send_event_txn.clone(),
1669 parent_is_thumbnail_upload: thumbnail.is_some(),
1670 },
1671 )
1672 .await?;
1673
1674 finish_item_infos.push(FinishGalleryItemInfo {
1675 file_upload: upload_file_txn.clone(),
1676 thumbnail_info: thumbnail_info.cloned(),
1677 });
1678 thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));
1679
1680 last_upload_file_txn = upload_file_txn.clone();
1681 }
1682
1683 store
1686 .save_dependent_queued_request(
1687 &self.room_id,
1688 &last_upload_file_txn,
1689 send_event_txn.clone().into(),
1690 created_at,
1691 DependentQueuedRequestKind::FinishGallery {
1692 local_echo: Box::new(event),
1693 item_infos: finish_item_infos,
1694 },
1695 )
1696 .await?;
1697
1698 self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1699
1700 Ok(())
1701 }
1702
1703 #[allow(clippy::too_many_arguments)]
1709 async fn push_thumbnail_and_media_uploads(
1710 &self,
1711 store: &DynStateStore,
1712 content_type: &Mime,
1713 send_event_txn: OwnedTransactionId,
1714 created_at: MilliSecondsSinceUnixEpoch,
1715 upload_file_txn: OwnedTransactionId,
1716 file_media_request: MediaRequestParameters,
1717 thumbnail: Option<QueueThumbnailInfo>,
1718 ) -> Result<Option<FinishUploadThumbnailInfo>, RoomSendQueueStorageError> {
1719 if let Some(QueueThumbnailInfo {
1720 finish_upload_thumbnail_info: thumbnail_info,
1721 media_request_parameters: thumbnail_media_request,
1722 content_type: thumbnail_content_type,
1723 ..
1724 }) = thumbnail
1725 {
1726 let upload_thumbnail_txn = thumbnail_info.txn.clone();
1727
1728 store
1730 .save_send_queue_request(
1731 &self.room_id,
1732 upload_thumbnail_txn.clone(),
1733 created_at,
1734 QueuedRequestKind::MediaUpload {
1735 content_type: thumbnail_content_type.to_string(),
1736 cache_key: thumbnail_media_request,
1737 thumbnail_source: None, related_to: send_event_txn.clone(),
1739 #[cfg(feature = "unstable-msc4274")]
1740 accumulated: vec![],
1741 },
1742 Self::LOW_PRIORITY,
1743 )
1744 .await?;
1745
1746 store
1748 .save_dependent_queued_request(
1749 &self.room_id,
1750 &upload_thumbnail_txn,
1751 upload_file_txn.into(),
1752 created_at,
1753 DependentQueuedRequestKind::UploadFileOrThumbnail {
1754 content_type: content_type.to_string(),
1755 cache_key: file_media_request,
1756 related_to: send_event_txn,
1757 parent_is_thumbnail_upload: true,
1758 },
1759 )
1760 .await?;
1761
1762 Ok(Some(thumbnail_info))
1763 } else {
1764 store
1766 .save_send_queue_request(
1767 &self.room_id,
1768 upload_file_txn,
1769 created_at,
1770 QueuedRequestKind::MediaUpload {
1771 content_type: content_type.to_string(),
1772 cache_key: file_media_request,
1773 thumbnail_source: None,
1774 related_to: send_event_txn,
1775 #[cfg(feature = "unstable-msc4274")]
1776 accumulated: vec![],
1777 },
1778 Self::LOW_PRIORITY,
1779 )
1780 .await?;
1781
1782 Ok(None)
1783 }
1784 }
1785
1786 #[instrument(skip(self))]
1788 async fn react(
1789 &self,
1790 transaction_id: &TransactionId,
1791 key: String,
1792 created_at: MilliSecondsSinceUnixEpoch,
1793 ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1794 let guard = self.store.lock().await;
1795 let client = guard.client()?;
1796 let store = client.state_store();
1797
1798 let requests = store.load_send_queue_requests(&self.room_id).await?;
1799
1800 if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1802 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1805 if !dependent_requests
1806 .into_iter()
1807 .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1808 .any(|child_txn| *child_txn == *transaction_id)
1809 {
1810 return Ok(None);
1812 }
1813 }
1814
1815 let reaction_txn_id = ChildTransactionId::new();
1817 store
1818 .save_dependent_queued_request(
1819 &self.room_id,
1820 transaction_id,
1821 reaction_txn_id.clone(),
1822 created_at,
1823 DependentQueuedRequestKind::ReactEvent { key },
1824 )
1825 .await?;
1826
1827 Ok(Some(reaction_txn_id))
1828 }
1829
1830 async fn local_echoes(
1833 &self,
1834 room: &RoomSendQueue,
1835 ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1836 let guard = self.store.lock().await;
1837 let client = guard.client()?;
1838 let store = client.state_store();
1839
1840 let local_requests =
1841 store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1842 Some(LocalEcho {
1843 transaction_id: queued.transaction_id.clone(),
1844 content: match queued.kind {
1845 QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1846 serialized_event: content,
1847 send_handle: SendHandle {
1848 room: room.clone(),
1849 transaction_id: queued.transaction_id,
1850 media_handles: vec![],
1851 created_at: queued.created_at,
1852 },
1853 send_error: queued.error,
1854 },
1855
1856 QueuedRequestKind::MediaUpload { .. } => {
1857 return None;
1860 }
1861 },
1862 })
1863 });
1864
1865 let reactions_and_medias = store
1866 .load_dependent_queued_requests(&self.room_id)
1867 .await?
1868 .into_iter()
1869 .filter_map(|dep| match dep.kind {
1870 DependentQueuedRequestKind::EditEvent { .. }
1871 | DependentQueuedRequestKind::RedactEvent => {
1872 None
1874 }
1875
1876 DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
1877 transaction_id: dep.own_transaction_id.clone().into(),
1878 content: LocalEchoContent::React {
1879 key,
1880 send_handle: SendReactionHandle {
1881 room: room.clone(),
1882 transaction_id: dep.own_transaction_id,
1883 },
1884 applies_to: dep.parent_transaction_id,
1885 },
1886 }),
1887
1888 DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
1889 None
1891 }
1892
1893 DependentQueuedRequestKind::FinishUpload {
1894 local_echo,
1895 file_upload,
1896 thumbnail_info,
1897 } => {
1898 Some(LocalEcho {
1900 transaction_id: dep.own_transaction_id.clone().into(),
1901 content: LocalEchoContent::Event {
1902 serialized_event: SerializableEventContent::new(&(*local_echo).into())
1903 .ok()?,
1904 send_handle: SendHandle {
1905 room: room.clone(),
1906 transaction_id: dep.own_transaction_id.into(),
1907 media_handles: vec![MediaHandles {
1908 upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
1909 upload_file_txn: file_upload,
1910 }],
1911 created_at: dep.created_at,
1912 },
1913 send_error: None,
1914 },
1915 })
1916 }
1917
1918 #[cfg(feature = "unstable-msc4274")]
1919 DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
1920 self.create_gallery_local_echo(
1922 dep.own_transaction_id,
1923 room,
1924 dep.created_at,
1925 local_echo,
1926 item_infos,
1927 )
1928 }
1929 });
1930
1931 Ok(local_requests.chain(reactions_and_medias).collect())
1932 }
1933
1934 #[cfg(feature = "unstable-msc4274")]
1936 fn create_gallery_local_echo(
1937 &self,
1938 transaction_id: ChildTransactionId,
1939 room: &RoomSendQueue,
1940 created_at: MilliSecondsSinceUnixEpoch,
1941 local_echo: Box<RoomMessageEventContent>,
1942 item_infos: Vec<FinishGalleryItemInfo>,
1943 ) -> Option<LocalEcho> {
1944 Some(LocalEcho {
1945 transaction_id: transaction_id.clone().into(),
1946 content: LocalEchoContent::Event {
1947 serialized_event: SerializableEventContent::new(&(*local_echo).into()).ok()?,
1948 send_handle: SendHandle {
1949 room: room.clone(),
1950 transaction_id: transaction_id.into(),
1951 media_handles: item_infos
1952 .into_iter()
1953 .map(|i| MediaHandles {
1954 upload_thumbnail_txn: i.thumbnail_info.map(|info| info.txn),
1955 upload_file_txn: i.file_upload,
1956 })
1957 .collect(),
1958 created_at,
1959 },
1960 send_error: None,
1961 },
1962 })
1963 }
1964
1965 #[instrument(skip_all)]
1973 async fn try_apply_single_dependent_request(
1974 &self,
1975 client: &Client,
1976 dependent_request: DependentQueuedRequest,
1977 new_updates: &mut Vec<RoomSendQueueUpdate>,
1978 ) -> Result<bool, RoomSendQueueError> {
1979 let store = client.state_store();
1980
1981 let parent_key = dependent_request.parent_key;
1982
1983 match dependent_request.kind {
1984 DependentQueuedRequestKind::EditEvent { new_content } => {
1985 if let Some(parent_key) = parent_key {
1986 let Some(event_id) = parent_key.into_event_id() else {
1987 return Err(RoomSendQueueError::StorageError(
1988 RoomSendQueueStorageError::InvalidParentKey,
1989 ));
1990 };
1991
1992 let room = client
1994 .get_room(&self.room_id)
1995 .ok_or(RoomSendQueueError::RoomDisappeared)?;
1996
1997 let edited_content = match new_content.deserialize() {
2001 Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
2002 EditedContent::RoomMessage(c.into())
2004 }
2005
2006 Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
2007 let poll_start = c.poll_start().clone();
2008 EditedContent::PollStart {
2009 fallback_text: poll_start.question.text.clone(),
2010 new_content: poll_start,
2011 }
2012 }
2013
2014 Ok(c) => {
2015 warn!("Unsupported edit content type: {:?}", c.event_type());
2016 return Ok(true);
2017 }
2018
2019 Err(err) => {
2020 warn!("Unable to deserialize: {err}");
2021 return Ok(true);
2022 }
2023 };
2024
2025 let edit_event = match room.make_edit_event(&event_id, edited_content).await {
2026 Ok(e) => e,
2027 Err(err) => {
2028 warn!("couldn't create edited event: {err}");
2029 return Ok(true);
2030 }
2031 };
2032
2033 let serializable = SerializableEventContent::from_raw(
2035 Raw::new(&edit_event)
2036 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
2037 edit_event.event_type().to_string(),
2038 );
2039
2040 store
2041 .save_send_queue_request(
2042 &self.room_id,
2043 dependent_request.own_transaction_id.into(),
2044 dependent_request.created_at,
2045 serializable.into(),
2046 Self::HIGH_PRIORITY,
2047 )
2048 .await
2049 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2050 } else {
2051 let edited = store
2053 .update_send_queue_request(
2054 &self.room_id,
2055 &dependent_request.parent_transaction_id,
2056 new_content.into(),
2057 )
2058 .await
2059 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2060
2061 if !edited {
2062 warn!("missing local echo upon dependent edit");
2063 }
2064 }
2065 }
2066
2067 DependentQueuedRequestKind::RedactEvent => {
2068 if let Some(parent_key) = parent_key {
2069 let Some(event_id) = parent_key.into_event_id() else {
2070 return Err(RoomSendQueueError::StorageError(
2071 RoomSendQueueStorageError::InvalidParentKey,
2072 ));
2073 };
2074
2075 let room = client
2077 .get_room(&self.room_id)
2078 .ok_or(RoomSendQueueError::RoomDisappeared)?;
2079
2080 if let Err(err) = room
2088 .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
2089 .await
2090 {
2091 warn!("error when sending a redact for {event_id}: {err}");
2092 return Ok(false);
2093 }
2094 } else {
2095 let removed = store
2098 .remove_send_queue_request(
2099 &self.room_id,
2100 &dependent_request.parent_transaction_id,
2101 )
2102 .await
2103 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2104
2105 if !removed {
2106 warn!("missing local echo upon dependent redact");
2107 }
2108 }
2109 }
2110
2111 DependentQueuedRequestKind::ReactEvent { key } => {
2112 if let Some(parent_key) = parent_key {
2113 let Some(parent_event_id) = parent_key.into_event_id() else {
2114 return Err(RoomSendQueueError::StorageError(
2115 RoomSendQueueStorageError::InvalidParentKey,
2116 ));
2117 };
2118
2119 let react_event =
2121 ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
2122 let serializable = SerializableEventContent::from_raw(
2123 Raw::new(&react_event)
2124 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
2125 react_event.event_type().to_string(),
2126 );
2127
2128 store
2129 .save_send_queue_request(
2130 &self.room_id,
2131 dependent_request.own_transaction_id.into(),
2132 dependent_request.created_at,
2133 serializable.into(),
2134 Self::HIGH_PRIORITY,
2135 )
2136 .await
2137 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2138 } else {
2139 return Ok(false);
2141 }
2142 }
2143
2144 DependentQueuedRequestKind::UploadFileOrThumbnail {
2145 content_type,
2146 cache_key,
2147 related_to,
2148 parent_is_thumbnail_upload,
2149 } => {
2150 let Some(parent_key) = parent_key else {
2151 return Ok(false);
2153 };
2154 self.handle_dependent_file_or_thumbnail_upload(
2155 client,
2156 dependent_request.own_transaction_id.into(),
2157 parent_key,
2158 content_type,
2159 cache_key,
2160 related_to,
2161 parent_is_thumbnail_upload,
2162 )
2163 .await?;
2164 }
2165
2166 DependentQueuedRequestKind::FinishUpload {
2167 local_echo,
2168 file_upload,
2169 thumbnail_info,
2170 } => {
2171 let Some(parent_key) = parent_key else {
2172 return Ok(false);
2174 };
2175 self.handle_dependent_finish_upload(
2176 client,
2177 dependent_request.own_transaction_id.into(),
2178 parent_key,
2179 *local_echo,
2180 file_upload,
2181 thumbnail_info,
2182 new_updates,
2183 )
2184 .await?;
2185 }
2186
2187 #[cfg(feature = "unstable-msc4274")]
2188 DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
2189 let Some(parent_key) = parent_key else {
2190 return Ok(false);
2192 };
2193 self.handle_dependent_finish_gallery_upload(
2194 client,
2195 dependent_request.own_transaction_id.into(),
2196 parent_key,
2197 *local_echo,
2198 item_infos,
2199 new_updates,
2200 )
2201 .await?;
2202 }
2203 }
2204
2205 Ok(true)
2206 }
2207
2208 #[instrument(skip(self))]
2209 async fn apply_dependent_requests(
2210 &self,
2211 new_updates: &mut Vec<RoomSendQueueUpdate>,
2212 ) -> Result<(), RoomSendQueueError> {
2213 let guard = self.store.lock().await;
2214
2215 let client = guard.client()?;
2216 let store = client.state_store();
2217
2218 let dependent_requests = store
2219 .load_dependent_queued_requests(&self.room_id)
2220 .await
2221 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2222
2223 let num_initial_dependent_requests = dependent_requests.len();
2224 if num_initial_dependent_requests == 0 {
2225 return Ok(());
2227 }
2228
2229 let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
2230
2231 for original in &dependent_requests {
2233 if !canonicalized_dependent_requests
2234 .iter()
2235 .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
2236 {
2237 store
2238 .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
2239 .await
2240 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2241 }
2242 }
2243
2244 let mut num_dependent_requests = canonicalized_dependent_requests.len();
2245
2246 debug!(
2247 num_dependent_requests,
2248 num_initial_dependent_requests, "starting handling of dependent requests"
2249 );
2250
2251 for dependent in canonicalized_dependent_requests {
2252 let dependent_id = dependent.own_transaction_id.clone();
2253
2254 match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
2255 Ok(should_remove) => {
2256 if should_remove {
2257 store
2259 .remove_dependent_queued_request(&self.room_id, &dependent_id)
2260 .await
2261 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2262
2263 num_dependent_requests -= 1;
2264 }
2265 }
2266
2267 Err(err) => {
2268 warn!("error when applying single dependent request: {err}");
2269 }
2270 }
2271 }
2272
2273 debug!(
2274 leftover_dependent_requests = num_dependent_requests,
2275 "stopped handling dependent request"
2276 );
2277
2278 Ok(())
2279 }
2280
2281 async fn remove_dependent_send_queue_request(
2283 &self,
2284 dependent_event_id: &ChildTransactionId,
2285 ) -> Result<bool, RoomSendQueueStorageError> {
2286 Ok(self
2287 .store
2288 .lock()
2289 .await
2290 .client()?
2291 .state_store()
2292 .remove_dependent_queued_request(&self.room_id, dependent_event_id)
2293 .await?)
2294 }
2295}
2296
2297#[cfg(feature = "unstable-msc4274")]
2298struct GalleryItemQueueInfo {
2300 content_type: Mime,
2301 upload_file_txn: OwnedTransactionId,
2302 file_media_request: MediaRequestParameters,
2303 thumbnail: Option<QueueThumbnailInfo>,
2304}
2305
2306#[derive(Clone, Debug)]
2308pub enum LocalEchoContent {
2309 Event {
2311 serialized_event: SerializableEventContent,
2314 send_handle: SendHandle,
2316 send_error: Option<QueueWedgeError>,
2319 },
2320
2321 React {
2323 key: String,
2325 send_handle: SendReactionHandle,
2327 applies_to: OwnedTransactionId,
2329 },
2330}
2331
2332#[derive(Clone, Debug)]
2335pub struct LocalEcho {
2336 pub transaction_id: OwnedTransactionId,
2338 pub content: LocalEchoContent,
2340}
2341
2342#[derive(Clone, Debug)]
2345pub enum RoomSendQueueUpdate {
2346 NewLocalEvent(LocalEcho),
2351
2352 CancelledLocalEvent {
2355 transaction_id: OwnedTransactionId,
2357 },
2358
2359 ReplacedLocalEvent {
2361 transaction_id: OwnedTransactionId,
2363
2364 new_content: SerializableEventContent,
2366 },
2367
2368 SendError {
2373 transaction_id: OwnedTransactionId,
2375 error: Arc<crate::Error>,
2377 is_recoverable: bool,
2383 },
2384
2385 RetryEvent {
2387 transaction_id: OwnedTransactionId,
2389 },
2390
2391 SentEvent {
2394 transaction_id: OwnedTransactionId,
2396 event_id: OwnedEventId,
2398 },
2399
2400 MediaUpload {
2403 related_to: OwnedTransactionId,
2405
2406 file: Option<MediaSource>,
2408
2409 index: u64,
2413
2414 progress: AbstractProgress,
2418 },
2419}
2420
2421#[derive(Clone, Debug)]
2426pub struct SendQueueUpdate {
2427 pub room_id: OwnedRoomId,
2429
2430 pub update: RoomSendQueueUpdate,
2432}
2433
2434#[derive(Debug, thiserror::Error)]
2436pub enum RoomSendQueueError {
2437 #[error("the room isn't in the joined state")]
2439 RoomNotJoined,
2440
2441 #[error("the room is now missing from the client")]
2445 RoomDisappeared,
2446
2447 #[error(transparent)]
2449 StorageError(#[from] RoomSendQueueStorageError),
2450
2451 #[error("the attachment event could not be created")]
2453 FailedToCreateAttachment,
2454
2455 #[cfg(feature = "unstable-msc4274")]
2457 #[error("the gallery contains no items")]
2458 EmptyGallery,
2459
2460 #[cfg(feature = "unstable-msc4274")]
2462 #[error("the gallery event could not be created")]
2463 FailedToCreateGallery,
2464}
2465
2466#[derive(Debug, thiserror::Error)]
2468pub enum RoomSendQueueStorageError {
2469 #[error(transparent)]
2471 StateStoreError(#[from] StoreError),
2472
2473 #[error(transparent)]
2475 EventCacheStoreError(#[from] EventCacheStoreError),
2476
2477 #[error(transparent)]
2479 MediaStoreError(#[from] MediaStoreError),
2480
2481 #[error(transparent)]
2483 LockError(#[from] CrossProcessLockError),
2484
2485 #[error(transparent)]
2487 JsonSerialization(#[from] serde_json::Error),
2488
2489 #[error("a dependent event had an invalid parent key type")]
2492 InvalidParentKey,
2493
2494 #[error("The client is shutting down.")]
2496 ClientShuttingDown,
2497
2498 #[error("This operation is not implemented for media uploads")]
2500 OperationNotImplementedYet,
2501
2502 #[error("Can't edit a media caption when the underlying event isn't a media")]
2504 InvalidMediaCaptionEdit,
2505}
2506
2507#[derive(Clone, Debug)]
2509struct MediaHandles {
2510 upload_thumbnail_txn: Option<OwnedTransactionId>,
2514
2515 upload_file_txn: OwnedTransactionId,
2517}
2518
2519#[derive(Clone, Debug)]
2521pub struct SendHandle {
2522 room: RoomSendQueue,
2524
2525 transaction_id: OwnedTransactionId,
2530
2531 media_handles: Vec<MediaHandles>,
2533
2534 pub created_at: MilliSecondsSinceUnixEpoch,
2536}
2537
2538impl SendHandle {
2539 #[cfg(test)]
2541 pub(crate) fn new(
2542 room: RoomSendQueue,
2543 transaction_id: OwnedTransactionId,
2544 created_at: MilliSecondsSinceUnixEpoch,
2545 ) -> Self {
2546 Self { room, transaction_id, media_handles: vec![], created_at }
2547 }
2548
2549 fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
2550 if !self.media_handles.is_empty() {
2551 Err(RoomSendQueueStorageError::OperationNotImplementedYet)
2552 } else {
2553 Ok(())
2554 }
2555 }
2556
2557 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2562 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2563 trace!("received an abort request");
2564
2565 let queue = &self.room.inner.queue;
2566
2567 for handles in &self.media_handles {
2568 if queue.abort_upload(&self.transaction_id, handles).await? {
2569 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2571 transaction_id: self.transaction_id.clone(),
2572 });
2573
2574 return Ok(true);
2575 }
2576
2577 }
2581
2582 if queue.cancel_event(&self.transaction_id).await? {
2583 trace!("successful abort");
2584
2585 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2587 transaction_id: self.transaction_id.clone(),
2588 });
2589
2590 Ok(true)
2591 } else {
2592 debug!("local echo didn't exist anymore, can't abort");
2593 Ok(false)
2594 }
2595 }
2596
2597 #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2602 pub async fn edit_raw(
2603 &self,
2604 new_content: Raw<AnyMessageLikeEventContent>,
2605 event_type: String,
2606 ) -> Result<bool, RoomSendQueueStorageError> {
2607 trace!("received an edit request");
2608 self.nyi_for_uploads()?;
2609
2610 let serializable = SerializableEventContent::from_raw(new_content, event_type);
2611
2612 if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
2613 trace!("successful edit");
2614
2615 self.room.inner.notifier.notify_one();
2617
2618 self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2620 transaction_id: self.transaction_id.clone(),
2621 new_content: serializable,
2622 });
2623
2624 Ok(true)
2625 } else {
2626 debug!("local echo doesn't exist anymore, can't edit");
2627 Ok(false)
2628 }
2629 }
2630
2631 pub async fn edit(
2636 &self,
2637 new_content: AnyMessageLikeEventContent,
2638 ) -> Result<bool, RoomSendQueueStorageError> {
2639 self.edit_raw(
2640 Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2641 new_content.event_type().to_string(),
2642 )
2643 .await
2644 }
2645
2646 pub async fn edit_media_caption(
2651 &self,
2652 caption: Option<String>,
2653 formatted_caption: Option<FormattedBody>,
2654 mentions: Option<Mentions>,
2655 ) -> Result<bool, RoomSendQueueStorageError> {
2656 if let Some(new_content) = self
2657 .room
2658 .inner
2659 .queue
2660 .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2661 .await?
2662 {
2663 trace!("successful edit of media caption");
2664
2665 self.room.inner.notifier.notify_one();
2667
2668 let new_content = SerializableEventContent::new(&new_content)
2669 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2670
2671 self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2673 transaction_id: self.transaction_id.clone(),
2674 new_content,
2675 });
2676
2677 Ok(true)
2678 } else {
2679 debug!("local echo doesn't exist anymore, can't edit media caption");
2680 Ok(false)
2681 }
2682 }
2683
2684 pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2687 let room = &self.room.inner;
2688 room.queue
2689 .mark_as_unwedged(&self.transaction_id)
2690 .await
2691 .map_err(RoomSendQueueError::StorageError)?;
2692
2693 for handles in &self.media_handles {
2701 room.queue
2702 .mark_as_unwedged(&handles.upload_file_txn)
2703 .await
2704 .map_err(RoomSendQueueError::StorageError)?;
2705
2706 if let Some(txn) = &handles.upload_thumbnail_txn {
2707 room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2708 }
2709 }
2710
2711 room.notifier.notify_one();
2713
2714 self.room.send_update(RoomSendQueueUpdate::RetryEvent {
2715 transaction_id: self.transaction_id.clone(),
2716 });
2717
2718 Ok(())
2719 }
2720
2721 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2726 pub async fn react(
2727 &self,
2728 key: String,
2729 ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2730 trace!("received an intent to react");
2731
2732 let created_at = MilliSecondsSinceUnixEpoch::now();
2733 if let Some(reaction_txn_id) =
2734 self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2735 {
2736 trace!("successfully queued react");
2737
2738 self.room.inner.notifier.notify_one();
2740
2741 let send_handle = SendReactionHandle {
2743 room: self.room.clone(),
2744 transaction_id: reaction_txn_id.clone(),
2745 };
2746
2747 self.room.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2748 transaction_id: reaction_txn_id.into(),
2751 content: LocalEchoContent::React {
2752 key,
2753 send_handle: send_handle.clone(),
2754 applies_to: self.transaction_id.clone(),
2755 },
2756 }));
2757
2758 Ok(Some(send_handle))
2759 } else {
2760 debug!("local echo doesn't exist anymore, can't react");
2761 Ok(None)
2762 }
2763 }
2764}
2765
2766#[derive(Clone, Debug)]
2768pub struct SendReactionHandle {
2769 room: RoomSendQueue,
2771 transaction_id: ChildTransactionId,
2773}
2774
2775impl SendReactionHandle {
2776 #[cfg(test)]
2778 pub(crate) fn new(room: RoomSendQueue, transaction_id: ChildTransactionId) -> Self {
2779 Self { room, transaction_id }
2780 }
2781
2782 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2787 if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2788 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2792 transaction_id: self.transaction_id.clone().into(),
2793 });
2794
2795 return Ok(true);
2796 }
2797
2798 let handle = SendHandle {
2801 room: self.room.clone(),
2802 transaction_id: self.transaction_id.clone().into(),
2803 media_handles: vec![],
2804 created_at: MilliSecondsSinceUnixEpoch::now(),
2805 };
2806
2807 handle.abort().await
2808 }
2809
2810 pub fn transaction_id(&self) -> &TransactionId {
2812 &self.transaction_id
2813 }
2814}
2815
2816fn canonicalize_dependent_requests(
2820 dependent: &[DependentQueuedRequest],
2821) -> Vec<DependentQueuedRequest> {
2822 let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
2823
2824 for d in dependent {
2825 let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
2826
2827 if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
2828 continue;
2831 }
2832
2833 match &d.kind {
2834 DependentQueuedRequestKind::EditEvent { .. } => {
2835 if let Some(prev_edit) = prevs
2837 .iter_mut()
2838 .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
2839 {
2840 *prev_edit = d;
2841 } else {
2842 prevs.insert(0, d);
2843 }
2844 }
2845
2846 DependentQueuedRequestKind::UploadFileOrThumbnail { .. }
2847 | DependentQueuedRequestKind::FinishUpload { .. }
2848 | DependentQueuedRequestKind::ReactEvent { .. } => {
2849 prevs.push(d);
2851 }
2852
2853 #[cfg(feature = "unstable-msc4274")]
2854 DependentQueuedRequestKind::FinishGallery { .. } => {
2855 prevs.push(d);
2857 }
2858
2859 DependentQueuedRequestKind::RedactEvent => {
2860 prevs.clear();
2862 prevs.push(d);
2863 }
2864 }
2865 }
2866
2867 by_txn.into_values().flat_map(|entries| entries.into_iter().cloned()).collect()
2868}
2869
2870#[cfg(all(test, not(target_family = "wasm")))]
2871mod tests {
2872 use std::{sync::Arc, time::Duration};
2873
2874 use assert_matches2::{assert_let, assert_matches};
2875 use matrix_sdk_base::store::{
2876 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
2877 SerializableEventContent,
2878 };
2879 use matrix_sdk_test::{JoinedRoomBuilder, SyncResponseBuilder, async_test};
2880 use ruma::{
2881 MilliSecondsSinceUnixEpoch, TransactionId,
2882 events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
2883 room_id,
2884 };
2885
2886 use super::canonicalize_dependent_requests;
2887 use crate::{client::WeakClient, test_utils::logged_in_client};
2888
2889 #[test]
2890 fn test_canonicalize_dependent_events_created_at() {
2891 let txn = TransactionId::new();
2894 let created_at = MilliSecondsSinceUnixEpoch::now();
2895
2896 let edit = DependentQueuedRequest {
2897 own_transaction_id: ChildTransactionId::new(),
2898 parent_transaction_id: txn.clone(),
2899 kind: DependentQueuedRequestKind::EditEvent {
2900 new_content: SerializableEventContent::new(
2901 &RoomMessageEventContent::text_plain("edit").into(),
2902 )
2903 .unwrap(),
2904 },
2905 parent_key: None,
2906 created_at,
2907 };
2908
2909 let res = canonicalize_dependent_requests(&[edit]);
2910
2911 assert_eq!(res.len(), 1);
2912 assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2913 assert_let!(
2914 AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2915 );
2916 assert_eq!(msg.body(), "edit");
2917 assert_eq!(res[0].parent_transaction_id, txn);
2918 assert_eq!(res[0].created_at, created_at);
2919 }
2920
2921 #[async_test]
2922 async fn test_client_no_cycle_with_send_queue() {
2923 for enabled in [true, false] {
2924 let client = logged_in_client(None).await;
2925 let weak_client = WeakClient::from_client(&client);
2926
2927 {
2928 let mut sync_response_builder = SyncResponseBuilder::new();
2929
2930 let room_id = room_id!("!a:b.c");
2931
2932 client
2934 .base_client()
2935 .receive_sync_response(
2936 sync_response_builder
2937 .add_joined_room(JoinedRoomBuilder::new(room_id))
2938 .build_sync_response(),
2939 )
2940 .await
2941 .unwrap();
2942
2943 let room = client.get_room(room_id).unwrap();
2944 let q = room.send_queue();
2945
2946 let _watcher = q.subscribe().await;
2947
2948 client.send_queue().set_enabled(enabled).await;
2949 }
2950
2951 drop(client);
2952
2953 tokio::time::sleep(Duration::from_millis(500)).await;
2955
2956 let client = weak_client.get();
2958 assert!(
2959 client.is_none(),
2960 "too many strong references to the client: {}",
2961 Arc::strong_count(&client.unwrap().inner)
2962 );
2963 }
2964 }
2965
2966 #[test]
2967 fn test_canonicalize_dependent_events_smoke_test() {
2968 let txn = TransactionId::new();
2970
2971 let edit = DependentQueuedRequest {
2972 own_transaction_id: ChildTransactionId::new(),
2973 parent_transaction_id: txn.clone(),
2974 kind: DependentQueuedRequestKind::EditEvent {
2975 new_content: SerializableEventContent::new(
2976 &RoomMessageEventContent::text_plain("edit").into(),
2977 )
2978 .unwrap(),
2979 },
2980 parent_key: None,
2981 created_at: MilliSecondsSinceUnixEpoch::now(),
2982 };
2983 let res = canonicalize_dependent_requests(&[edit]);
2984
2985 assert_eq!(res.len(), 1);
2986 assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
2987 assert_eq!(res[0].parent_transaction_id, txn);
2988 assert!(res[0].parent_key.is_none());
2989 }
2990
2991 #[test]
2992 fn test_canonicalize_dependent_events_redaction_preferred() {
2993 let txn = TransactionId::new();
2995
2996 let mut inputs = Vec::with_capacity(100);
2997 let redact = DependentQueuedRequest {
2998 own_transaction_id: ChildTransactionId::new(),
2999 parent_transaction_id: txn.clone(),
3000 kind: DependentQueuedRequestKind::RedactEvent,
3001 parent_key: None,
3002 created_at: MilliSecondsSinceUnixEpoch::now(),
3003 };
3004
3005 let edit = DependentQueuedRequest {
3006 own_transaction_id: ChildTransactionId::new(),
3007 parent_transaction_id: txn.clone(),
3008 kind: DependentQueuedRequestKind::EditEvent {
3009 new_content: SerializableEventContent::new(
3010 &RoomMessageEventContent::text_plain("edit").into(),
3011 )
3012 .unwrap(),
3013 },
3014 parent_key: None,
3015 created_at: MilliSecondsSinceUnixEpoch::now(),
3016 };
3017
3018 inputs.push({
3019 let mut edit = edit.clone();
3020 edit.own_transaction_id = ChildTransactionId::new();
3021 edit
3022 });
3023
3024 inputs.push(redact);
3025
3026 for _ in 0..98 {
3027 let mut edit = edit.clone();
3028 edit.own_transaction_id = ChildTransactionId::new();
3029 inputs.push(edit);
3030 }
3031
3032 let res = canonicalize_dependent_requests(&inputs);
3033
3034 assert_eq!(res.len(), 1);
3035 assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
3036 assert_eq!(res[0].parent_transaction_id, txn);
3037 }
3038
3039 #[test]
3040 fn test_canonicalize_dependent_events_last_edit_preferred() {
3041 let parent_txn = TransactionId::new();
3042
3043 let inputs = (0..10)
3045 .map(|i| DependentQueuedRequest {
3046 own_transaction_id: ChildTransactionId::new(),
3047 parent_transaction_id: parent_txn.clone(),
3048 kind: DependentQueuedRequestKind::EditEvent {
3049 new_content: SerializableEventContent::new(
3050 &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
3051 )
3052 .unwrap(),
3053 },
3054 parent_key: None,
3055 created_at: MilliSecondsSinceUnixEpoch::now(),
3056 })
3057 .collect::<Vec<_>>();
3058
3059 let txn = inputs[9].parent_transaction_id.clone();
3060
3061 let res = canonicalize_dependent_requests(&inputs);
3062
3063 assert_eq!(res.len(), 1);
3064 assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
3065 assert_let!(
3066 AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
3067 );
3068 assert_eq!(msg.body(), "edit9");
3069 assert_eq!(res[0].parent_transaction_id, txn);
3070 }
3071
3072 #[test]
3073 fn test_canonicalize_multiple_local_echoes() {
3074 let txn1 = TransactionId::new();
3075 let txn2 = TransactionId::new();
3076
3077 let child1 = ChildTransactionId::new();
3078 let child2 = ChildTransactionId::new();
3079
3080 let inputs = vec![
3081 DependentQueuedRequest {
3083 own_transaction_id: child1.clone(),
3084 kind: DependentQueuedRequestKind::RedactEvent,
3085 parent_transaction_id: txn1.clone(),
3086 parent_key: None,
3087 created_at: MilliSecondsSinceUnixEpoch::now(),
3088 },
3089 DependentQueuedRequest {
3091 own_transaction_id: child2,
3092 kind: DependentQueuedRequestKind::EditEvent {
3093 new_content: SerializableEventContent::new(
3094 &RoomMessageEventContent::text_plain("edit").into(),
3095 )
3096 .unwrap(),
3097 },
3098 parent_transaction_id: txn2.clone(),
3099 parent_key: None,
3100 created_at: MilliSecondsSinceUnixEpoch::now(),
3101 },
3102 ];
3103
3104 let res = canonicalize_dependent_requests(&inputs);
3105
3106 assert_eq!(res.len(), 2);
3108
3109 for dependent in res {
3110 if dependent.own_transaction_id == child1 {
3111 assert_eq!(dependent.parent_transaction_id, txn1);
3112 assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
3113 } else {
3114 assert_eq!(dependent.parent_transaction_id, txn2);
3115 assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
3116 }
3117 }
3118 }
3119
3120 #[test]
3121 fn test_canonicalize_reactions_after_edits() {
3122 let txn = TransactionId::new();
3124
3125 let react_id = ChildTransactionId::new();
3126 let react = DependentQueuedRequest {
3127 own_transaction_id: react_id.clone(),
3128 kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
3129 parent_transaction_id: txn.clone(),
3130 parent_key: None,
3131 created_at: MilliSecondsSinceUnixEpoch::now(),
3132 };
3133
3134 let edit_id = ChildTransactionId::new();
3135 let edit = DependentQueuedRequest {
3136 own_transaction_id: edit_id.clone(),
3137 kind: DependentQueuedRequestKind::EditEvent {
3138 new_content: SerializableEventContent::new(
3139 &RoomMessageEventContent::text_plain("edit").into(),
3140 )
3141 .unwrap(),
3142 },
3143 parent_transaction_id: txn,
3144 parent_key: None,
3145 created_at: MilliSecondsSinceUnixEpoch::now(),
3146 };
3147
3148 let res = canonicalize_dependent_requests(&[react, edit]);
3149
3150 assert_eq!(res.len(), 2);
3151 assert_eq!(res[0].own_transaction_id, edit_id);
3152 assert_eq!(res[1].own_transaction_id, react_id);
3153 }
3154}