1use std::{
132 collections::{BTreeMap, HashMap},
133 str::FromStr as _,
134 sync::{
135 Arc, RwLock,
136 atomic::{AtomicBool, Ordering},
137 },
138};
139
140use eyeball::SharedObservable;
141#[cfg(feature = "e2e-encryption")]
142use matrix_sdk_base::crypto::{OlmError, SessionRecipientCollectionError};
143#[cfg(feature = "unstable-msc4274")]
144use matrix_sdk_base::store::FinishGalleryItemInfo;
145use matrix_sdk_base::{
146 RoomState, StoreError,
147 cross_process_lock::CrossProcessLockError,
148 event_cache::store::EventCacheStoreError,
149 media::{MediaRequestParameters, store::MediaStoreError},
150 store::{
151 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore,
152 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
153 SentMediaInfo, SentRequestKey, SerializableEventContent,
154 },
155};
156use matrix_sdk_common::{
157 executor::{JoinHandle, spawn},
158 locks::Mutex as SyncMutex,
159};
160use mime::Mime;
161use ruma::{
162 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
163 TransactionId,
164 events::{
165 AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _,
166 reaction::ReactionEventContent,
167 relation::Annotation,
168 room::{
169 MediaSource,
170 message::{FormattedBody, RoomMessageEventContent},
171 },
172 },
173 serde::Raw,
174};
175use tokio::sync::{Mutex, Notify, OwnedMutexGuard, broadcast, oneshot};
176use tracing::{debug, error, info, instrument, trace, warn};
177
178use crate::{
179 Client, Media, Room, TransmissionProgress,
180 client::WeakClient,
181 config::RequestConfig,
182 error::RetryKind,
183 room::{WeakRoom, edit::EditedContent},
184};
185
186mod progress;
187mod upload;
188
189pub use progress::AbstractProgress;
190
/// Client-wide handle on the send queue.
///
/// It only wraps a [`Client`]; the shared state it operates on is read from
/// that client (see `SendQueue::data`).
pub struct SendQueue {
    /// The client this send queue operates on.
    client: Client,
}
195
#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for SendQueue {
    /// Formats the send queue as an opaque, non-exhaustive struct; the wrapped
    /// client is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SendQueue").finish_non_exhaustive()
    }
}
202
203impl SendQueue {
204 pub(super) fn new(client: Client) -> Self {
205 Self { client }
206 }
207
208 pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
211 if !self.is_enabled() {
212 return;
213 }
214
215 let room_ids =
216 self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
217 |err| {
218 warn!("error when loading rooms with unsent requests: {err}");
219 Vec::new()
220 },
221 );
222
223 for room_id in room_ids {
225 if let Some(room) = self.client.get_room(&room_id) {
226 let _ = self.for_room(room);
227 }
228 }
229 }
230
231 #[inline(always)]
233 fn data(&self) -> &SendQueueData {
234 &self.client.inner.send_queue_data
235 }
236
237 pub(crate) fn for_room(&self, room: Room) -> RoomSendQueue {
240 let data = self.data();
241
242 let mut map = data.rooms.write().unwrap();
243
244 let room_id = room.room_id();
245 if let Some(room_q) = map.get(room_id).cloned() {
246 return room_q;
247 }
248
249 let owned_room_id = room_id.to_owned();
250 let room_q = RoomSendQueue::new(
251 self.is_enabled(),
252 data.global_update_sender.clone(),
253 data.error_sender.clone(),
254 data.is_dropping.clone(),
255 &self.client,
256 owned_room_id.clone(),
257 data.report_media_upload_progress.clone(),
258 );
259
260 map.insert(owned_room_id, room_q.clone());
261
262 room_q
263 }
264
265 pub async fn set_enabled(&self, enabled: bool) {
275 debug!(?enabled, "setting global send queue enablement");
276
277 self.data().globally_enabled.store(enabled, Ordering::SeqCst);
278
279 for room in self.data().rooms.read().unwrap().values() {
281 room.set_enabled(enabled);
282 }
283
284 self.respawn_tasks_for_rooms_with_unsent_requests().await;
287 }
288
289 pub fn is_enabled(&self) -> bool {
292 self.data().globally_enabled.load(Ordering::SeqCst)
293 }
294
295 pub fn enable_upload_progress(&self, enabled: bool) {
297 self.data().report_media_upload_progress.store(enabled, Ordering::SeqCst);
298 }
299
300 pub fn subscribe(&self) -> broadcast::Receiver<SendQueueUpdate> {
305 self.data().global_update_sender.subscribe()
306 }
307
308 pub async fn local_echoes(
310 &self,
311 ) -> Result<BTreeMap<OwnedRoomId, Vec<LocalEcho>>, RoomSendQueueError> {
312 let room_ids =
313 self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
314 |err| {
315 warn!("error when loading rooms with unsent requests: {err}");
316 Vec::new()
317 },
318 );
319
320 let mut local_echoes: BTreeMap<OwnedRoomId, Vec<LocalEcho>> = BTreeMap::new();
321
322 for room_id in room_ids {
323 if let Some(room) = self.client.get_room(&room_id) {
324 let queue = self.for_room(room);
325 local_echoes
326 .insert(room_id.to_owned(), queue.inner.queue.local_echoes(&queue).await?);
327 }
328 }
329
330 Ok(local_echoes)
331 }
332
333 pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
336 self.data().error_sender.subscribe()
337 }
338}
339
/// Information about a thumbnail, needed when queueing a media upload.
#[derive(Clone, Debug)]
struct QueueThumbnailInfo {
    /// Data carried over to the dependent request that finishes the upload;
    /// its `txn` is used as the transaction id of the thumbnail upload.
    finish_upload_thumbnail_info: FinishUploadThumbnailInfo,

    /// Media request parameters used as the cache key of the thumbnail.
    media_request_parameters: MediaRequestParameters,

    /// MIME type of the thumbnail.
    content_type: Mime,

    /// Size of the thumbnail file (presumably in bytes — TODO confirm).
    file_size: usize,
}
356
/// An error reported by the send queue of a room, as broadcast to the
/// receivers returned by `SendQueue::subscribe_errors`.
#[derive(Clone, Debug)]
pub struct SendQueueRoomError {
    /// The room in which the error happened.
    pub room_id: OwnedRoomId,

    /// The error that was hit while sending the request.
    pub error: Arc<crate::Error>,

    /// Whether the sending task classified the error as recoverable. When it
    /// did, the request is kept in the queue (marked as not being sent
    /// anymore) instead of being marked as wedged.
    pub is_recoverable: bool,
}
373
impl Client {
    /// Returns a [`SendQueue`] handle for this client.
    ///
    /// The handle wraps a clone of the client.
    pub fn send_queue(&self) -> SendQueue {
        SendQueue::new(self.clone())
    }
}
381
/// State of the send queue shared by all the [`SendQueue`] handles of a
/// client.
pub(super) struct SendQueueData {
    /// The room send queues created so far, keyed by room id.
    rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,

    /// Whether the send queue is globally enabled. Also used as the initial
    /// enablement of newly created room queues.
    globally_enabled: AtomicBool,

    /// Sender of the updates of all rooms; cloned into each room queue.
    global_update_sender: broadcast::Sender<SendQueueUpdate>,

    /// Sender of the errors of all rooms; cloned into each room queue.
    error_sender: broadcast::Sender<SendQueueRoomError>,

    /// Set to `true` when this data is dropped, so the sending tasks stop.
    is_dropping: Arc<AtomicBool>,

    /// Whether the sending tasks should report media upload progress.
    report_media_upload_progress: Arc<AtomicBool>,
}
406
407impl SendQueueData {
408 pub fn new(globally_enabled: bool) -> Self {
410 let (global_update_sender, _) = broadcast::channel(32);
411 let (error_sender, _) = broadcast::channel(32);
412
413 Self {
414 rooms: Default::default(),
415 globally_enabled: AtomicBool::new(globally_enabled),
416 global_update_sender,
417 error_sender,
418 is_dropping: Arc::new(false.into()),
419 report_media_upload_progress: Arc::new(false.into()),
420 }
421 }
422}
423
424impl Drop for SendQueueData {
425 fn drop(&mut self) {
426 debug!("globally dropping the send queue");
429 self.is_dropping.store(true, Ordering::SeqCst);
430
431 let rooms = self.rooms.read().unwrap();
432 for room in rooms.values() {
433 room.inner.notifier.notify_one();
434 }
435 }
436}
437
impl Room {
    /// Returns the send queue of this room, creating it if it doesn't exist
    /// yet.
    pub fn send_queue(&self) -> RoomSendQueue {
        self.client.send_queue().for_room(self.clone())
    }
}
444
/// Send queue of a single room.
///
/// Cloning is cheap: all clones share the same reference-counted inner state.
#[derive(Clone)]
pub struct RoomSendQueue {
    /// The shared state of this room's queue.
    inner: Arc<RoomSendQueueInner>,
}
452
#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for RoomSendQueue {
    /// Formats the room send queue as an opaque, non-exhaustive struct; the
    /// inner state is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RoomSendQueue").finish_non_exhaustive()
    }
}
459
460impl RoomSendQueue {
461 fn new(
462 globally_enabled: bool,
463 global_update_sender: broadcast::Sender<SendQueueUpdate>,
464 global_error_sender: broadcast::Sender<SendQueueRoomError>,
465 is_dropping: Arc<AtomicBool>,
466 client: &Client,
467 room_id: OwnedRoomId,
468 report_media_upload_progress: Arc<AtomicBool>,
469 ) -> Self {
470 let (update_sender, _) = broadcast::channel(32);
471
472 let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
473 let notifier = Arc::new(Notify::new());
474
475 let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
476 let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));
477
478 let task = spawn(Self::sending_task(
479 weak_room.clone(),
480 queue.clone(),
481 notifier.clone(),
482 global_update_sender.clone(),
483 update_sender.clone(),
484 locally_enabled.clone(),
485 global_error_sender,
486 is_dropping,
487 report_media_upload_progress,
488 ));
489
490 Self {
491 inner: Arc::new(RoomSendQueueInner {
492 room: weak_room,
493 global_update_sender,
494 update_sender,
495 _task: task,
496 queue,
497 notifier,
498 locally_enabled,
499 }),
500 }
501 }
502
503 pub async fn send_raw(
518 &self,
519 content: Raw<AnyMessageLikeEventContent>,
520 event_type: String,
521 ) -> Result<SendHandle, RoomSendQueueError> {
522 let Some(room) = self.inner.room.get() else {
523 return Err(RoomSendQueueError::RoomDisappeared);
524 };
525 if room.state() != RoomState::Joined {
526 return Err(RoomSendQueueError::RoomNotJoined);
527 }
528
529 let content = SerializableEventContent::from_raw(content, event_type);
530
531 let created_at = MilliSecondsSinceUnixEpoch::now();
532 let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
533 trace!(%transaction_id, "manager sends a raw event to the background task");
534
535 self.inner.notifier.notify_one();
536
537 let send_handle = SendHandle {
538 room: self.clone(),
539 transaction_id: transaction_id.clone(),
540 media_handles: vec![],
541 created_at,
542 };
543
544 self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
545 transaction_id,
546 content: LocalEchoContent::Event {
547 serialized_event: content,
548 send_handle: send_handle.clone(),
549 send_error: None,
550 },
551 }));
552
553 Ok(send_handle)
554 }
555
556 pub async fn send(
571 &self,
572 content: AnyMessageLikeEventContent,
573 ) -> Result<SendHandle, RoomSendQueueError> {
574 self.send_raw(
575 Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
576 content.event_type().to_string(),
577 )
578 .await
579 }
580
    /// Returns the current local echoes of this room, along with a receiver
    /// for the subsequent updates of this room's queue.
    ///
    /// # Errors
    ///
    /// Fails if the local echoes can't be loaded.
    pub async fn subscribe(
        &self,
    ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
    {
        let local_echoes = self.inner.queue.local_echoes(self).await?;

        Ok((local_echoes, self.inner.update_sender.subscribe()))
    }
594
    /// Background task sending the queued requests of a room, one at a time.
    ///
    /// Each iteration of the loop:
    /// 1. stops if the send queue is being dropped,
    /// 2. applies the dependent requests and forwards the resulting updates,
    /// 3. sleeps until notified if the room queue is disabled or empty,
    /// 4. otherwise picks the next non-wedged request, sends it, and reports
    ///    the outcome to the subscribers.
    ///
    /// After any send error, the room queue is disabled. Requests failing
    /// with a recoverable error stay in the queue; the others are marked as
    /// wedged.
    // Every argument is a handle shared with the owning `RoomSendQueue`.
    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all, fields(room_id = %room.room_id()))]
    async fn sending_task(
        room: WeakRoom,
        queue: QueueStorage,
        notifier: Arc<Notify>,
        global_update_sender: broadcast::Sender<SendQueueUpdate>,
        update_sender: broadcast::Sender<RoomSendQueueUpdate>,
        locally_enabled: Arc<AtomicBool>,
        global_error_sender: broadcast::Sender<SendQueueRoomError>,
        is_dropping: Arc<AtomicBool>,
        report_media_upload_progress: Arc<AtomicBool>,
    ) {
        trace!("spawned the sending task");

        let room_id = room.room_id();

        loop {
            // The flag is set when the `SendQueueData` is dropped.
            if is_dropping.load(Ordering::SeqCst) {
                trace!("shutting down!");
                break;
            }

            // Dependent requests are applied even when the queue is disabled;
            // a failure is only logged.
            let mut new_updates = Vec::new();
            if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
                warn!("errors when applying dependent requests: {err}");
            }

            for up in new_updates {
                send_update(&global_update_sender, &update_sender, room_id, up);
            }

            if !locally_enabled.load(Ordering::SeqCst) {
                trace!("not enabled, sleeping");
                // Wait for a wake-up, then re-evaluate everything from the top.
                notifier.notified().await;
                continue;
            }

            let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
                Ok(Some(request)) => request,

                Ok(None) => {
                    trace!("queue is empty, sleeping");
                    // Wait for a wake-up, then re-evaluate everything from the top.
                    notifier.notified().await;
                    continue;
                }

                Err(err) => {
                    // NOTE(review): this retries immediately, without waiting
                    // on the notifier — a persistent storage error would make
                    // the loop spin.
                    warn!("error when loading next request to send: {err}");
                    continue;
                }
            };

            let txn_id = queued_request.transaction_id.clone();
            trace!(txn_id = %txn_id, "received a request to send!");

            let Some(room) = room.get() else {
                if is_dropping.load(Ordering::SeqCst) {
                    break;
                }
                error!("the weak room couldn't be upgraded but we're not shutting down?");
                continue;
            };

            // For media uploads, remember which transaction the upload relates
            // to and, if progress reporting is enabled, set up the progress
            // info and the observable handed to the HTTP request. All three
            // values are `None` for other kinds of requests.
            let (related_txn_id, media_upload_progress_info, http_progress) =
                if let QueuedRequestKind::MediaUpload {
                    cache_key,
                    thumbnail_source,
                    #[cfg(feature = "unstable-msc4274")]
                    accumulated,
                    related_to,
                    ..
                } = &queued_request.kind
                {
                    let (media_upload_progress_info, http_progress) =
                        if report_media_upload_progress.load(Ordering::SeqCst) {
                            let media_upload_progress_info =
                                RoomSendQueue::create_media_upload_progress_info(
                                    &queued_request.transaction_id,
                                    related_to,
                                    cache_key,
                                    thumbnail_source.as_ref(),
                                    #[cfg(feature = "unstable-msc4274")]
                                    accumulated,
                                    &room,
                                    &queue,
                                )
                                .await;

                            let progress = RoomSendQueue::create_media_upload_progress_observable(
                                &media_upload_progress_info,
                                related_to,
                                &update_sender,
                            );

                            (Some(media_upload_progress_info), Some(progress))
                        } else {
                            Default::default()
                        };

                    (Some(related_to.clone()), media_upload_progress_info, http_progress)
                } else {
                    Default::default()
                };

            match Self::handle_request(&room, queued_request, cancel_upload_rx, http_progress).await
            {
                // The request went through: remove it from the queue, then
                // notify the subscribers.
                Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
                {
                    Ok(()) => match parent_key {
                        SentRequestKey::Event(event_id) => {
                            send_update(
                                &global_update_sender,
                                &update_sender,
                                room_id,
                                RoomSendQueueUpdate::SentEvent { transaction_id: txn_id, event_id },
                            );
                        }

                        SentRequestKey::Media(sent_media_info) => {
                            // Report the upload as complete: `current` equals
                            // `total`. Without progress info, fall back to
                            // index 0 and a 1/1 progress.
                            let index =
                                media_upload_progress_info.as_ref().map_or(0, |info| info.index);
                            let progress = media_upload_progress_info
                                .as_ref()
                                .map(|info| {
                                    AbstractProgress { current: info.bytes, total: info.bytes }
                                        + info.offsets
                                })
                                .unwrap_or(AbstractProgress { current: 1, total: 1 });

                            // Only the room-level channel is notified here;
                            // the global update channel is not.
                            let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
                                related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
                                file: Some(sent_media_info.file),
                                index,
                                progress,
                            });
                        }
                    },

                    Err(err) => {
                        warn!("unable to mark queued request as sent: {err}");
                    }
                },

                // `handle_request` returns `None` when an upload was cancelled.
                Ok(None) => {
                    debug!("Request has been aborted while running, continuing.");
                }

                Err(err) => {
                    // Recoverable errors: HTTP errors whose retry kind is
                    // transient or a network failure, and failed concurrent
                    // requests. Everything else is unrecoverable.
                    let is_recoverable = match err {
                        crate::Error::Http(ref http_err) => {
                            matches!(
                                http_err.retry_kind(),
                                RetryKind::Transient { .. } | RetryKind::NetworkFailure
                            )
                        }

                        crate::Error::ConcurrentRequestFailed => true,

                        _ => false,
                    };

                    // Disable this room's queue after any kind of error.
                    locally_enabled.store(false, Ordering::SeqCst);

                    if is_recoverable {
                        warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");

                        // Keep the request in the queue, but don't consider it
                        // as being sent anymore. This happens before the
                        // observers are notified below.
                        queue.mark_as_not_being_sent(&txn_id).await;

                    } else {
                        warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");

                        // Mark the request as wedged, so it's not picked
                        // again by `peek_next_to_send`.
                        if let Err(storage_error) =
                            queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
                        {
                            warn!("unable to mark request as wedged: {storage_error}");
                        }
                    }

                    let error = Arc::new(err);

                    let _ = global_error_sender.send(SendQueueRoomError {
                        room_id: room_id.to_owned(),
                        error: error.clone(),
                        is_recoverable,
                    });

                    // For media uploads, the error is reported against the
                    // transaction the upload relates to.
                    send_update(
                        &global_update_sender,
                        &update_sender,
                        room_id,
                        RoomSendQueueUpdate::SendError {
                            transaction_id: related_txn_id.unwrap_or(txn_id),
                            error,
                            is_recoverable,
                        },
                    );
                }
            }
        }

        info!("exited sending task");
    }
833
834 async fn handle_request(
838 room: &Room,
839 request: QueuedRequest,
840 cancel_upload_rx: Option<oneshot::Receiver<()>>,
841 progress: Option<SharedObservable<TransmissionProgress>>,
842 ) -> Result<Option<SentRequestKey>, crate::Error> {
843 match request.kind {
844 QueuedRequestKind::Event { content } => {
845 let (event, event_type) = content.raw();
846
847 let res = room
848 .send_raw(event_type, event)
849 .with_transaction_id(&request.transaction_id)
850 .with_request_config(RequestConfig::short_retry())
851 .await?;
852
853 trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent");
854 Ok(Some(SentRequestKey::Event(res.event_id)))
855 }
856
857 QueuedRequestKind::MediaUpload {
858 content_type,
859 cache_key,
860 thumbnail_source,
861 related_to: relates_to,
862 #[cfg(feature = "unstable-msc4274")]
863 accumulated,
864 } => {
865 trace!(%relates_to, "uploading media related to event");
866
867 let fut = async move {
868 let data = room
869 .client()
870 .media_store()
871 .lock()
872 .await?
873 .get_media_content(&cache_key)
874 .await?
875 .ok_or(crate::Error::SendQueueWedgeError(Box::new(
876 QueueWedgeError::MissingMediaContent,
877 )))?;
878
879 let mime = Mime::from_str(&content_type).map_err(|_| {
880 crate::Error::SendQueueWedgeError(Box::new(
881 QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
882 ))
883 })?;
884
885 #[cfg(feature = "e2e-encryption")]
886 let media_source = if room.latest_encryption_state().await?.is_encrypted() {
887 trace!("upload will be encrypted (encrypted room)");
888
889 let mut cursor = std::io::Cursor::new(data);
890 let mut req = room
891 .client
892 .upload_encrypted_file(&mut cursor)
893 .with_request_config(RequestConfig::short_retry());
894 if let Some(progress) = progress {
895 req = req.with_send_progress_observable(progress);
896 }
897 let encrypted_file = req.await?;
898
899 MediaSource::Encrypted(Box::new(encrypted_file))
900 } else {
901 trace!("upload will be in clear text (room without encryption)");
902
903 let request_config = RequestConfig::short_retry()
904 .timeout(Media::reasonable_upload_timeout(&data));
905 let mut req =
906 room.client().media().upload(&mime, data, Some(request_config));
907 if let Some(progress) = progress {
908 req = req.with_send_progress_observable(progress);
909 }
910 let res = req.await?;
911
912 MediaSource::Plain(res.content_uri)
913 };
914
915 #[cfg(not(feature = "e2e-encryption"))]
916 let media_source = {
917 let request_config = RequestConfig::short_retry()
918 .timeout(Media::reasonable_upload_timeout(&data));
919 let mut req =
920 room.client().media().upload(&mime, data, Some(request_config));
921 if let Some(progress) = progress {
922 req = req.with_send_progress_observable(progress);
923 }
924 let res = req.await?;
925 MediaSource::Plain(res.content_uri)
926 };
927
928 let uri = match &media_source {
929 MediaSource::Plain(uri) => uri,
930 MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
931 };
932 trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");
933
934 Ok(SentRequestKey::Media(SentMediaInfo {
935 file: media_source,
936 thumbnail: thumbnail_source,
937 #[cfg(feature = "unstable-msc4274")]
938 accumulated,
939 }))
940 };
941
942 let wait_for_cancel = async move {
943 if let Some(rx) = cancel_upload_rx {
944 rx.await
945 } else {
946 std::future::pending().await
947 }
948 };
949
950 tokio::select! {
951 biased;
952
953 _ = wait_for_cancel => {
954 Ok(None)
955 }
956
957 res = fut => {
958 res.map(Some)
959 }
960 }
961 }
962 }
963 }
964
    /// Returns whether this room's send queue is enabled.
    pub fn is_enabled(&self) -> bool {
        self.inner.locally_enabled.load(Ordering::SeqCst)
    }
969
    /// Enables or disables this room's send queue.
    ///
    /// When enabling, the sending task is notified, so it resumes processing
    /// the queue.
    pub fn set_enabled(&self, enabled: bool) {
        self.inner.locally_enabled.store(enabled, Ordering::SeqCst);

        if enabled {
            // The sending task may be waiting on the notifier.
            self.inner.notifier.notify_one();
        }
    }
980
981 fn send_update(&self, update: RoomSendQueueUpdate) {
985 let _ = self.inner.update_sender.send(update.clone());
986 let _ = self
987 .inner
988 .global_update_sender
989 .send(SendQueueUpdate { room_id: self.inner.room.room_id().to_owned(), update });
990 }
991}
992
993fn send_update(
994 global_update_sender: &broadcast::Sender<SendQueueUpdate>,
995 update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
996 room_id: &RoomId,
997 update: RoomSendQueueUpdate,
998) {
999 let _ = update_sender.send(update.clone());
1000 let _ = global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update });
1001}
1002
impl From<&crate::Error> for QueueWedgeError {
    /// Maps an SDK error to the reason why a queued request is wedged.
    ///
    /// With the `e2e-encryption` feature, session recipient collection errors
    /// map to dedicated variants. An error that already wraps a
    /// [`QueueWedgeError`] yields a clone of it. Anything else becomes a
    /// `GenericApiError` carrying the error's display string.
    fn from(value: &crate::Error) -> Self {
        match value {
            #[cfg(feature = "e2e-encryption")]
            crate::Error::OlmError(error) => match &**error {
                OlmError::SessionRecipientCollectionError(error) => match error {
                    SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
                        QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
                    }

                    SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
                        QueueWedgeError::IdentityViolations { users: users.clone() }
                    }

                    SessionRecipientCollectionError::CrossSigningNotSetup
                    | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
                        QueueWedgeError::CrossVerificationRequired
                    }
                },
                // Any other Olm error is reported as a generic error.
                _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
            },

            // Already a wedge error: unbox and clone it as is.
            crate::Error::SendQueueWedgeError(error) => *error.clone(),

            _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
        }
    }
}
1032
/// State shared by all the clones of a [`RoomSendQueue`].
struct RoomSendQueueInner {
    /// Weak reference to the room this queue belongs to.
    room: WeakRoom,

    /// Sender of the updates observable across all rooms.
    global_update_sender: broadcast::Sender<SendQueueUpdate>,

    /// Sender of the updates of this room only.
    update_sender: broadcast::Sender<RoomSendQueueUpdate>,

    /// Storage of the queued requests of this room; a clone of it is handed
    /// to the sending task.
    queue: QueueStorage,

    /// Notifier used to wake up the sending task.
    notifier: Arc<Notify>,

    /// Whether this room's queue is enabled; shared with the sending task.
    locally_enabled: Arc<AtomicBool>,

    /// Handle of the spawned sending task. Never read; presumably kept so the
    /// handle lives as long as this struct — TODO confirm what dropping it
    /// does to the task.
    _task: JoinHandle<()>,
}
1069
/// Information about the request that is currently being sent.
struct BeingSentInfo {
    /// Transaction id of the request being sent.
    transaction_id: OwnedTransactionId,

    /// Sender used to cancel the upload; only set when the request is a media
    /// upload.
    cancel_upload: Option<oneshot::Sender<()>>,
}
1079
1080impl BeingSentInfo {
1081 fn cancel_upload(self) -> bool {
1086 if let Some(cancel_upload) = self.cancel_upload {
1087 let _ = cancel_upload.send(());
1088 true
1089 } else {
1090 false
1091 }
1092 }
1093}
1094
/// Lock guarding the accesses to the state store made by the send queue of a
/// room.
#[derive(Clone)]
struct StoreLock {
    /// Weak reference to the client owning the state store.
    client: WeakClient,

    /// The request being sent at the moment, if any. Its mutex is the one
    /// taken by `StoreLock::lock`.
    being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
}
1108
1109impl StoreLock {
1110 async fn lock(&self) -> StoreLockGuard {
1112 StoreLockGuard {
1113 client: self.client.clone(),
1114 being_sent: self.being_sent.clone().lock_owned().await,
1115 }
1116 }
1117}
1118
/// Guard obtained from `StoreLock::lock`; the lock is held for as long as the
/// guard is alive.
struct StoreLockGuard {
    /// Weak reference to the client owning the state store.
    client: WeakClient,

    /// Owned mutex guard over the request being sent, if any.
    being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
}
1129
1130impl StoreLockGuard {
1131 fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
1133 self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
1134 }
1135}
1136
/// Access to the persisted queue of requests of a room.
#[derive(Clone)]
struct QueueStorage {
    /// Lock through which every access to the state store goes.
    store: StoreLock,

    /// The room this queue belongs to.
    room_id: OwnedRoomId,

    /// Thumbnail file sizes, keyed by the transaction id of the event sent
    /// once the uploads are done; one optional entry per media item. Filled
    /// when queueing media or galleries, cleared when the request is sent or
    /// cancelled. Only kept in memory.
    thumbnail_file_sizes: Arc<SyncMutex<HashMap<OwnedTransactionId, Vec<Option<usize>>>>>,
}
1160
1161impl QueueStorage {
    /// Priority of the requests queued with `push`.
    const LOW_PRIORITY: usize = 0;

    /// Higher priority value; its users aren't visible in this part of the
    /// file. Presumably for requests to send before the low priority ones —
    /// TODO confirm.
    const HIGH_PRIORITY: usize = 10;
1167
    /// Creates the queue storage of the given room, with no request being
    /// sent and no thumbnail size recorded.
    fn new(client: WeakClient, room: OwnedRoomId) -> Self {
        Self {
            room_id: room,
            store: StoreLock { client, being_sent: Default::default() },
            thumbnail_file_sizes: Default::default(),
        }
    }
1176
1177 async fn push(
1181 &self,
1182 request: QueuedRequestKind,
1183 created_at: MilliSecondsSinceUnixEpoch,
1184 ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
1185 let transaction_id = TransactionId::new();
1186
1187 self.store
1188 .lock()
1189 .await
1190 .client()?
1191 .state_store()
1192 .save_send_queue_request(
1193 &self.room_id,
1194 transaction_id.clone(),
1195 created_at,
1196 request,
1197 Self::LOW_PRIORITY,
1198 )
1199 .await?;
1200
1201 Ok(transaction_id)
1202 }
1203
1204 async fn peek_next_to_send(
1209 &self,
1210 ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
1211 {
1212 let mut guard = self.store.lock().await;
1213 let queued_requests =
1214 guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;
1215
1216 if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
1217 let (cancel_upload_tx, cancel_upload_rx) =
1218 if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
1219 let (tx, rx) = oneshot::channel();
1220 (Some(tx), Some(rx))
1221 } else {
1222 Default::default()
1223 };
1224
1225 let prev = guard.being_sent.replace(BeingSentInfo {
1226 transaction_id: request.transaction_id.clone(),
1227 cancel_upload: cancel_upload_tx,
1228 });
1229
1230 if let Some(prev) = prev {
1231 error!(
1232 prev_txn = ?prev.transaction_id,
1233 "a previous request was still active while picking a new one"
1234 );
1235 }
1236
1237 Ok(Some((request.clone(), cancel_upload_rx)))
1238 } else {
1239 Ok(None)
1240 }
1241 }
1242
1243 async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1247 let was_being_sent = self.store.lock().await.being_sent.take();
1248
1249 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1250 if prev_txn != Some(transaction_id) {
1251 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after transient error)");
1252 }
1253 }
1254
1255 async fn mark_as_wedged(
1259 &self,
1260 transaction_id: &TransactionId,
1261 reason: QueueWedgeError,
1262 ) -> Result<(), RoomSendQueueStorageError> {
1263 let mut guard = self.store.lock().await;
1265 let was_being_sent = guard.being_sent.take();
1266
1267 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1268 if prev_txn != Some(transaction_id) {
1269 error!(
1270 ?prev_txn,
1271 "previous active request didn't match that we expect (after permanent error)",
1272 );
1273 }
1274
1275 Ok(guard
1276 .client()?
1277 .state_store()
1278 .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1279 .await?)
1280 }
1281
1282 async fn mark_as_unwedged(
1285 &self,
1286 transaction_id: &TransactionId,
1287 ) -> Result<(), RoomSendQueueStorageError> {
1288 Ok(self
1289 .store
1290 .lock()
1291 .await
1292 .client()?
1293 .state_store()
1294 .update_send_queue_request_status(&self.room_id, transaction_id, None)
1295 .await?)
1296 }
1297
1298 async fn mark_as_sent(
1301 &self,
1302 transaction_id: &TransactionId,
1303 parent_key: SentRequestKey,
1304 ) -> Result<(), RoomSendQueueStorageError> {
1305 let mut guard = self.store.lock().await;
1307 let was_being_sent = guard.being_sent.take();
1308
1309 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1310 if prev_txn != Some(transaction_id) {
1311 error!(
1312 ?prev_txn,
1313 "previous active request didn't match that we expect (after successful send)",
1314 );
1315 }
1316
1317 let client = guard.client()?;
1318 let store = client.state_store();
1319
1320 store
1322 .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1323 .await?;
1324
1325 let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1326
1327 if !removed {
1328 warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1329 }
1330
1331 self.thumbnail_file_sizes.lock().remove(transaction_id);
1332
1333 Ok(())
1334 }
1335
1336 async fn cancel_event(
1343 &self,
1344 transaction_id: &TransactionId,
1345 ) -> Result<bool, RoomSendQueueStorageError> {
1346 let guard = self.store.lock().await;
1347
1348 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1349 == Some(transaction_id)
1350 {
1351 guard
1353 .client()?
1354 .state_store()
1355 .save_dependent_queued_request(
1356 &self.room_id,
1357 transaction_id,
1358 ChildTransactionId::new(),
1359 MilliSecondsSinceUnixEpoch::now(),
1360 DependentQueuedRequestKind::RedactEvent,
1361 )
1362 .await?;
1363
1364 return Ok(true);
1365 }
1366
1367 let removed = guard
1368 .client()?
1369 .state_store()
1370 .remove_send_queue_request(&self.room_id, transaction_id)
1371 .await?;
1372
1373 self.thumbnail_file_sizes.lock().remove(transaction_id);
1374
1375 Ok(removed)
1376 }
1377
1378 async fn replace_event(
1385 &self,
1386 transaction_id: &TransactionId,
1387 serializable: SerializableEventContent,
1388 ) -> Result<bool, RoomSendQueueStorageError> {
1389 let guard = self.store.lock().await;
1390
1391 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1392 == Some(transaction_id)
1393 {
1394 guard
1396 .client()?
1397 .state_store()
1398 .save_dependent_queued_request(
1399 &self.room_id,
1400 transaction_id,
1401 ChildTransactionId::new(),
1402 MilliSecondsSinceUnixEpoch::now(),
1403 DependentQueuedRequestKind::EditEvent { new_content: serializable },
1404 )
1405 .await?;
1406
1407 return Ok(true);
1408 }
1409
1410 let edited = guard
1411 .client()?
1412 .state_store()
1413 .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1414 .await?;
1415
1416 Ok(edited)
1417 }
1418
1419 #[allow(clippy::too_many_arguments)]
1423 async fn push_media(
1424 &self,
1425 event: RoomMessageEventContent,
1426 content_type: Mime,
1427 send_event_txn: OwnedTransactionId,
1428 created_at: MilliSecondsSinceUnixEpoch,
1429 upload_file_txn: OwnedTransactionId,
1430 file_media_request: MediaRequestParameters,
1431 thumbnail: Option<QueueThumbnailInfo>,
1432 ) -> Result<(), RoomSendQueueStorageError> {
1433 let guard = self.store.lock().await;
1434 let client = guard.client()?;
1435 let store = client.state_store();
1436
1437 let thumbnail_file_sizes = vec![thumbnail.as_ref().map(|t| t.file_size)];
1439
1440 let thumbnail_info = self
1441 .push_thumbnail_and_media_uploads(
1442 store,
1443 &content_type,
1444 send_event_txn.clone(),
1445 created_at,
1446 upload_file_txn.clone(),
1447 file_media_request,
1448 thumbnail,
1449 )
1450 .await?;
1451
1452 store
1454 .save_dependent_queued_request(
1455 &self.room_id,
1456 &upload_file_txn,
1457 send_event_txn.clone().into(),
1458 created_at,
1459 DependentQueuedRequestKind::FinishUpload {
1460 local_echo: Box::new(event),
1461 file_upload: upload_file_txn.clone(),
1462 thumbnail_info,
1463 },
1464 )
1465 .await?;
1466
1467 self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1468
1469 Ok(())
1470 }
1471
    /// Queues the requests needed to send a gallery event.
    ///
    /// The uploads of the first item are queued with
    /// `push_thumbnail_and_media_uploads`. Each upload of the following items
    /// is saved as a dependent request of the previous upload, forming a
    /// chain. Finally, a dependent request finishing the gallery (and
    /// carrying the local echo) is saved as a child of the last upload.
    ///
    /// Does nothing if `item_queue_infos` is empty.
    ///
    /// # Errors
    ///
    /// Fails if the client is shutting down, or on a store failure.
    #[cfg(feature = "unstable-msc4274")]
    #[allow(clippy::too_many_arguments)]
    async fn push_gallery(
        &self,
        event: RoomMessageEventContent,
        send_event_txn: OwnedTransactionId,
        created_at: MilliSecondsSinceUnixEpoch,
        item_queue_infos: Vec<GalleryItemQueueInfo>,
    ) -> Result<(), RoomSendQueueStorageError> {
        let guard = self.store.lock().await;
        let client = guard.client()?;
        let store = client.state_store();

        let mut finish_item_infos = Vec::with_capacity(item_queue_infos.len());
        let mut thumbnail_file_sizes = Vec::with_capacity(item_queue_infos.len());

        let Some((first, rest)) = item_queue_infos.split_first() else {
            return Ok(());
        };

        let GalleryItemQueueInfo { content_type, upload_file_txn, file_media_request, thumbnail } =
            first;

        // Queue the uploads of the first item.
        let thumbnail_info = self
            .push_thumbnail_and_media_uploads(
                store,
                content_type,
                send_event_txn.clone(),
                created_at,
                upload_file_txn.clone(),
                file_media_request.clone(),
                thumbnail.clone(),
            )
            .await?;

        finish_item_infos
            .push(FinishGalleryItemInfo { file_upload: upload_file_txn.clone(), thumbnail_info });
        thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));

        // Transaction id of the last upload queued so far: the parent of the
        // next dependent request.
        let mut last_upload_file_txn = upload_file_txn.clone();

        for item_queue_info in rest {
            let GalleryItemQueueInfo {
                content_type,
                upload_file_txn,
                file_media_request,
                thumbnail,
            } = item_queue_info;

            let thumbnail_info = if let Some(QueueThumbnailInfo {
                finish_upload_thumbnail_info: thumbnail_info,
                media_request_parameters: thumbnail_media_request,
                content_type: thumbnail_content_type,
                ..
            }) = thumbnail
            {
                let upload_thumbnail_txn = thumbnail_info.txn.clone();

                // The thumbnail upload depends on the previous item's last
                // upload (a file upload, hence the `false` below).
                store
                    .save_dependent_queued_request(
                        &self.room_id,
                        &last_upload_file_txn,
                        upload_thumbnail_txn.clone().into(),
                        created_at,
                        DependentQueuedRequestKind::UploadFileOrThumbnail {
                            content_type: thumbnail_content_type.to_string(),
                            cache_key: thumbnail_media_request.clone(),
                            related_to: send_event_txn.clone(),
                            parent_is_thumbnail_upload: false,
                        },
                    )
                    .await?;

                last_upload_file_txn = upload_thumbnail_txn;

                Some(thumbnail_info)
            } else {
                None
            };

            // The file upload depends on this item's thumbnail upload if
            // there's one, on the previous item's last upload otherwise.
            store
                .save_dependent_queued_request(
                    &self.room_id,
                    &last_upload_file_txn,
                    upload_file_txn.clone().into(),
                    created_at,
                    DependentQueuedRequestKind::UploadFileOrThumbnail {
                        content_type: content_type.to_string(),
                        cache_key: file_media_request.clone(),
                        related_to: send_event_txn.clone(),
                        parent_is_thumbnail_upload: thumbnail.is_some(),
                    },
                )
                .await?;

            finish_item_infos.push(FinishGalleryItemInfo {
                file_upload: upload_file_txn.clone(),
                thumbnail_info: thumbnail_info.cloned(),
            });
            thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));

            last_upload_file_txn = upload_file_txn.clone();
        }

        // The request finishing the gallery depends on the very last upload.
        store
            .save_dependent_queued_request(
                &self.room_id,
                &last_upload_file_txn,
                send_event_txn.clone().into(),
                created_at,
                DependentQueuedRequestKind::FinishGallery {
                    local_echo: Box::new(event),
                    item_infos: finish_item_infos,
                },
            )
            .await?;

        self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);

        Ok(())
    }
1601
1602 #[allow(clippy::too_many_arguments)]
1608 async fn push_thumbnail_and_media_uploads(
1609 &self,
1610 store: &DynStateStore,
1611 content_type: &Mime,
1612 send_event_txn: OwnedTransactionId,
1613 created_at: MilliSecondsSinceUnixEpoch,
1614 upload_file_txn: OwnedTransactionId,
1615 file_media_request: MediaRequestParameters,
1616 thumbnail: Option<QueueThumbnailInfo>,
1617 ) -> Result<Option<FinishUploadThumbnailInfo>, RoomSendQueueStorageError> {
1618 if let Some(QueueThumbnailInfo {
1619 finish_upload_thumbnail_info: thumbnail_info,
1620 media_request_parameters: thumbnail_media_request,
1621 content_type: thumbnail_content_type,
1622 ..
1623 }) = thumbnail
1624 {
1625 let upload_thumbnail_txn = thumbnail_info.txn.clone();
1626
1627 store
1629 .save_send_queue_request(
1630 &self.room_id,
1631 upload_thumbnail_txn.clone(),
1632 created_at,
1633 QueuedRequestKind::MediaUpload {
1634 content_type: thumbnail_content_type.to_string(),
1635 cache_key: thumbnail_media_request,
1636 thumbnail_source: None, related_to: send_event_txn.clone(),
1638 #[cfg(feature = "unstable-msc4274")]
1639 accumulated: vec![],
1640 },
1641 Self::LOW_PRIORITY,
1642 )
1643 .await?;
1644
1645 store
1647 .save_dependent_queued_request(
1648 &self.room_id,
1649 &upload_thumbnail_txn,
1650 upload_file_txn.into(),
1651 created_at,
1652 DependentQueuedRequestKind::UploadFileOrThumbnail {
1653 content_type: content_type.to_string(),
1654 cache_key: file_media_request,
1655 related_to: send_event_txn,
1656 parent_is_thumbnail_upload: true,
1657 },
1658 )
1659 .await?;
1660
1661 Ok(Some(thumbnail_info))
1662 } else {
1663 store
1665 .save_send_queue_request(
1666 &self.room_id,
1667 upload_file_txn,
1668 created_at,
1669 QueuedRequestKind::MediaUpload {
1670 content_type: content_type.to_string(),
1671 cache_key: file_media_request,
1672 thumbnail_source: None,
1673 related_to: send_event_txn,
1674 #[cfg(feature = "unstable-msc4274")]
1675 accumulated: vec![],
1676 },
1677 Self::LOW_PRIORITY,
1678 )
1679 .await?;
1680
1681 Ok(None)
1682 }
1683 }
1684
1685 #[instrument(skip(self))]
1687 async fn react(
1688 &self,
1689 transaction_id: &TransactionId,
1690 key: String,
1691 created_at: MilliSecondsSinceUnixEpoch,
1692 ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1693 let guard = self.store.lock().await;
1694 let client = guard.client()?;
1695 let store = client.state_store();
1696
1697 let requests = store.load_send_queue_requests(&self.room_id).await?;
1698
1699 if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1701 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1704 if !dependent_requests
1705 .into_iter()
1706 .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1707 .any(|child_txn| *child_txn == *transaction_id)
1708 {
1709 return Ok(None);
1711 }
1712 }
1713
1714 let reaction_txn_id = ChildTransactionId::new();
1716 store
1717 .save_dependent_queued_request(
1718 &self.room_id,
1719 transaction_id,
1720 reaction_txn_id.clone(),
1721 created_at,
1722 DependentQueuedRequestKind::ReactEvent { key },
1723 )
1724 .await?;
1725
1726 Ok(Some(reaction_txn_id))
1727 }
1728
1729 async fn local_echoes(
1732 &self,
1733 room: &RoomSendQueue,
1734 ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1735 let guard = self.store.lock().await;
1736 let client = guard.client()?;
1737 let store = client.state_store();
1738
1739 let local_requests =
1740 store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1741 Some(LocalEcho {
1742 transaction_id: queued.transaction_id.clone(),
1743 content: match queued.kind {
1744 QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1745 serialized_event: content,
1746 send_handle: SendHandle {
1747 room: room.clone(),
1748 transaction_id: queued.transaction_id,
1749 media_handles: vec![],
1750 created_at: queued.created_at,
1751 },
1752 send_error: queued.error,
1753 },
1754
1755 QueuedRequestKind::MediaUpload { .. } => {
1756 return None;
1759 }
1760 },
1761 })
1762 });
1763
1764 let reactions_and_medias = store
1765 .load_dependent_queued_requests(&self.room_id)
1766 .await?
1767 .into_iter()
1768 .filter_map(|dep| match dep.kind {
1769 DependentQueuedRequestKind::EditEvent { .. }
1770 | DependentQueuedRequestKind::RedactEvent => {
1771 None
1773 }
1774
1775 DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
1776 transaction_id: dep.own_transaction_id.clone().into(),
1777 content: LocalEchoContent::React {
1778 key,
1779 send_handle: SendReactionHandle {
1780 room: room.clone(),
1781 transaction_id: dep.own_transaction_id,
1782 },
1783 applies_to: dep.parent_transaction_id,
1784 },
1785 }),
1786
1787 DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
1788 None
1790 }
1791
1792 DependentQueuedRequestKind::FinishUpload {
1793 local_echo,
1794 file_upload,
1795 thumbnail_info,
1796 } => {
1797 Some(LocalEcho {
1799 transaction_id: dep.own_transaction_id.clone().into(),
1800 content: LocalEchoContent::Event {
1801 serialized_event: SerializableEventContent::new(&(*local_echo).into())
1802 .ok()?,
1803 send_handle: SendHandle {
1804 room: room.clone(),
1805 transaction_id: dep.own_transaction_id.into(),
1806 media_handles: vec![MediaHandles {
1807 upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
1808 upload_file_txn: file_upload,
1809 }],
1810 created_at: dep.created_at,
1811 },
1812 send_error: None,
1813 },
1814 })
1815 }
1816
1817 #[cfg(feature = "unstable-msc4274")]
1818 DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
1819 self.create_gallery_local_echo(
1821 dep.own_transaction_id,
1822 room,
1823 dep.created_at,
1824 local_echo,
1825 item_infos,
1826 )
1827 }
1828 });
1829
1830 Ok(local_requests.chain(reactions_and_medias).collect())
1831 }
1832
1833 #[cfg(feature = "unstable-msc4274")]
1835 fn create_gallery_local_echo(
1836 &self,
1837 transaction_id: ChildTransactionId,
1838 room: &RoomSendQueue,
1839 created_at: MilliSecondsSinceUnixEpoch,
1840 local_echo: Box<RoomMessageEventContent>,
1841 item_infos: Vec<FinishGalleryItemInfo>,
1842 ) -> Option<LocalEcho> {
1843 Some(LocalEcho {
1844 transaction_id: transaction_id.clone().into(),
1845 content: LocalEchoContent::Event {
1846 serialized_event: SerializableEventContent::new(&(*local_echo).into()).ok()?,
1847 send_handle: SendHandle {
1848 room: room.clone(),
1849 transaction_id: transaction_id.into(),
1850 media_handles: item_infos
1851 .into_iter()
1852 .map(|i| MediaHandles {
1853 upload_thumbnail_txn: i.thumbnail_info.map(|info| info.txn),
1854 upload_file_txn: i.file_upload,
1855 })
1856 .collect(),
1857 created_at,
1858 },
1859 send_error: None,
1860 },
1861 })
1862 }
1863
    /// Tries to apply a single dependent request, according to its kind and to
    /// whether its `parent_key` is set.
    ///
    /// Returns `Ok(true)` when the dependent request has been handled (or can
    /// never be handled, e.g. its content is unsupported) and `Ok(false)` when
    /// it could not be applied at this time. The caller in this file,
    /// `apply_dependent_requests`, removes the dependent request from the
    /// store only when `true` is returned.
    ///
    /// # Errors
    ///
    /// * [`RoomSendQueueStorageError::InvalidParentKey`] if the parent key
    ///   cannot be turned into an event id where one is required.
    /// * [`RoomSendQueueError::RoomDisappeared`] if the room cannot be found
    ///   in the client.
    /// * Storage and JSON serialization errors from the underlying operations.
    #[instrument(skip_all)]
    async fn try_apply_single_dependent_request(
        &self,
        client: &Client,
        dependent_request: DependentQueuedRequest,
        new_updates: &mut Vec<RoomSendQueueUpdate>,
    ) -> Result<bool, RoomSendQueueError> {
        let store = client.state_store();

        let parent_key = dependent_request.parent_key;

        match dependent_request.kind {
            DependentQueuedRequestKind::EditEvent { new_content } => {
                if let Some(parent_key) = parent_key {
                    // A parent key is available: the edit is sent as a new
                    // event targeting the parent's event id.
                    let Some(event_id) = parent_key.into_event_id() else {
                        return Err(RoomSendQueueError::StorageError(
                            RoomSendQueueStorageError::InvalidParentKey,
                        ));
                    };

                    let room = client
                        .get_room(&self.room_id)
                        .ok_or(RoomSendQueueError::RoomDisappeared)?;

                    // Only room messages and unstable poll starts can be
                    // edited; anything else is reported as handled so that it
                    // is not retried.
                    let edited_content = match new_content.deserialize() {
                        Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
                            EditedContent::RoomMessage(c.into())
                        }

                        Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
                            let poll_start = c.poll_start().clone();
                            EditedContent::PollStart {
                                fallback_text: poll_start.question.text.clone(),
                                new_content: poll_start,
                            }
                        }

                        Ok(c) => {
                            warn!("Unsupported edit content type: {:?}", c.event_type());
                            return Ok(true);
                        }

                        Err(err) => {
                            warn!("Unable to deserialize: {err}");
                            return Ok(true);
                        }
                    };

                    // A failure to build the edit event is also reported as
                    // handled.
                    let edit_event = match room.make_edit_event(&event_id, edited_content).await {
                        Ok(e) => e,
                        Err(err) => {
                            warn!("couldn't create edited event: {err}");
                            return Ok(true);
                        }
                    };

                    let serializable = SerializableEventContent::from_raw(
                        Raw::new(&edit_event)
                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
                        edit_event.event_type().to_string(),
                    );

                    // Queue the edit event under the dependent request's own
                    // transaction id.
                    store
                        .save_send_queue_request(
                            &self.room_id,
                            dependent_request.own_transaction_id.into(),
                            dependent_request.created_at,
                            serializable.into(),
                            Self::HIGH_PRIORITY,
                        )
                        .await
                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
                } else {
                    // No parent key: replace the content of the parent's
                    // request directly in the send queue.
                    let edited = store
                        .update_send_queue_request(
                            &self.room_id,
                            &dependent_request.parent_transaction_id,
                            new_content.into(),
                        )
                        .await
                        .map_err(RoomSendQueueStorageError::StateStoreError)?;

                    if !edited {
                        warn!("missing local echo upon dependent edit");
                    }
                }
            }

            DependentQueuedRequestKind::RedactEvent => {
                if let Some(parent_key) = parent_key {
                    // A parent key is available: redact the parent's event.
                    let Some(event_id) = parent_key.into_event_id() else {
                        return Err(RoomSendQueueError::StorageError(
                            RoomSendQueueStorageError::InvalidParentKey,
                        ));
                    };

                    let room = client
                        .get_room(&self.room_id)
                        .ok_or(RoomSendQueueError::RoomDisappeared)?;

                    // The redaction is sent right away, reusing the dependent
                    // request's own transaction id; on failure the dependent
                    // request is kept (`Ok(false)`).
                    if let Err(err) = room
                        .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
                        .await
                    {
                        warn!("error when sending a redact for {event_id}: {err}");
                        return Ok(false);
                    }
                } else {
                    // No parent key: remove the parent's request from the
                    // send queue.
                    let removed = store
                        .remove_send_queue_request(
                            &self.room_id,
                            &dependent_request.parent_transaction_id,
                        )
                        .await
                        .map_err(RoomSendQueueStorageError::StateStoreError)?;

                    if !removed {
                        warn!("missing local echo upon dependent redact");
                    }
                }
            }

            DependentQueuedRequestKind::ReactEvent { key } => {
                if let Some(parent_key) = parent_key {
                    let Some(parent_event_id) = parent_key.into_event_id() else {
                        return Err(RoomSendQueueError::StorageError(
                            RoomSendQueueStorageError::InvalidParentKey,
                        ));
                    };

                    // Build the reaction event annotating the parent event,
                    // and queue it under the dependent request's own
                    // transaction id.
                    let react_event =
                        ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
                    let serializable = SerializableEventContent::from_raw(
                        Raw::new(&react_event)
                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
                        react_event.event_type().to_string(),
                    );

                    store
                        .save_send_queue_request(
                            &self.room_id,
                            dependent_request.own_transaction_id.into(),
                            dependent_request.created_at,
                            serializable.into(),
                            Self::HIGH_PRIORITY,
                        )
                        .await
                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
                } else {
                    // No parent key yet: keep the reaction for a later attempt.
                    return Ok(false);
                }
            }

            DependentQueuedRequestKind::UploadFileOrThumbnail {
                content_type,
                cache_key,
                related_to,
                parent_is_thumbnail_upload,
            } => {
                // Without a parent key, keep the request for a later attempt.
                let Some(parent_key) = parent_key else {
                    return Ok(false);
                };
                self.handle_dependent_file_or_thumbnail_upload(
                    client,
                    dependent_request.own_transaction_id.into(),
                    parent_key,
                    content_type,
                    cache_key,
                    related_to,
                    parent_is_thumbnail_upload,
                )
                .await?;
            }

            DependentQueuedRequestKind::FinishUpload {
                local_echo,
                file_upload,
                thumbnail_info,
            } => {
                // Without a parent key, keep the request for a later attempt.
                let Some(parent_key) = parent_key else {
                    return Ok(false);
                };
                self.handle_dependent_finish_upload(
                    client,
                    dependent_request.own_transaction_id.into(),
                    parent_key,
                    *local_echo,
                    file_upload,
                    thumbnail_info,
                    new_updates,
                )
                .await?;
            }

            #[cfg(feature = "unstable-msc4274")]
            DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
                // Without a parent key, keep the request for a later attempt.
                let Some(parent_key) = parent_key else {
                    return Ok(false);
                };
                self.handle_dependent_finish_gallery_upload(
                    client,
                    dependent_request.own_transaction_id.into(),
                    parent_key,
                    *local_echo,
                    item_infos,
                    new_updates,
                )
                .await?;
            }
        }

        Ok(true)
    }
2106
2107 #[instrument(skip(self))]
2108 async fn apply_dependent_requests(
2109 &self,
2110 new_updates: &mut Vec<RoomSendQueueUpdate>,
2111 ) -> Result<(), RoomSendQueueError> {
2112 let guard = self.store.lock().await;
2113
2114 let client = guard.client()?;
2115 let store = client.state_store();
2116
2117 let dependent_requests = store
2118 .load_dependent_queued_requests(&self.room_id)
2119 .await
2120 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2121
2122 let num_initial_dependent_requests = dependent_requests.len();
2123 if num_initial_dependent_requests == 0 {
2124 return Ok(());
2126 }
2127
2128 let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
2129
2130 for original in &dependent_requests {
2132 if !canonicalized_dependent_requests
2133 .iter()
2134 .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
2135 {
2136 store
2137 .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
2138 .await
2139 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2140 }
2141 }
2142
2143 let mut num_dependent_requests = canonicalized_dependent_requests.len();
2144
2145 debug!(
2146 num_dependent_requests,
2147 num_initial_dependent_requests, "starting handling of dependent requests"
2148 );
2149
2150 for dependent in canonicalized_dependent_requests {
2151 let dependent_id = dependent.own_transaction_id.clone();
2152
2153 match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
2154 Ok(should_remove) => {
2155 if should_remove {
2156 store
2158 .remove_dependent_queued_request(&self.room_id, &dependent_id)
2159 .await
2160 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2161
2162 num_dependent_requests -= 1;
2163 }
2164 }
2165
2166 Err(err) => {
2167 warn!("error when applying single dependent request: {err}");
2168 }
2169 }
2170 }
2171
2172 debug!(
2173 leftover_dependent_requests = num_dependent_requests,
2174 "stopped handling dependent request"
2175 );
2176
2177 Ok(())
2178 }
2179
2180 async fn remove_dependent_send_queue_request(
2182 &self,
2183 dependent_event_id: &ChildTransactionId,
2184 ) -> Result<bool, RoomSendQueueStorageError> {
2185 Ok(self
2186 .store
2187 .lock()
2188 .await
2189 .client()?
2190 .state_store()
2191 .remove_dependent_queued_request(&self.room_id, dependent_event_id)
2192 .await?)
2193 }
2194}
2195
/// Information required to queue the upload of a single gallery item.
#[cfg(feature = "unstable-msc4274")]
struct GalleryItemQueueInfo {
    /// MIME type of the item's file.
    content_type: Mime,
    /// Transaction id of the request uploading the item's file.
    upload_file_txn: OwnedTransactionId,
    /// Media request parameters, used as the cache key of the file upload.
    file_media_request: MediaRequestParameters,
    /// Thumbnail of the item, if it has one.
    thumbnail: Option<QueueThumbnailInfo>,
}
2204
/// The content of a local echo.
#[derive(Clone, Debug)]
pub enum LocalEchoContent {
    /// The local echo is an event.
    Event {
        /// The serialized content of the event.
        serialized_event: SerializableEventContent,
        /// A handle to act upon the local echo (abort, edit, react, ...).
        send_handle: SendHandle,
        /// The error recorded for the queued request, if any.
        send_error: Option<QueueWedgeError>,
    },

    /// The local echo is a reaction to another local echo.
    React {
        /// The key of the reaction.
        key: String,
        /// A handle to act upon the reaction (abort it).
        send_handle: SendReactionHandle,
        /// Transaction id of the local echo the reaction applies to.
        applies_to: OwnedTransactionId,
    },
}
2230
/// A local echo: a request known to the send queue, identified by its
/// transaction id.
#[derive(Clone, Debug)]
pub struct LocalEcho {
    /// Transaction id identifying the local echo.
    pub transaction_id: OwnedTransactionId,
    /// The content of the local echo.
    pub content: LocalEchoContent,
}
2240
/// An update to the send queue of a room.
#[derive(Clone, Debug)]
pub enum RoomSendQueueUpdate {
    /// A new local echo has been added.
    NewLocalEvent(LocalEcho),

    /// A local echo has been cancelled (see [`SendHandle::abort`] and
    /// [`SendReactionHandle::abort`]).
    CancelledLocalEvent {
        /// Transaction id of the cancelled local echo.
        transaction_id: OwnedTransactionId,
    },

    /// The content of a local echo has been replaced (see
    /// [`SendHandle::edit_raw`] and [`SendHandle::edit_media_caption`]).
    ReplacedLocalEvent {
        /// Transaction id of the local echo whose content was replaced.
        transaction_id: OwnedTransactionId,

        /// The new content of the local echo.
        new_content: SerializableEventContent,
    },

    /// An error related to the given transaction id.
    ///
    /// NOTE(review): presumably emitted when sending a request failed; the
    /// emitting code is not shown here — confirm.
    SendError {
        /// Transaction id the error relates to.
        transaction_id: OwnedTransactionId,
        /// The error itself.
        error: Arc<crate::Error>,
        /// Whether the error is considered recoverable.
        is_recoverable: bool,
    },

    /// A request has been unwedged and will be retried (see
    /// [`SendHandle::unwedge`]).
    RetryEvent {
        /// Transaction id of the local echo being retried.
        transaction_id: OwnedTransactionId,
    },

    /// An event has been sent.
    ///
    /// NOTE(review): inferred from the variant name and fields; the emitting
    /// code is not shown here — confirm.
    SentEvent {
        /// Transaction id of the local echo that was sent.
        transaction_id: OwnedTransactionId,
        /// Event id associated with the sent event.
        event_id: OwnedEventId,
    },

    /// An update about a media upload.
    ///
    /// NOTE(review): the exact semantics of `file`, `index` and `progress`
    /// are defined by the emitting code, which is not shown here — confirm.
    MediaUpload {
        /// Transaction id of the request this upload relates to.
        related_to: OwnedTransactionId,

        /// The media source of the file, when available.
        file: Option<MediaSource>,

        /// Index associated with the upload (presumably the position of the
        /// media among those of the related request — confirm).
        index: u64,

        /// The progress of the upload.
        progress: AbstractProgress,
    },
}
2319
/// A [`RoomSendQueueUpdate`] paired with the id of the room it belongs to.
#[derive(Clone, Debug)]
pub struct SendQueueUpdate {
    /// The room the update relates to.
    pub room_id: OwnedRoomId,

    /// The update itself.
    pub update: RoomSendQueueUpdate,
}
2332
/// An error triggered by the send queue of a room.
#[derive(Debug, thiserror::Error)]
pub enum RoomSendQueueError {
    /// The room isn't in the joined state.
    #[error("the room isn't in the joined state")]
    RoomNotJoined,

    /// The room can't be found in the client anymore.
    #[error("the room is now missing from the client")]
    RoomDisappeared,

    /// An error coming from the storage layer of the send queue.
    #[error(transparent)]
    StorageError(#[from] RoomSendQueueStorageError),

    /// The attachment event could not be created.
    #[error("the attachment event could not be created")]
    FailedToCreateAttachment,

    /// The gallery contains no items.
    #[cfg(feature = "unstable-msc4274")]
    #[error("the gallery contains no items")]
    EmptyGallery,

    /// The gallery event could not be created.
    #[cfg(feature = "unstable-msc4274")]
    #[error("the gallery event could not be created")]
    FailedToCreateGallery,
}
2364
/// An error triggered by the storage layer of the send queue.
#[derive(Debug, thiserror::Error)]
pub enum RoomSendQueueStorageError {
    /// An error from the state store.
    #[error(transparent)]
    StateStoreError(#[from] StoreError),

    /// An error from the event cache store.
    #[error(transparent)]
    EventCacheStoreError(#[from] EventCacheStoreError),

    /// An error from the media store.
    #[error(transparent)]
    MediaStoreError(#[from] MediaStoreError),

    /// An error from a cross-process lock.
    #[error(transparent)]
    LockError(#[from] CrossProcessLockError),

    /// A JSON (de)serialization error.
    #[error(transparent)]
    JsonSerialization(#[from] serde_json::Error),

    /// The parent key of a dependent request could not be turned into the
    /// kind of key required to apply it (e.g. an event id).
    #[error("a dependent event had an invalid parent key type")]
    InvalidParentKey,

    /// The client is shutting down.
    #[error("The client is shutting down.")]
    ClientShuttingDown,

    /// The operation isn't implemented for media uploads yet.
    #[error("This operation is not implemented for media uploads")]
    OperationNotImplementedYet,

    /// A media caption edit was requested for an event that isn't a media.
    #[error("Can't edit a media caption when the underlying event isn't a media")]
    InvalidMediaCaptionEdit,
}
2405
/// Transaction ids of the upload requests associated with one media.
#[derive(Clone, Debug)]
struct MediaHandles {
    /// Transaction id of the thumbnail upload, if the media has a thumbnail.
    upload_thumbnail_txn: Option<OwnedTransactionId>,

    /// Transaction id of the file upload.
    upload_file_txn: OwnedTransactionId,
}
2417
/// A handle to act upon a local echo known to the send queue: abort it, edit
/// it, unwedge it or react to it.
#[derive(Clone, Debug)]
pub struct SendHandle {
    /// The send queue of the room the local echo belongs to.
    room: RoomSendQueue,

    /// Transaction id of the local echo this handle acts upon.
    transaction_id: OwnedTransactionId,

    /// Handles to the media uploads attached to the local echo; empty when
    /// the local echo has no media.
    media_handles: Vec<MediaHandles>,

    /// Creation time associated with the local echo.
    pub created_at: MilliSecondsSinceUnixEpoch,
}
2436
2437impl SendHandle {
2438 #[cfg(test)]
2440 pub(crate) fn new(
2441 room: RoomSendQueue,
2442 transaction_id: OwnedTransactionId,
2443 created_at: MilliSecondsSinceUnixEpoch,
2444 ) -> Self {
2445 Self { room, transaction_id, media_handles: vec![], created_at }
2446 }
2447
2448 fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
2449 if !self.media_handles.is_empty() {
2450 Err(RoomSendQueueStorageError::OperationNotImplementedYet)
2451 } else {
2452 Ok(())
2453 }
2454 }
2455
2456 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2461 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2462 trace!("received an abort request");
2463
2464 let queue = &self.room.inner.queue;
2465
2466 for handles in &self.media_handles {
2467 if queue.abort_upload(&self.transaction_id, handles).await? {
2468 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2470 transaction_id: self.transaction_id.clone(),
2471 });
2472
2473 return Ok(true);
2474 }
2475
2476 }
2480
2481 if queue.cancel_event(&self.transaction_id).await? {
2482 trace!("successful abort");
2483
2484 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2486 transaction_id: self.transaction_id.clone(),
2487 });
2488
2489 Ok(true)
2490 } else {
2491 debug!("local echo didn't exist anymore, can't abort");
2492 Ok(false)
2493 }
2494 }
2495
2496 #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2501 pub async fn edit_raw(
2502 &self,
2503 new_content: Raw<AnyMessageLikeEventContent>,
2504 event_type: String,
2505 ) -> Result<bool, RoomSendQueueStorageError> {
2506 trace!("received an edit request");
2507 self.nyi_for_uploads()?;
2508
2509 let serializable = SerializableEventContent::from_raw(new_content, event_type);
2510
2511 if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
2512 trace!("successful edit");
2513
2514 self.room.inner.notifier.notify_one();
2516
2517 self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2519 transaction_id: self.transaction_id.clone(),
2520 new_content: serializable,
2521 });
2522
2523 Ok(true)
2524 } else {
2525 debug!("local echo doesn't exist anymore, can't edit");
2526 Ok(false)
2527 }
2528 }
2529
2530 pub async fn edit(
2535 &self,
2536 new_content: AnyMessageLikeEventContent,
2537 ) -> Result<bool, RoomSendQueueStorageError> {
2538 self.edit_raw(
2539 Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2540 new_content.event_type().to_string(),
2541 )
2542 .await
2543 }
2544
2545 pub async fn edit_media_caption(
2550 &self,
2551 caption: Option<String>,
2552 formatted_caption: Option<FormattedBody>,
2553 mentions: Option<Mentions>,
2554 ) -> Result<bool, RoomSendQueueStorageError> {
2555 if let Some(new_content) = self
2556 .room
2557 .inner
2558 .queue
2559 .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2560 .await?
2561 {
2562 trace!("successful edit of media caption");
2563
2564 self.room.inner.notifier.notify_one();
2566
2567 let new_content = SerializableEventContent::new(&new_content)
2568 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2569
2570 self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2572 transaction_id: self.transaction_id.clone(),
2573 new_content,
2574 });
2575
2576 Ok(true)
2577 } else {
2578 debug!("local echo doesn't exist anymore, can't edit media caption");
2579 Ok(false)
2580 }
2581 }
2582
2583 pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2586 let room = &self.room.inner;
2587 room.queue
2588 .mark_as_unwedged(&self.transaction_id)
2589 .await
2590 .map_err(RoomSendQueueError::StorageError)?;
2591
2592 for handles in &self.media_handles {
2600 room.queue
2601 .mark_as_unwedged(&handles.upload_file_txn)
2602 .await
2603 .map_err(RoomSendQueueError::StorageError)?;
2604
2605 if let Some(txn) = &handles.upload_thumbnail_txn {
2606 room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2607 }
2608 }
2609
2610 room.notifier.notify_one();
2612
2613 self.room.send_update(RoomSendQueueUpdate::RetryEvent {
2614 transaction_id: self.transaction_id.clone(),
2615 });
2616
2617 Ok(())
2618 }
2619
2620 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2625 pub async fn react(
2626 &self,
2627 key: String,
2628 ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2629 trace!("received an intent to react");
2630
2631 let created_at = MilliSecondsSinceUnixEpoch::now();
2632 if let Some(reaction_txn_id) =
2633 self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2634 {
2635 trace!("successfully queued react");
2636
2637 self.room.inner.notifier.notify_one();
2639
2640 let send_handle = SendReactionHandle {
2642 room: self.room.clone(),
2643 transaction_id: reaction_txn_id.clone(),
2644 };
2645
2646 self.room.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2647 transaction_id: reaction_txn_id.into(),
2650 content: LocalEchoContent::React {
2651 key,
2652 send_handle: send_handle.clone(),
2653 applies_to: self.transaction_id.clone(),
2654 },
2655 }));
2656
2657 Ok(Some(send_handle))
2658 } else {
2659 debug!("local echo doesn't exist anymore, can't react");
2660 Ok(None)
2661 }
2662 }
2663}
2664
/// A handle to act upon a reaction queued as a local echo.
#[derive(Clone, Debug)]
pub struct SendReactionHandle {
    /// The send queue of the room the reaction belongs to.
    room: RoomSendQueue,
    /// The reaction's own transaction id.
    transaction_id: ChildTransactionId,
}
2673
2674impl SendReactionHandle {
2675 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2680 if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2681 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2685 transaction_id: self.transaction_id.clone().into(),
2686 });
2687
2688 return Ok(true);
2689 }
2690
2691 let handle = SendHandle {
2694 room: self.room.clone(),
2695 transaction_id: self.transaction_id.clone().into(),
2696 media_handles: vec![],
2697 created_at: MilliSecondsSinceUnixEpoch::now(),
2698 };
2699
2700 handle.abort().await
2701 }
2702
2703 pub fn transaction_id(&self) -> &TransactionId {
2705 &self.transaction_id
2706 }
2707}
2708
2709fn canonicalize_dependent_requests(
2713 dependent: &[DependentQueuedRequest],
2714) -> Vec<DependentQueuedRequest> {
2715 let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
2716
2717 for d in dependent {
2718 let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
2719
2720 if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
2721 continue;
2724 }
2725
2726 match &d.kind {
2727 DependentQueuedRequestKind::EditEvent { .. } => {
2728 if let Some(prev_edit) = prevs
2730 .iter_mut()
2731 .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
2732 {
2733 *prev_edit = d;
2734 } else {
2735 prevs.insert(0, d);
2736 }
2737 }
2738
2739 DependentQueuedRequestKind::UploadFileOrThumbnail { .. }
2740 | DependentQueuedRequestKind::FinishUpload { .. }
2741 | DependentQueuedRequestKind::ReactEvent { .. } => {
2742 prevs.push(d);
2744 }
2745
2746 #[cfg(feature = "unstable-msc4274")]
2747 DependentQueuedRequestKind::FinishGallery { .. } => {
2748 prevs.push(d);
2750 }
2751
2752 DependentQueuedRequestKind::RedactEvent => {
2753 prevs.clear();
2755 prevs.push(d);
2756 }
2757 }
2758 }
2759
2760 by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
2761}
2762
2763#[cfg(all(test, not(target_family = "wasm")))]
2764mod tests {
2765 use std::{sync::Arc, time::Duration};
2766
2767 use assert_matches2::{assert_let, assert_matches};
2768 use matrix_sdk_base::store::{
2769 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
2770 SerializableEventContent,
2771 };
2772 use matrix_sdk_test::{JoinedRoomBuilder, SyncResponseBuilder, async_test};
2773 use ruma::{
2774 MilliSecondsSinceUnixEpoch, TransactionId,
2775 events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
2776 room_id,
2777 };
2778
2779 use super::canonicalize_dependent_requests;
2780 use crate::{client::WeakClient, test_utils::logged_in_client};
2781
2782 #[test]
2783 fn test_canonicalize_dependent_events_created_at() {
2784 let txn = TransactionId::new();
2787 let created_at = MilliSecondsSinceUnixEpoch::now();
2788
2789 let edit = DependentQueuedRequest {
2790 own_transaction_id: ChildTransactionId::new(),
2791 parent_transaction_id: txn.clone(),
2792 kind: DependentQueuedRequestKind::EditEvent {
2793 new_content: SerializableEventContent::new(
2794 &RoomMessageEventContent::text_plain("edit").into(),
2795 )
2796 .unwrap(),
2797 },
2798 parent_key: None,
2799 created_at,
2800 };
2801
2802 let res = canonicalize_dependent_requests(&[edit]);
2803
2804 assert_eq!(res.len(), 1);
2805 assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2806 assert_let!(
2807 AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2808 );
2809 assert_eq!(msg.body(), "edit");
2810 assert_eq!(res[0].parent_transaction_id, txn);
2811 assert_eq!(res[0].created_at, created_at);
2812 }
2813
    /// Checks that using a room's send queue does not keep the client alive:
    /// once the only strong `Client` handle is dropped, the weak client must
    /// not be upgradable anymore. Runs with the send queue both enabled and
    /// disabled.
    #[async_test]
    async fn test_client_no_cycle_with_send_queue() {
        for enabled in [true, false] {
            let client = logged_in_client(None).await;
            let weak_client = WeakClient::from_client(&client);

            // Everything created from the client lives in this inner scope, so
            // that it is dropped before the client itself.
            {
                let mut sync_response_builder = SyncResponseBuilder::new();

                let room_id = room_id!("!a:b.c");

                // Make the client aware of the room.
                client
                    .base_client()
                    .receive_sync_response(
                        sync_response_builder
                            .add_joined_room(JoinedRoomBuilder::new(room_id))
                            .build_sync_response(),
                    )
                    .await
                    .unwrap();

                let room = client.get_room(room_id).unwrap();
                let q = room.send_queue();

                let _watcher = q.subscribe().await;

                client.send_queue().set_enabled(enabled).await;
            }

            drop(client);

            // Presumably gives background tasks some time to wind down.
            tokio::time::sleep(Duration::from_millis(500)).await;

            // The weak handle must not be upgradable anymore. The message
            // arguments are only evaluated when the assertion fails, i.e. when
            // `client` is `Some`, so the `unwrap()` below cannot panic.
            let client = weak_client.get();
            assert!(
                client.is_none(),
                "too many strong references to the client: {}",
                Arc::strong_count(&client.unwrap().inner)
            );
        }
    }
2858
2859 #[test]
2860 fn test_canonicalize_dependent_events_smoke_test() {
2861 let txn = TransactionId::new();
2863
2864 let edit = DependentQueuedRequest {
2865 own_transaction_id: ChildTransactionId::new(),
2866 parent_transaction_id: txn.clone(),
2867 kind: DependentQueuedRequestKind::EditEvent {
2868 new_content: SerializableEventContent::new(
2869 &RoomMessageEventContent::text_plain("edit").into(),
2870 )
2871 .unwrap(),
2872 },
2873 parent_key: None,
2874 created_at: MilliSecondsSinceUnixEpoch::now(),
2875 };
2876 let res = canonicalize_dependent_requests(&[edit]);
2877
2878 assert_eq!(res.len(), 1);
2879 assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
2880 assert_eq!(res[0].parent_transaction_id, txn);
2881 assert!(res[0].parent_key.is_none());
2882 }
2883
/// When one parent has a redaction mixed in with many edits, only the
/// redaction is expected to remain after canonicalization.
#[test]
fn test_canonicalize_dependent_events_redaction_preferred() {
    let parent_id = TransactionId::new();

    let redaction = DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        parent_transaction_id: parent_id.clone(),
        kind: DependentQueuedRequestKind::RedactEvent,
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    // Template edit; it is never queued itself, only copied.
    let edit_template = DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        parent_transaction_id: parent_id.clone(),
        kind: DependentQueuedRequestKind::EditEvent {
            new_content: SerializableEventContent::new(
                &RoomMessageEventContent::text_plain("edit").into(),
            )
            .unwrap(),
        },
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    // Each call yields a copy of the template carrying its own, fresh child
    // transaction id.
    let fresh_edit = || DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        ..edit_template.clone()
    };

    // Layout: one edit, then the redaction, then 98 more edits (100 total).
    let mut requests = Vec::with_capacity(100);
    requests.push(fresh_edit());
    requests.push(redaction);
    requests.extend(std::iter::repeat_with(fresh_edit).take(98));

    let canonical = canonicalize_dependent_requests(&requests);

    assert_eq!(canonical.len(), 1);
    let survivor = &canonical[0];
    assert_matches!(&survivor.kind, DependentQueuedRequestKind::RedactEvent);
    assert_eq!(survivor.parent_transaction_id, parent_id);
}
2931
/// With ten successive edits of the same parent, the single request left
/// after canonicalization is expected to carry the last edit's body.
#[test]
fn test_canonicalize_dependent_events_last_edit_preferred() {
    let parent_txn = TransactionId::new();

    // Queue edits "edit0" through "edit9", all targeting the same parent.
    let mut edits = Vec::with_capacity(10);
    for i in 0..10 {
        let new_content = SerializableEventContent::new(
            &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
        )
        .unwrap();

        edits.push(DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: parent_txn.clone(),
            kind: DependentQueuedRequestKind::EditEvent { new_content },
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        });
    }

    let expected_parent = edits[9].parent_transaction_id.clone();

    let canonical = canonicalize_dependent_requests(&edits);

    assert_eq!(canonical.len(), 1);
    let survivor = &canonical[0];

    // Unwrap the remaining edit down to its message to inspect the body.
    assert_let!(
        DependentQueuedRequestKind::EditEvent { new_content: surviving_content } = &survivor.kind
    );
    assert_let!(
        AnyMessageLikeEventContent::RoomMessage(message) =
            surviving_content.deserialize().unwrap()
    );
    assert_eq!(message.body(), "edit9");
    assert_eq!(survivor.parent_transaction_id, expected_parent);
}
2964
/// Requests that depend on two different parents are both expected to be
/// kept, each still attached to its own parent.
#[test]
fn test_canonicalize_multiple_local_echoes() {
    let txn1 = TransactionId::new();
    let txn2 = TransactionId::new();

    let redact_child = ChildTransactionId::new();
    let edit_child = ChildTransactionId::new();

    // A redaction depending on the first parent.
    let redaction = DependentQueuedRequest {
        own_transaction_id: redact_child.clone(),
        kind: DependentQueuedRequestKind::RedactEvent,
        parent_transaction_id: txn1.clone(),
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    // An edit depending on the second parent.
    let edit = DependentQueuedRequest {
        own_transaction_id: edit_child,
        kind: DependentQueuedRequestKind::EditEvent {
            new_content: SerializableEventContent::new(
                &RoomMessageEventContent::text_plain("edit").into(),
            )
            .unwrap(),
        },
        parent_transaction_id: txn2.clone(),
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    let inputs = [redaction, edit];
    let canonical = canonicalize_dependent_requests(&inputs);

    assert_eq!(canonical.len(), 2);

    // The output order is not relied upon: each entry is identified by its
    // own transaction id before being checked.
    for entry in &canonical {
        if entry.own_transaction_id == redact_child {
            assert_eq!(entry.parent_transaction_id, txn1);
            assert_matches!(entry.kind, DependentQueuedRequestKind::RedactEvent);
            continue;
        }

        assert_eq!(entry.parent_transaction_id, txn2);
        assert_matches!(entry.kind, DependentQueuedRequestKind::EditEvent { .. });
    }
}
3012
/// A reaction queued before an edit of the same parent is expected to come
/// out after the edit once canonicalized.
#[test]
fn test_canonicalize_reactions_after_edits() {
    let parent_id = TransactionId::new();

    let react_id = ChildTransactionId::new();
    let edit_id = ChildTransactionId::new();

    let reaction = DependentQueuedRequest {
        own_transaction_id: react_id.clone(),
        kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
        parent_transaction_id: parent_id.clone(),
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    let edited_content =
        SerializableEventContent::new(&RoomMessageEventContent::text_plain("edit").into())
            .unwrap();

    let edit = DependentQueuedRequest {
        own_transaction_id: edit_id.clone(),
        kind: DependentQueuedRequestKind::EditEvent { new_content: edited_content },
        parent_transaction_id: parent_id,
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    // The reaction is passed in first, the edit second.
    let canonical = canonicalize_dependent_requests(&[reaction, edit]);

    // Both requests are kept, with the edit now ahead of the reaction.
    assert_eq!(canonical.len(), 2);
    let (first, second) = (&canonical[0], &canonical[1]);
    assert_eq!(first.own_transaction_id, edit_id);
    assert_eq!(second.own_transaction_id, react_id);
}
3047}