1use std::{
132 collections::{BTreeMap, HashMap},
133 str::FromStr as _,
134 sync::{
135 atomic::{AtomicBool, Ordering},
136 Arc, RwLock,
137 },
138};
139
140use eyeball::SharedObservable;
141#[cfg(feature = "unstable-msc4274")]
142use matrix_sdk_base::store::FinishGalleryItemInfo;
143use matrix_sdk_base::{
144 event_cache::store::EventCacheStoreError,
145 media::MediaRequestParameters,
146 store::{
147 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore,
148 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
149 SentMediaInfo, SentRequestKey, SerializableEventContent,
150 },
151 store_locks::LockStoreError,
152 RoomState, StoreError,
153};
154use matrix_sdk_common::{
155 executor::{spawn, JoinHandle},
156 locks::Mutex as SyncMutex,
157};
158use mime::Mime;
159use ruma::{
160 events::{
161 reaction::ReactionEventContent,
162 relation::Annotation,
163 room::{
164 message::{FormattedBody, RoomMessageEventContent},
165 MediaSource,
166 },
167 AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _,
168 },
169 serde::Raw,
170 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
171 TransactionId,
172};
173use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard};
174use tracing::{debug, error, info, instrument, trace, warn};
175
176#[cfg(feature = "e2e-encryption")]
177use crate::crypto::{OlmError, SessionRecipientCollectionError};
178use crate::{
179 client::WeakClient,
180 config::RequestConfig,
181 error::RetryKind,
182 room::{edit::EditedContent, WeakRoom},
183 Client, Media, Room, TransmissionProgress,
184};
185
186mod progress;
187mod upload;
188
189pub use progress::AbstractProgress;
190
/// Client-wide handle to the send queue.
///
/// The handle only wraps a [`Client`]; the actual shared state lives in the
/// [`SendQueueData`] stored on that client, and per-room queues are obtained
/// through [`SendQueue::for_room`].
pub struct SendQueue {
    /// The client whose `send_queue_data` backs this handle.
    client: Client,
}
195
196#[cfg(not(tarpaulin_include))]
197impl std::fmt::Debug for SendQueue {
198 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199 f.debug_struct("SendQueue").finish_non_exhaustive()
200 }
201}
202
203impl SendQueue {
204 pub(super) fn new(client: Client) -> Self {
205 Self { client }
206 }
207
208 pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
211 if !self.is_enabled() {
212 return;
213 }
214
215 let room_ids =
216 self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
217 |err| {
218 warn!("error when loading rooms with unsent requests: {err}");
219 Vec::new()
220 },
221 );
222
223 for room_id in room_ids {
225 if let Some(room) = self.client.get_room(&room_id) {
226 let _ = self.for_room(room);
227 }
228 }
229 }
230
231 #[inline(always)]
233 fn data(&self) -> &SendQueueData {
234 &self.client.inner.send_queue_data
235 }
236
237 pub(crate) fn for_room(&self, room: Room) -> RoomSendQueue {
240 let data = self.data();
241
242 let mut map = data.rooms.write().unwrap();
243
244 let room_id = room.room_id();
245 if let Some(room_q) = map.get(room_id).cloned() {
246 return room_q;
247 }
248
249 let owned_room_id = room_id.to_owned();
250 let room_q = RoomSendQueue::new(
251 self.is_enabled(),
252 data.global_update_sender.clone(),
253 data.error_sender.clone(),
254 data.is_dropping.clone(),
255 &self.client,
256 owned_room_id.clone(),
257 data.report_media_upload_progress.clone(),
258 );
259
260 map.insert(owned_room_id, room_q.clone());
261
262 room_q
263 }
264
265 pub async fn set_enabled(&self, enabled: bool) {
275 debug!(?enabled, "setting global send queue enablement");
276
277 self.data().globally_enabled.store(enabled, Ordering::SeqCst);
278
279 for room in self.data().rooms.read().unwrap().values() {
281 room.set_enabled(enabled);
282 }
283
284 self.respawn_tasks_for_rooms_with_unsent_requests().await;
287 }
288
289 pub fn is_enabled(&self) -> bool {
292 self.data().globally_enabled.load(Ordering::SeqCst)
293 }
294
295 pub fn enable_upload_progress(&self, enabled: bool) {
297 self.data().report_media_upload_progress.store(enabled, Ordering::SeqCst);
298 }
299
300 pub fn subscribe(&self) -> broadcast::Receiver<SendQueueUpdate> {
305 self.data().global_update_sender.subscribe()
306 }
307
308 pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
311 self.data().error_sender.subscribe()
312 }
313}
314
/// Everything the queue storage needs to know about a thumbnail that is
/// uploaded alongside a media file.
#[derive(Clone, Debug)]
struct QueueThumbnailInfo {
    /// Thumbnail information handed back for the dependent "finish upload"
    /// request; its `txn` is used as the transaction id of the thumbnail
    /// upload.
    finish_upload_thumbnail_info: FinishUploadThumbnailInfo,

    /// Media request parameters used as the cache key of the thumbnail upload.
    media_request_parameters: MediaRequestParameters,

    /// MIME type of the thumbnail.
    content_type: Mime,

    /// Size of the thumbnail file, recorded in
    /// `QueueStorage::thumbnail_file_sizes`.
    // NOTE(review): presumably expressed in bytes — confirm with the producer.
    file_size: usize,
}
331
/// An error reported by the sending task of a room send queue.
#[derive(Clone, Debug)]
pub struct SendQueueRoomError {
    /// The room whose send queue ran into the error.
    pub room_id: OwnedRoomId,

    /// The error that happened while sending a request.
    pub error: Arc<crate::Error>,

    /// Whether the error is considered recoverable.
    ///
    /// The sending task keeps the request in the queue when this is `true`,
    /// and marks the request as wedged when this is `false`. In both cases
    /// the room send queue is disabled after the error.
    pub is_recoverable: bool,
}
348
349impl Client {
350 pub fn send_queue(&self) -> SendQueue {
353 SendQueue::new(self.clone())
354 }
355}
356
/// State of the send queue that is shared by every [`SendQueue`] handle and
/// every room send queue of a client.
pub(super) struct SendQueueData {
    /// The room send queues created so far, keyed by room id.
    rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,

    /// Whether the send queue is globally enabled; also the initial
    /// enablement of newly created room queues.
    globally_enabled: AtomicBool,

    /// Sender of the global update channel, cloned into every room queue.
    global_update_sender: broadcast::Sender<SendQueueUpdate>,

    /// Sender of the error channel, cloned into every room queue.
    error_sender: broadcast::Sender<SendQueueRoomError>,

    /// Set to `true` when this data is dropped, so the sending tasks exit.
    is_dropping: Arc<AtomicBool>,

    /// Whether the sending tasks should report media upload progress.
    report_media_upload_progress: Arc<AtomicBool>,
}
381
382impl SendQueueData {
383 pub fn new(globally_enabled: bool) -> Self {
385 let (global_update_sender, _) = broadcast::channel(32);
386 let (error_sender, _) = broadcast::channel(32);
387
388 Self {
389 rooms: Default::default(),
390 globally_enabled: AtomicBool::new(globally_enabled),
391 global_update_sender,
392 error_sender,
393 is_dropping: Arc::new(false.into()),
394 report_media_upload_progress: Arc::new(false.into()),
395 }
396 }
397}
398
399impl Drop for SendQueueData {
400 fn drop(&mut self) {
401 debug!("globally dropping the send queue");
404 self.is_dropping.store(true, Ordering::SeqCst);
405
406 let rooms = self.rooms.read().unwrap();
407 for room in rooms.values() {
408 room.inner.notifier.notify_one();
409 }
410 }
411}
412
413impl Room {
414 pub fn send_queue(&self) -> RoomSendQueue {
416 self.client.send_queue().for_room(self.clone())
417 }
418}
419
/// The send queue of a single room.
///
/// Cloning is cheap: all clones share the same inner state through an `Arc`.
#[derive(Clone)]
pub struct RoomSendQueue {
    /// Shared state of the room queue, including the handle of its sending
    /// task.
    inner: Arc<RoomSendQueueInner>,
}
427
428#[cfg(not(tarpaulin_include))]
429impl std::fmt::Debug for RoomSendQueue {
430 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431 f.debug_struct("RoomSendQueue").finish_non_exhaustive()
432 }
433}
434
435impl RoomSendQueue {
436 fn new(
437 globally_enabled: bool,
438 global_update_sender: broadcast::Sender<SendQueueUpdate>,
439 global_error_sender: broadcast::Sender<SendQueueRoomError>,
440 is_dropping: Arc<AtomicBool>,
441 client: &Client,
442 room_id: OwnedRoomId,
443 report_media_upload_progress: Arc<AtomicBool>,
444 ) -> Self {
445 let (update_sender, _) = broadcast::channel(32);
446
447 let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
448 let notifier = Arc::new(Notify::new());
449
450 let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
451 let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));
452
453 let task = spawn(Self::sending_task(
454 weak_room.clone(),
455 queue.clone(),
456 notifier.clone(),
457 global_update_sender.clone(),
458 update_sender.clone(),
459 locally_enabled.clone(),
460 global_error_sender,
461 is_dropping,
462 report_media_upload_progress,
463 ));
464
465 Self {
466 inner: Arc::new(RoomSendQueueInner {
467 room: weak_room,
468 global_update_sender,
469 update_sender,
470 _task: task,
471 queue,
472 notifier,
473 locally_enabled,
474 }),
475 }
476 }
477
478 pub async fn send_raw(
493 &self,
494 content: Raw<AnyMessageLikeEventContent>,
495 event_type: String,
496 ) -> Result<SendHandle, RoomSendQueueError> {
497 let Some(room) = self.inner.room.get() else {
498 return Err(RoomSendQueueError::RoomDisappeared);
499 };
500 if room.state() != RoomState::Joined {
501 return Err(RoomSendQueueError::RoomNotJoined);
502 }
503
504 let content = SerializableEventContent::from_raw(content, event_type);
505
506 let created_at = MilliSecondsSinceUnixEpoch::now();
507 let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
508 trace!(%transaction_id, "manager sends a raw event to the background task");
509
510 self.inner.notifier.notify_one();
511
512 let send_handle = SendHandle {
513 room: self.clone(),
514 transaction_id: transaction_id.clone(),
515 media_handles: vec![],
516 created_at,
517 };
518
519 self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
520 transaction_id,
521 content: LocalEchoContent::Event {
522 serialized_event: content,
523 send_handle: send_handle.clone(),
524 send_error: None,
525 },
526 }));
527
528 Ok(send_handle)
529 }
530
531 pub async fn send(
546 &self,
547 content: AnyMessageLikeEventContent,
548 ) -> Result<SendHandle, RoomSendQueueError> {
549 self.send_raw(
550 Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
551 content.event_type().to_string(),
552 )
553 .await
554 }
555
556 pub async fn subscribe(
562 &self,
563 ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
564 {
565 let local_echoes = self.inner.queue.local_echoes(self).await?;
566
567 Ok((local_echoes, self.inner.update_sender.subscribe()))
568 }
569
    /// Background task that sends the queued requests of one room, one at a
    /// time.
    ///
    /// Each iteration of the loop:
    ///
    /// 1. exits if the shared `is_dropping` flag is set;
    /// 2. applies the dependent requests and forwards the resulting updates;
    /// 3. sleeps on `notifier` while the room queue is disabled or empty;
    /// 4. sends the next request with [`Self::handle_request`] and reports
    ///    the outcome (sent, aborted or failed) to the observers.
    ///
    /// Any send error disables the room queue. A recoverable error keeps the
    /// request in the queue, an unrecoverable one marks it as wedged.
    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all, fields(room_id = %room.room_id()))]
    async fn sending_task(
        room: WeakRoom,
        queue: QueueStorage,
        notifier: Arc<Notify>,
        global_update_sender: broadcast::Sender<SendQueueUpdate>,
        update_sender: broadcast::Sender<RoomSendQueueUpdate>,
        locally_enabled: Arc<AtomicBool>,
        global_error_sender: broadcast::Sender<SendQueueRoomError>,
        is_dropping: Arc<AtomicBool>,
        report_media_upload_progress: Arc<AtomicBool>,
    ) {
        trace!("spawned the sending task");

        let room_id = room.room_id();

        loop {
            // The flag is set when the shared `SendQueueData` is dropped.
            if is_dropping.load(Ordering::SeqCst) {
                trace!("shutting down!");
                break;
            }

            // Dependent requests are applied even when the queue is
            // disabled; a failure is only logged, and the updates gathered so
            // far are still forwarded.
            let mut new_updates = Vec::new();
            if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
                warn!("errors when applying dependent requests: {err}");
            }

            for up in new_updates {
                send_update(&global_update_sender, &update_sender, room_id, up);
            }

            if !locally_enabled.load(Ordering::SeqCst) {
                trace!("not enabled, sleeping");
                // Wait for a wake-up (re-enablement, new request, shutdown).
                notifier.notified().await;
                continue;
            }

            let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
                Ok(Some(request)) => request,

                Ok(None) => {
                    trace!("queue is empty, sleeping");
                    // Wait for a wake-up before looking at the queue again.
                    notifier.notified().await;
                    continue;
                }

                Err(err) => {
                    // NOTE(review): this retries immediately, without waiting
                    // on the notifier — confirm a persistent storage error
                    // can't turn this into a busy loop.
                    warn!("error when loading next request to send: {err}");
                    continue;
                }
            };

            let txn_id = queued_request.transaction_id.clone();
            trace!(txn_id = %txn_id, "received a request to send!");

            // Only a weak reference to the room is held; failing to upgrade
            // it is expected during shutdown only.
            let Some(room) = room.get() else {
                if is_dropping.load(Ordering::SeqCst) {
                    break;
                }
                error!("the weak room couldn't be upgraded but we're not shutting down?");
                continue;
            };

            // For media uploads, remember the transaction id of the related
            // event and, if progress reporting is enabled, prepare the
            // progress information and observable. Other requests get
            // `(None, None, None)`.
            let (related_txn_id, media_upload_progress_info, http_progress) =
                if let QueuedRequestKind::MediaUpload {
                    cache_key,
                    thumbnail_source,
                    #[cfg(feature = "unstable-msc4274")]
                    accumulated,
                    related_to,
                    ..
                } = &queued_request.kind
                {
                    let (media_upload_progress_info, http_progress) =
                        if report_media_upload_progress.load(Ordering::SeqCst) {
                            let media_upload_progress_info =
                                RoomSendQueue::create_media_upload_progress_info(
                                    &queued_request.transaction_id,
                                    related_to,
                                    cache_key,
                                    thumbnail_source.as_ref(),
                                    #[cfg(feature = "unstable-msc4274")]
                                    accumulated,
                                    &room,
                                    &queue,
                                )
                                .await;

                            let progress = RoomSendQueue::create_media_upload_progress_observable(
                                &media_upload_progress_info,
                                related_to,
                                &update_sender,
                            );

                            (Some(media_upload_progress_info), Some(progress))
                        } else {
                            Default::default()
                        };

                    (Some(related_to.clone()), media_upload_progress_info, http_progress)
                } else {
                    Default::default()
                };

            match Self::handle_request(&room, queued_request, cancel_upload_rx, http_progress).await
            {
                // The request was sent: remove it from the queue, then tell
                // the observers.
                Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
                {
                    Ok(()) => match parent_key {
                        SentRequestKey::Event(event_id) => {
                            send_update(
                                &global_update_sender,
                                &update_sender,
                                room_id,
                                RoomSendQueueUpdate::SentEvent { transaction_id: txn_id, event_id },
                            );
                        }

                        SentRequestKey::Media(sent_media_info) => {
                            // A final progress update is emitted even when no
                            // progress information was prepared; `0` and
                            // `1/1` are the fallback values in that case.
                            let index =
                                media_upload_progress_info.as_ref().map_or(0, |info| info.index);
                            let progress = media_upload_progress_info
                                .as_ref()
                                .map(|info| {
                                    AbstractProgress { current: info.bytes, total: info.bytes }
                                        + info.offsets
                                })
                                .unwrap_or(AbstractProgress { current: 1, total: 1 });

                            // This update goes to the room channel only, not
                            // to the global one.
                            let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
                                related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
                                file: Some(sent_media_info.file),
                                index,
                                progress,
                            });
                        }
                    },

                    Err(err) => {
                        warn!("unable to mark queued request as sent: {err}");
                    }
                },

                // `handle_request` returns `Ok(None)` when a media upload was
                // cancelled while it was running.
                Ok(None) => {
                    debug!("Request has been aborted while running, continuing.");
                }

                Err(err) => {
                    // HTTP errors are recoverable when their retry kind is
                    // transient or a network failure; a failed concurrent
                    // request is treated as recoverable too; everything else
                    // is not.
                    let is_recoverable = match err {
                        crate::Error::Http(ref http_err) => {
                            matches!(
                                http_err.retry_kind(),
                                RetryKind::Transient { .. } | RetryKind::NetworkFailure
                            )
                        }

                        crate::Error::ConcurrentRequestFailed => true,

                        _ => false,
                    };

                    // The room queue is disabled after any kind of error.
                    locally_enabled.store(false, Ordering::SeqCst);

                    if is_recoverable {
                        warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");

                        // The request stays in the queue; it's only marked as
                        // not being sent anymore, before observers are told
                        // about the failure below.
                        queue.mark_as_not_being_sent(&txn_id).await;

                    } else {
                        warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");

                        // The request is marked as wedged, so it won't be
                        // picked again by `peek_next_to_send`.
                        if let Err(storage_error) =
                            queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
                        {
                            warn!("unable to mark request as wedged: {storage_error}");
                        }
                    }

                    let error = Arc::new(err);

                    let _ = global_error_sender.send(SendQueueRoomError {
                        room_id: room_id.to_owned(),
                        error: error.clone(),
                        is_recoverable,
                    });

                    // For media uploads, the error is reported against the
                    // related event's transaction id.
                    send_update(
                        &global_update_sender,
                        &update_sender,
                        room_id,
                        RoomSendQueueUpdate::SendError {
                            transaction_id: related_txn_id.unwrap_or(txn_id),
                            error,
                            is_recoverable,
                        },
                    );
                }
            }
        }

        info!("exited sending task");
    }
808
809 async fn handle_request(
813 room: &Room,
814 request: QueuedRequest,
815 cancel_upload_rx: Option<oneshot::Receiver<()>>,
816 progress: Option<SharedObservable<TransmissionProgress>>,
817 ) -> Result<Option<SentRequestKey>, crate::Error> {
818 match request.kind {
819 QueuedRequestKind::Event { content } => {
820 let (event, event_type) = content.raw();
821
822 let res = room
823 .send_raw(event_type, event)
824 .with_transaction_id(&request.transaction_id)
825 .with_request_config(RequestConfig::short_retry())
826 .await?;
827
828 trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent");
829 Ok(Some(SentRequestKey::Event(res.event_id)))
830 }
831
832 QueuedRequestKind::MediaUpload {
833 content_type,
834 cache_key,
835 thumbnail_source,
836 related_to: relates_to,
837 #[cfg(feature = "unstable-msc4274")]
838 accumulated,
839 } => {
840 trace!(%relates_to, "uploading media related to event");
841
842 let fut = async move {
843 let data = room
844 .client()
845 .event_cache_store()
846 .lock()
847 .await?
848 .get_media_content(&cache_key)
849 .await?
850 .ok_or(crate::Error::SendQueueWedgeError(Box::new(
851 QueueWedgeError::MissingMediaContent,
852 )))?;
853
854 let mime = Mime::from_str(&content_type).map_err(|_| {
855 crate::Error::SendQueueWedgeError(Box::new(
856 QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
857 ))
858 })?;
859
860 #[cfg(feature = "e2e-encryption")]
861 let media_source = if room.latest_encryption_state().await?.is_encrypted() {
862 trace!("upload will be encrypted (encrypted room)");
863
864 let mut cursor = std::io::Cursor::new(data);
865 let mut req = room
866 .client
867 .upload_encrypted_file(&mut cursor)
868 .with_request_config(RequestConfig::short_retry());
869 if let Some(progress) = progress {
870 req = req.with_send_progress_observable(progress);
871 }
872 let encrypted_file = req.await?;
873
874 MediaSource::Encrypted(Box::new(encrypted_file))
875 } else {
876 trace!("upload will be in clear text (room without encryption)");
877
878 let request_config = RequestConfig::short_retry()
879 .timeout(Media::reasonable_upload_timeout(&data));
880 let mut req =
881 room.client().media().upload(&mime, data, Some(request_config));
882 if let Some(progress) = progress {
883 req = req.with_send_progress_observable(progress);
884 }
885 let res = req.await?;
886
887 MediaSource::Plain(res.content_uri)
888 };
889
890 #[cfg(not(feature = "e2e-encryption"))]
891 let media_source = {
892 let request_config = RequestConfig::short_retry()
893 .timeout(Media::reasonable_upload_timeout(&data));
894 let mut req =
895 room.client().media().upload(&mime, data, Some(request_config));
896 if let Some(progress) = progress {
897 req = req.with_send_progress_observable(progress);
898 }
899 let res = req.await?;
900 MediaSource::Plain(res.content_uri)
901 };
902
903 let uri = match &media_source {
904 MediaSource::Plain(uri) => uri,
905 MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
906 };
907 trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");
908
909 Ok(SentRequestKey::Media(SentMediaInfo {
910 file: media_source,
911 thumbnail: thumbnail_source,
912 #[cfg(feature = "unstable-msc4274")]
913 accumulated,
914 }))
915 };
916
917 let wait_for_cancel = async move {
918 if let Some(rx) = cancel_upload_rx {
919 rx.await
920 } else {
921 std::future::pending().await
922 }
923 };
924
925 tokio::select! {
926 biased;
927
928 _ = wait_for_cancel => {
929 Ok(None)
930 }
931
932 res = fut => {
933 res.map(Some)
934 }
935 }
936 }
937 }
938 }
939
940 pub fn is_enabled(&self) -> bool {
942 self.inner.locally_enabled.load(Ordering::SeqCst)
943 }
944
945 pub fn set_enabled(&self, enabled: bool) {
947 self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
948
949 if enabled {
952 self.inner.notifier.notify_one();
953 }
954 }
955
956 fn send_update(&self, update: RoomSendQueueUpdate) {
960 let _ = self.inner.update_sender.send(update.clone());
961 let _ = self
962 .inner
963 .global_update_sender
964 .send(SendQueueUpdate { room_id: self.inner.room.room_id().to_owned(), update });
965 }
966}
967
968fn send_update(
969 global_update_sender: &broadcast::Sender<SendQueueUpdate>,
970 update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
971 room_id: &RoomId,
972 update: RoomSendQueueUpdate,
973) {
974 let _ = update_sender.send(update.clone());
975 let _ = global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update });
976}
977
978impl From<&crate::Error> for QueueWedgeError {
979 fn from(value: &crate::Error) -> Self {
980 match value {
981 #[cfg(feature = "e2e-encryption")]
982 crate::Error::OlmError(error) => match &**error {
983 OlmError::SessionRecipientCollectionError(error) => match error {
984 SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
985 QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
986 }
987
988 SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
989 QueueWedgeError::IdentityViolations { users: users.clone() }
990 }
991
992 SessionRecipientCollectionError::CrossSigningNotSetup
993 | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
994 QueueWedgeError::CrossVerificationRequired
995 }
996 },
997 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
998 },
999
1000 crate::Error::SendQueueWedgeError(error) => *error.clone(),
1002
1003 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1004 }
1005 }
1006}
1007
/// State shared by all the clones of a [`RoomSendQueue`].
struct RoomSendQueueInner {
    /// Weak reference to the room this queue sends to.
    room: WeakRoom,

    /// Sender of the global update channel; receives a copy of the room
    /// updates, tagged with the room id.
    global_update_sender: broadcast::Sender<SendQueueUpdate>,

    /// Sender of this room's update channel.
    update_sender: broadcast::Sender<RoomSendQueueUpdate>,

    /// Persistent storage of the queued requests of this room; shared with
    /// the sending task.
    queue: QueueStorage,

    /// Used to wake up the sending task, e.g. after a request was queued or
    /// the queue was re-enabled.
    notifier: Arc<Notify>,

    /// Whether this room's queue is enabled; shared with the sending task,
    /// which sets it to `false` after a send error.
    locally_enabled: Arc<AtomicBool>,

    /// Handle of the spawned sending task.
    // NOTE(review): presumably kept so the task lives as long as the queue —
    // confirm the drop semantics of this `JoinHandle` type.
    _task: JoinHandle<()>,
}
1044
/// Information about the request that is currently being sent.
struct BeingSentInfo {
    /// Transaction id of the request being sent.
    transaction_id: OwnedTransactionId,

    /// Sender used to cancel the request; only set for media uploads.
    cancel_upload: Option<oneshot::Sender<()>>,
}
1054
1055impl BeingSentInfo {
1056 fn cancel_upload(self) -> bool {
1061 if let Some(cancel_upload) = self.cancel_upload {
1062 let _ = cancel_upload.send(());
1063 true
1064 } else {
1065 false
1066 }
1067 }
1068}
1069
/// Lock protecting the accesses to the state store made by a room's queue
/// storage.
#[derive(Clone)]
struct StoreLock {
    /// Weak reference to the client owning the state store.
    client: WeakClient,

    /// The request currently being sent, if any. The mutex around it is the
    /// one taken by [`StoreLock::lock`].
    being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
}
1083
1084impl StoreLock {
1085 async fn lock(&self) -> StoreLockGuard {
1087 StoreLockGuard {
1088 client: self.client.clone(),
1089 being_sent: self.being_sent.clone().lock_owned().await,
1090 }
1091 }
1092}
1093
/// Guard returned by [`StoreLock::lock`]; the lock is held for as long as the
/// guard is alive.
struct StoreLockGuard {
    /// Weak reference to the client owning the state store.
    client: WeakClient,

    /// Exclusive access to the request currently being sent, if any.
    being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
}
1104
1105impl StoreLockGuard {
1106 fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
1108 self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
1109 }
1110}
1111
/// Access to the persisted send queue of a single room.
#[derive(Clone)]
struct QueueStorage {
    /// Lock taken around every access to the state store, which also tracks
    /// the request being sent.
    store: StoreLock,

    /// The room this queue belongs to.
    room_id: OwnedRoomId,

    /// Thumbnail file sizes of the queued media, keyed by a transaction id
    /// and held in memory only.
    ///
    /// `push_media` and `push_gallery` insert one entry per send-event
    /// transaction, with one element per media item (`None` when the item
    /// has no thumbnail). `mark_as_sent` and `cancel_event` remove the entry
    /// matching the transaction id they are given.
    thumbnail_file_sizes: Arc<SyncMutex<HashMap<OwnedTransactionId, Vec<Option<usize>>>>>,
}
1135
1136impl QueueStorage {
    /// Priority given to the requests saved by `push` and to thumbnail
    /// uploads.
    const LOW_PRIORITY: usize = 0;

    /// Priority for requests that must take precedence over the
    /// low-priority ones.
    // NOTE(review): not referenced in the visible part of this file —
    // confirm where it is used.
    const HIGH_PRIORITY: usize = 10;
1142
1143 fn new(client: WeakClient, room: OwnedRoomId) -> Self {
1145 Self {
1146 room_id: room,
1147 store: StoreLock { client, being_sent: Default::default() },
1148 thumbnail_file_sizes: Default::default(),
1149 }
1150 }
1151
1152 async fn push(
1156 &self,
1157 request: QueuedRequestKind,
1158 created_at: MilliSecondsSinceUnixEpoch,
1159 ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
1160 let transaction_id = TransactionId::new();
1161
1162 self.store
1163 .lock()
1164 .await
1165 .client()?
1166 .state_store()
1167 .save_send_queue_request(
1168 &self.room_id,
1169 transaction_id.clone(),
1170 created_at,
1171 request,
1172 Self::LOW_PRIORITY,
1173 )
1174 .await?;
1175
1176 Ok(transaction_id)
1177 }
1178
1179 async fn peek_next_to_send(
1184 &self,
1185 ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
1186 {
1187 let mut guard = self.store.lock().await;
1188 let queued_requests =
1189 guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;
1190
1191 if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
1192 let (cancel_upload_tx, cancel_upload_rx) =
1193 if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
1194 let (tx, rx) = oneshot::channel();
1195 (Some(tx), Some(rx))
1196 } else {
1197 Default::default()
1198 };
1199
1200 let prev = guard.being_sent.replace(BeingSentInfo {
1201 transaction_id: request.transaction_id.clone(),
1202 cancel_upload: cancel_upload_tx,
1203 });
1204
1205 if let Some(prev) = prev {
1206 error!(
1207 prev_txn = ?prev.transaction_id,
1208 "a previous request was still active while picking a new one"
1209 );
1210 }
1211
1212 Ok(Some((request.clone(), cancel_upload_rx)))
1213 } else {
1214 Ok(None)
1215 }
1216 }
1217
1218 async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1222 let was_being_sent = self.store.lock().await.being_sent.take();
1223
1224 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1225 if prev_txn != Some(transaction_id) {
1226 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after transient error)");
1227 }
1228 }
1229
1230 async fn mark_as_wedged(
1234 &self,
1235 transaction_id: &TransactionId,
1236 reason: QueueWedgeError,
1237 ) -> Result<(), RoomSendQueueStorageError> {
1238 let mut guard = self.store.lock().await;
1240 let was_being_sent = guard.being_sent.take();
1241
1242 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1243 if prev_txn != Some(transaction_id) {
1244 error!(
1245 ?prev_txn,
1246 "previous active request didn't match that we expect (after permanent error)",
1247 );
1248 }
1249
1250 Ok(guard
1251 .client()?
1252 .state_store()
1253 .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1254 .await?)
1255 }
1256
1257 async fn mark_as_unwedged(
1260 &self,
1261 transaction_id: &TransactionId,
1262 ) -> Result<(), RoomSendQueueStorageError> {
1263 Ok(self
1264 .store
1265 .lock()
1266 .await
1267 .client()?
1268 .state_store()
1269 .update_send_queue_request_status(&self.room_id, transaction_id, None)
1270 .await?)
1271 }
1272
1273 async fn mark_as_sent(
1276 &self,
1277 transaction_id: &TransactionId,
1278 parent_key: SentRequestKey,
1279 ) -> Result<(), RoomSendQueueStorageError> {
1280 let mut guard = self.store.lock().await;
1282 let was_being_sent = guard.being_sent.take();
1283
1284 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1285 if prev_txn != Some(transaction_id) {
1286 error!(
1287 ?prev_txn,
1288 "previous active request didn't match that we expect (after successful send)",
1289 );
1290 }
1291
1292 let client = guard.client()?;
1293 let store = client.state_store();
1294
1295 store
1297 .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1298 .await?;
1299
1300 let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1301
1302 if !removed {
1303 warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1304 }
1305
1306 self.thumbnail_file_sizes.lock().remove(transaction_id);
1307
1308 Ok(())
1309 }
1310
1311 async fn cancel_event(
1318 &self,
1319 transaction_id: &TransactionId,
1320 ) -> Result<bool, RoomSendQueueStorageError> {
1321 let guard = self.store.lock().await;
1322
1323 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1324 == Some(transaction_id)
1325 {
1326 guard
1328 .client()?
1329 .state_store()
1330 .save_dependent_queued_request(
1331 &self.room_id,
1332 transaction_id,
1333 ChildTransactionId::new(),
1334 MilliSecondsSinceUnixEpoch::now(),
1335 DependentQueuedRequestKind::RedactEvent,
1336 )
1337 .await?;
1338
1339 return Ok(true);
1340 }
1341
1342 let removed = guard
1343 .client()?
1344 .state_store()
1345 .remove_send_queue_request(&self.room_id, transaction_id)
1346 .await?;
1347
1348 self.thumbnail_file_sizes.lock().remove(transaction_id);
1349
1350 Ok(removed)
1351 }
1352
1353 async fn replace_event(
1360 &self,
1361 transaction_id: &TransactionId,
1362 serializable: SerializableEventContent,
1363 ) -> Result<bool, RoomSendQueueStorageError> {
1364 let guard = self.store.lock().await;
1365
1366 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1367 == Some(transaction_id)
1368 {
1369 guard
1371 .client()?
1372 .state_store()
1373 .save_dependent_queued_request(
1374 &self.room_id,
1375 transaction_id,
1376 ChildTransactionId::new(),
1377 MilliSecondsSinceUnixEpoch::now(),
1378 DependentQueuedRequestKind::EditEvent { new_content: serializable },
1379 )
1380 .await?;
1381
1382 return Ok(true);
1383 }
1384
1385 let edited = guard
1386 .client()?
1387 .state_store()
1388 .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1389 .await?;
1390
1391 Ok(edited)
1392 }
1393
1394 #[allow(clippy::too_many_arguments)]
1398 async fn push_media(
1399 &self,
1400 event: RoomMessageEventContent,
1401 content_type: Mime,
1402 send_event_txn: OwnedTransactionId,
1403 created_at: MilliSecondsSinceUnixEpoch,
1404 upload_file_txn: OwnedTransactionId,
1405 file_media_request: MediaRequestParameters,
1406 thumbnail: Option<QueueThumbnailInfo>,
1407 ) -> Result<(), RoomSendQueueStorageError> {
1408 let guard = self.store.lock().await;
1409 let client = guard.client()?;
1410 let store = client.state_store();
1411
1412 let thumbnail_file_sizes = vec![thumbnail.as_ref().map(|t| t.file_size)];
1414
1415 let thumbnail_info = self
1416 .push_thumbnail_and_media_uploads(
1417 store,
1418 &content_type,
1419 send_event_txn.clone(),
1420 created_at,
1421 upload_file_txn.clone(),
1422 file_media_request,
1423 thumbnail,
1424 )
1425 .await?;
1426
1427 store
1429 .save_dependent_queued_request(
1430 &self.room_id,
1431 &upload_file_txn,
1432 send_event_txn.clone().into(),
1433 created_at,
1434 DependentQueuedRequestKind::FinishUpload {
1435 local_echo: Box::new(event),
1436 file_upload: upload_file_txn.clone(),
1437 thumbnail_info,
1438 },
1439 )
1440 .await?;
1441
1442 self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1443
1444 Ok(())
1445 }
1446
    /// Queues the requests needed to send a gallery event.
    ///
    /// The uploads of the first item are queued with
    /// `push_thumbnail_and_media_uploads`. The uploads of every other item
    /// are saved as dependent requests, each one depending on the upload
    /// queued just before it. The last upload is the parent of the dependent
    /// request which finishes the gallery with the given local echo.
    ///
    /// The thumbnail file sizes of all the items are remembered under
    /// `send_event_txn`. Nothing is queued for an empty list of items.
    ///
    /// # Errors
    ///
    /// Fails if the client is shutting down or a state store operation
    /// fails.
    #[cfg(feature = "unstable-msc4274")]
    #[allow(clippy::too_many_arguments)]
    async fn push_gallery(
        &self,
        event: RoomMessageEventContent,
        send_event_txn: OwnedTransactionId,
        created_at: MilliSecondsSinceUnixEpoch,
        item_queue_infos: Vec<GalleryItemQueueInfo>,
    ) -> Result<(), RoomSendQueueStorageError> {
        let guard = self.store.lock().await;
        let client = guard.client()?;
        let store = client.state_store();

        let mut finish_item_infos = Vec::with_capacity(item_queue_infos.len());
        let mut thumbnail_file_sizes = Vec::with_capacity(item_queue_infos.len());

        // An empty gallery is accepted, and queues nothing.
        let Some((first, rest)) = item_queue_infos.split_first() else {
            return Ok(());
        };

        let GalleryItemQueueInfo { content_type, upload_file_txn, file_media_request, thumbnail } =
            first;

        // First item: queued like a standalone media upload.
        let thumbnail_info = self
            .push_thumbnail_and_media_uploads(
                store,
                content_type,
                send_event_txn.clone(),
                created_at,
                upload_file_txn.clone(),
                file_media_request.clone(),
                thumbnail.clone(),
            )
            .await?;

        finish_item_infos
            .push(FinishGalleryItemInfo { file_upload: upload_file_txn.clone(), thumbnail_info });
        thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));

        // Transaction id of the most recently queued upload, used as the
        // parent of the next dependent request.
        let mut last_upload_file_txn = upload_file_txn.clone();

        for item_queue_info in rest {
            let GalleryItemQueueInfo {
                content_type,
                upload_file_txn,
                file_media_request,
                thumbnail,
            } = item_queue_info;

            let thumbnail_info = if let Some(QueueThumbnailInfo {
                finish_upload_thumbnail_info: thumbnail_info,
                media_request_parameters: thumbnail_media_request,
                content_type: thumbnail_content_type,
                ..
            }) = thumbnail
            {
                let upload_thumbnail_txn = thumbnail_info.txn.clone();

                // The thumbnail upload depends on the previous upload, which
                // is a file upload (hence `parent_is_thumbnail_upload:
                // false`).
                store
                    .save_dependent_queued_request(
                        &self.room_id,
                        &last_upload_file_txn,
                        upload_thumbnail_txn.clone().into(),
                        created_at,
                        DependentQueuedRequestKind::UploadFileOrThumbnail {
                            content_type: thumbnail_content_type.to_string(),
                            cache_key: thumbnail_media_request.clone(),
                            related_to: send_event_txn.clone(),
                            parent_is_thumbnail_upload: false,
                        },
                    )
                    .await?;

                last_upload_file_txn = upload_thumbnail_txn;

                Some(thumbnail_info)
            } else {
                None
            };

            // The file upload depends on the item's thumbnail upload when
            // there is one, on the previous item's upload otherwise.
            store
                .save_dependent_queued_request(
                    &self.room_id,
                    &last_upload_file_txn,
                    upload_file_txn.clone().into(),
                    created_at,
                    DependentQueuedRequestKind::UploadFileOrThumbnail {
                        content_type: content_type.to_string(),
                        cache_key: file_media_request.clone(),
                        related_to: send_event_txn.clone(),
                        parent_is_thumbnail_upload: thumbnail.is_some(),
                    },
                )
                .await?;

            finish_item_infos.push(FinishGalleryItemInfo {
                file_upload: upload_file_txn.clone(),
                thumbnail_info: thumbnail_info.cloned(),
            });
            thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));

            last_upload_file_txn = upload_file_txn.clone();
        }

        // The gallery event itself depends on the very last upload.
        store
            .save_dependent_queued_request(
                &self.room_id,
                &last_upload_file_txn,
                send_event_txn.clone().into(),
                created_at,
                DependentQueuedRequestKind::FinishGallery {
                    local_echo: Box::new(event),
                    item_infos: finish_item_infos,
                },
            )
            .await?;

        self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);

        Ok(())
    }
1576
1577 #[allow(clippy::too_many_arguments)]
1583 async fn push_thumbnail_and_media_uploads(
1584 &self,
1585 store: &DynStateStore,
1586 content_type: &Mime,
1587 send_event_txn: OwnedTransactionId,
1588 created_at: MilliSecondsSinceUnixEpoch,
1589 upload_file_txn: OwnedTransactionId,
1590 file_media_request: MediaRequestParameters,
1591 thumbnail: Option<QueueThumbnailInfo>,
1592 ) -> Result<Option<FinishUploadThumbnailInfo>, RoomSendQueueStorageError> {
1593 if let Some(QueueThumbnailInfo {
1594 finish_upload_thumbnail_info: thumbnail_info,
1595 media_request_parameters: thumbnail_media_request,
1596 content_type: thumbnail_content_type,
1597 ..
1598 }) = thumbnail
1599 {
1600 let upload_thumbnail_txn = thumbnail_info.txn.clone();
1601
1602 store
1604 .save_send_queue_request(
1605 &self.room_id,
1606 upload_thumbnail_txn.clone(),
1607 created_at,
1608 QueuedRequestKind::MediaUpload {
1609 content_type: thumbnail_content_type.to_string(),
1610 cache_key: thumbnail_media_request,
1611 thumbnail_source: None, related_to: send_event_txn.clone(),
1613 #[cfg(feature = "unstable-msc4274")]
1614 accumulated: vec![],
1615 },
1616 Self::LOW_PRIORITY,
1617 )
1618 .await?;
1619
1620 store
1622 .save_dependent_queued_request(
1623 &self.room_id,
1624 &upload_thumbnail_txn,
1625 upload_file_txn.into(),
1626 created_at,
1627 DependentQueuedRequestKind::UploadFileOrThumbnail {
1628 content_type: content_type.to_string(),
1629 cache_key: file_media_request,
1630 related_to: send_event_txn,
1631 parent_is_thumbnail_upload: true,
1632 },
1633 )
1634 .await?;
1635
1636 Ok(Some(thumbnail_info))
1637 } else {
1638 store
1640 .save_send_queue_request(
1641 &self.room_id,
1642 upload_file_txn,
1643 created_at,
1644 QueuedRequestKind::MediaUpload {
1645 content_type: content_type.to_string(),
1646 cache_key: file_media_request,
1647 thumbnail_source: None,
1648 related_to: send_event_txn,
1649 #[cfg(feature = "unstable-msc4274")]
1650 accumulated: vec![],
1651 },
1652 Self::LOW_PRIORITY,
1653 )
1654 .await?;
1655
1656 Ok(None)
1657 }
1658 }
1659
1660 #[instrument(skip(self))]
1662 async fn react(
1663 &self,
1664 transaction_id: &TransactionId,
1665 key: String,
1666 created_at: MilliSecondsSinceUnixEpoch,
1667 ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1668 let guard = self.store.lock().await;
1669 let client = guard.client()?;
1670 let store = client.state_store();
1671
1672 let requests = store.load_send_queue_requests(&self.room_id).await?;
1673
1674 if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1676 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1679 if !dependent_requests
1680 .into_iter()
1681 .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1682 .any(|child_txn| *child_txn == *transaction_id)
1683 {
1684 return Ok(None);
1686 }
1687 }
1688
1689 let reaction_txn_id = ChildTransactionId::new();
1691 store
1692 .save_dependent_queued_request(
1693 &self.room_id,
1694 transaction_id,
1695 reaction_txn_id.clone(),
1696 created_at,
1697 DependentQueuedRequestKind::ReactEvent { key },
1698 )
1699 .await?;
1700
1701 Ok(Some(reaction_txn_id))
1702 }
1703
1704 async fn local_echoes(
1707 &self,
1708 room: &RoomSendQueue,
1709 ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1710 let guard = self.store.lock().await;
1711 let client = guard.client()?;
1712 let store = client.state_store();
1713
1714 let local_requests =
1715 store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1716 Some(LocalEcho {
1717 transaction_id: queued.transaction_id.clone(),
1718 content: match queued.kind {
1719 QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1720 serialized_event: content,
1721 send_handle: SendHandle {
1722 room: room.clone(),
1723 transaction_id: queued.transaction_id,
1724 media_handles: vec![],
1725 created_at: queued.created_at,
1726 },
1727 send_error: queued.error,
1728 },
1729
1730 QueuedRequestKind::MediaUpload { .. } => {
1731 return None;
1734 }
1735 },
1736 })
1737 });
1738
1739 let reactions_and_medias = store
1740 .load_dependent_queued_requests(&self.room_id)
1741 .await?
1742 .into_iter()
1743 .filter_map(|dep| match dep.kind {
1744 DependentQueuedRequestKind::EditEvent { .. }
1745 | DependentQueuedRequestKind::RedactEvent => {
1746 None
1748 }
1749
1750 DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
1751 transaction_id: dep.own_transaction_id.clone().into(),
1752 content: LocalEchoContent::React {
1753 key,
1754 send_handle: SendReactionHandle {
1755 room: room.clone(),
1756 transaction_id: dep.own_transaction_id,
1757 },
1758 applies_to: dep.parent_transaction_id,
1759 },
1760 }),
1761
1762 DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
1763 None
1765 }
1766
1767 DependentQueuedRequestKind::FinishUpload {
1768 local_echo,
1769 file_upload,
1770 thumbnail_info,
1771 } => {
1772 Some(LocalEcho {
1774 transaction_id: dep.own_transaction_id.clone().into(),
1775 content: LocalEchoContent::Event {
1776 serialized_event: SerializableEventContent::new(&(*local_echo).into())
1777 .ok()?,
1778 send_handle: SendHandle {
1779 room: room.clone(),
1780 transaction_id: dep.own_transaction_id.into(),
1781 media_handles: vec![MediaHandles {
1782 upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
1783 upload_file_txn: file_upload,
1784 }],
1785 created_at: dep.created_at,
1786 },
1787 send_error: None,
1788 },
1789 })
1790 }
1791
1792 #[cfg(feature = "unstable-msc4274")]
1793 DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
1794 self.create_gallery_local_echo(
1796 dep.own_transaction_id,
1797 room,
1798 dep.created_at,
1799 local_echo,
1800 item_infos,
1801 )
1802 }
1803 });
1804
1805 Ok(local_requests.chain(reactions_and_medias).collect())
1806 }
1807
/// Builds the local echo of a gallery event from its finish-gallery
/// dependent request.
///
/// Each entry of `item_infos` contributes one media handle, made of its
/// file upload transaction id and, if any, its thumbnail upload transaction
/// id. Returns `None` when `local_echo` can't be serialized.
#[cfg(feature = "unstable-msc4274")]
fn create_gallery_local_echo(
    &self,
    transaction_id: ChildTransactionId,
    room: &RoomSendQueue,
    created_at: MilliSecondsSinceUnixEpoch,
    local_echo: Box<RoomMessageEventContent>,
    item_infos: Vec<FinishGalleryItemInfo>,
) -> Option<LocalEcho> {
    let serialized_event = SerializableEventContent::new(&(*local_echo).into()).ok()?;

    let mut media_handles = Vec::with_capacity(item_infos.len());
    for item in item_infos {
        media_handles.push(MediaHandles {
            upload_thumbnail_txn: item.thumbnail_info.map(|info| info.txn),
            upload_file_txn: item.file_upload,
        });
    }

    let send_handle = SendHandle {
        room: room.clone(),
        transaction_id: transaction_id.clone().into(),
        media_handles,
        created_at,
    };

    Some(LocalEcho {
        transaction_id: transaction_id.into(),
        content: LocalEchoContent::Event { serialized_event, send_handle, send_error: None },
    })
}
1838
/// Tries to apply a single dependent request, according to the state of its
/// parent.
///
/// The request's `parent_key` drives the behavior: when it is `Some`, the
/// code acts on the remote counterpart of the parent (an event id for
/// edits, redactions and reactions); when it is `None`, it either acts on
/// the parent's local queued request (edits and redactions) or reports
/// that the request can't be applied yet (every other kind).
///
/// # Returns
///
/// - `Ok(true)` when the request has been handled, or can never be
///   handled (unsupported or undeserializable edit content, failure to
///   build the edit event). In this file, `apply_dependent_requests`
///   removes the dependent request from the store in that case.
/// - `Ok(false)` when the request must be kept around: the parent key is
///   still missing, or sending the redaction failed.
///
/// # Errors
///
/// - [`RoomSendQueueStorageError::InvalidParentKey`] if the parent key of
///   an edit, redaction or reaction can't be converted into an event id.
/// - [`RoomSendQueueError::RoomDisappeared`] if the room can't be found in
///   the client.
/// - Store and JSON serialization errors, wrapped in
///   [`RoomSendQueueError::StorageError`].
/// - Any error returned by the upload-related helpers.
#[instrument(skip_all)]
async fn try_apply_single_dependent_request(
    &self,
    client: &Client,
    dependent_request: DependentQueuedRequest,
    new_updates: &mut Vec<RoomSendQueueUpdate>,
) -> Result<bool, RoomSendQueueError> {
    let store = client.state_store();

    let parent_key = dependent_request.parent_key;

    match dependent_request.kind {
        DependentQueuedRequestKind::EditEvent { new_content } => {
            if let Some(parent_key) = parent_key {
                // The parent has a key: turn the edit into a proper edit
                // event targeting the parent's event id.
                let Some(event_id) = parent_key.into_event_id() else {
                    return Err(RoomSendQueueError::StorageError(
                        RoomSendQueueStorageError::InvalidParentKey,
                    ));
                };

                let room = client
                    .get_room(&self.room_id)
                    .ok_or(RoomSendQueueError::RoomDisappeared)?;

                // Only room messages and (unstable) poll starts can be
                // edited this way; anything else is given up on by
                // returning `Ok(true)`.
                let edited_content = match new_content.deserialize() {
                    Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
                        EditedContent::RoomMessage(c.into())
                    }

                    Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
                        let poll_start = c.poll_start().clone();
                        EditedContent::PollStart {
                            fallback_text: poll_start.question.text.clone(),
                            new_content: poll_start,
                        }
                    }

                    Ok(c) => {
                        warn!("Unsupported edit content type: {:?}", c.event_type());
                        return Ok(true);
                    }

                    Err(err) => {
                        warn!("Unable to deserialize: {err}");
                        return Ok(true);
                    }
                };

                let edit_event = match room.make_edit_event(&event_id, edited_content).await {
                    Ok(e) => e,
                    Err(err) => {
                        // Give up on this edit instead of retrying it.
                        warn!("couldn't create edited event: {err}");
                        return Ok(true);
                    }
                };

                let serializable = SerializableEventContent::from_raw(
                    Raw::new(&edit_event)
                        .map_err(RoomSendQueueStorageError::JsonSerialization)?,
                    edit_event.event_type().to_string(),
                );

                // Queue the edit event as a regular request, reusing the
                // dependent request's own transaction id.
                store
                    .save_send_queue_request(
                        &self.room_id,
                        dependent_request.own_transaction_id.into(),
                        dependent_request.created_at,
                        serializable.into(),
                        Self::HIGH_PRIORITY,
                    )
                    .await
                    .map_err(RoomSendQueueStorageError::StateStoreError)?;
            } else {
                // No parent key: replace the content of the parent's queued
                // request in place.
                let edited = store
                    .update_send_queue_request(
                        &self.room_id,
                        &dependent_request.parent_transaction_id,
                        new_content.into(),
                    )
                    .await
                    .map_err(RoomSendQueueStorageError::StateStoreError)?;

                if !edited {
                    warn!("missing local echo upon dependent edit");
                }
            }
        }

        DependentQueuedRequestKind::RedactEvent => {
            if let Some(parent_key) = parent_key {
                let Some(event_id) = parent_key.into_event_id() else {
                    return Err(RoomSendQueueError::StorageError(
                        RoomSendQueueStorageError::InvalidParentKey,
                    ));
                };

                let room = client
                    .get_room(&self.room_id)
                    .ok_or(RoomSendQueueError::RoomDisappeared)?;

                // The redaction is sent directly (not through the send
                // queue), without a reason, using the dependent request's
                // transaction id. On failure the request is kept
                // (`Ok(false)`).
                if let Err(err) = room
                    .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
                    .await
                {
                    warn!("error when sending a redact for {event_id}: {err}");
                    return Ok(false);
                }
            } else {
                // No parent key: remove the parent's queued request.
                let removed = store
                    .remove_send_queue_request(
                        &self.room_id,
                        &dependent_request.parent_transaction_id,
                    )
                    .await
                    .map_err(RoomSendQueueStorageError::StateStoreError)?;

                if !removed {
                    warn!("missing local echo upon dependent redact");
                }
            }
        }

        DependentQueuedRequestKind::ReactEvent { key } => {
            if let Some(parent_key) = parent_key {
                let Some(parent_event_id) = parent_key.into_event_id() else {
                    return Err(RoomSendQueueError::StorageError(
                        RoomSendQueueStorageError::InvalidParentKey,
                    ));
                };

                // Build the reaction event annotating the parent, and queue
                // it as a regular request.
                let react_event =
                    ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
                let serializable = SerializableEventContent::from_raw(
                    Raw::new(&react_event)
                        .map_err(RoomSendQueueStorageError::JsonSerialization)?,
                    react_event.event_type().to_string(),
                );

                store
                    .save_send_queue_request(
                        &self.room_id,
                        dependent_request.own_transaction_id.into(),
                        dependent_request.created_at,
                        serializable.into(),
                        Self::HIGH_PRIORITY,
                    )
                    .await
                    .map_err(RoomSendQueueStorageError::StateStoreError)?;
            } else {
                // A reaction needs the parent's event id: keep the request
                // for later.
                return Ok(false);
            }
        }

        DependentQueuedRequestKind::UploadFileOrThumbnail {
            content_type,
            cache_key,
            related_to,
            parent_is_thumbnail_upload,
        } => {
            // Without a parent key, the upload can't be started yet.
            let Some(parent_key) = parent_key else {
                return Ok(false);
            };
            self.handle_dependent_file_or_thumbnail_upload(
                client,
                dependent_request.own_transaction_id.into(),
                parent_key,
                content_type,
                cache_key,
                related_to,
                parent_is_thumbnail_upload,
            )
            .await?;
        }

        DependentQueuedRequestKind::FinishUpload {
            local_echo,
            file_upload,
            thumbnail_info,
        } => {
            // Without a parent key, the upload can't be finished yet.
            let Some(parent_key) = parent_key else {
                return Ok(false);
            };
            self.handle_dependent_finish_upload(
                client,
                dependent_request.own_transaction_id.into(),
                parent_key,
                *local_echo,
                file_upload,
                thumbnail_info,
                new_updates,
            )
            .await?;
        }

        #[cfg(feature = "unstable-msc4274")]
        DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
            // Without a parent key, the gallery can't be finished yet.
            let Some(parent_key) = parent_key else {
                return Ok(false);
            };
            self.handle_dependent_finish_gallery_upload(
                client,
                dependent_request.own_transaction_id.into(),
                parent_key,
                *local_echo,
                item_infos,
                new_updates,
            )
            .await?;
        }
    }

    // Every path reaching this point has fully handled the request.
    Ok(true)
}
2081
2082 #[instrument(skip(self))]
2083 async fn apply_dependent_requests(
2084 &self,
2085 new_updates: &mut Vec<RoomSendQueueUpdate>,
2086 ) -> Result<(), RoomSendQueueError> {
2087 let guard = self.store.lock().await;
2088
2089 let client = guard.client()?;
2090 let store = client.state_store();
2091
2092 let dependent_requests = store
2093 .load_dependent_queued_requests(&self.room_id)
2094 .await
2095 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2096
2097 let num_initial_dependent_requests = dependent_requests.len();
2098 if num_initial_dependent_requests == 0 {
2099 return Ok(());
2101 }
2102
2103 let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
2104
2105 for original in &dependent_requests {
2107 if !canonicalized_dependent_requests
2108 .iter()
2109 .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
2110 {
2111 store
2112 .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
2113 .await
2114 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2115 }
2116 }
2117
2118 let mut num_dependent_requests = canonicalized_dependent_requests.len();
2119
2120 debug!(
2121 num_dependent_requests,
2122 num_initial_dependent_requests, "starting handling of dependent requests"
2123 );
2124
2125 for dependent in canonicalized_dependent_requests {
2126 let dependent_id = dependent.own_transaction_id.clone();
2127
2128 match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
2129 Ok(should_remove) => {
2130 if should_remove {
2131 store
2133 .remove_dependent_queued_request(&self.room_id, &dependent_id)
2134 .await
2135 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2136
2137 num_dependent_requests -= 1;
2138 }
2139 }
2140
2141 Err(err) => {
2142 warn!("error when applying single dependent request: {err}");
2143 }
2144 }
2145 }
2146
2147 debug!(
2148 leftover_dependent_requests = num_dependent_requests,
2149 "stopped handling dependent request"
2150 );
2151
2152 Ok(())
2153 }
2154
2155 async fn remove_dependent_send_queue_request(
2157 &self,
2158 dependent_event_id: &ChildTransactionId,
2159 ) -> Result<bool, RoomSendQueueStorageError> {
2160 Ok(self
2161 .store
2162 .lock()
2163 .await
2164 .client()?
2165 .state_store()
2166 .remove_dependent_queued_request(&self.room_id, dependent_event_id)
2167 .await?)
2168 }
2169}
2170
/// Information needed to queue the uploads of a single gallery item.
#[cfg(feature = "unstable-msc4274")]
struct GalleryItemQueueInfo {
    /// Content type of the item's file; stored as a string in the upload
    /// request.
    content_type: Mime,
    /// Transaction id under which the file upload is queued.
    upload_file_txn: OwnedTransactionId,
    /// Media request parameters used as the cache key of the file upload.
    file_media_request: MediaRequestParameters,
    /// Thumbnail queueing information, if the item has a thumbnail.
    thumbnail: Option<QueueThumbnailInfo>,
}
2179
/// The content of a local echo.
#[derive(Clone, Debug)]
pub enum LocalEchoContent {
    /// The local echo is an event.
    Event {
        /// The serialized content of the event.
        serialized_event: SerializableEventContent,
        /// A handle to manipulate the sending of this event.
        send_handle: SendHandle,
        /// The error recorded for the queued request, if any.
        send_error: Option<QueueWedgeError>,
    },

    /// The local echo is a reaction to another local echo.
    React {
        /// The key of the reaction.
        key: String,
        /// A handle to manipulate the sending of this reaction.
        send_handle: SendReactionHandle,
        /// The transaction id of the local echo the reaction applies to.
        applies_to: OwnedTransactionId,
    },
}
2205
/// A local representation of a request that is waiting in the send queue,
/// either as a queued request or as a dependent request.
#[derive(Clone, Debug)]
pub struct LocalEcho {
    /// Transaction id identifying this local echo.
    pub transaction_id: OwnedTransactionId,
    /// What the local echo is about: an event or a reaction.
    pub content: LocalEchoContent,
}
2215
/// An update about the content of a room's send queue.
#[derive(Clone, Debug)]
pub enum RoomSendQueueUpdate {
    /// A new local echo exists (for instance, a reaction has just been
    /// queued through [`SendHandle::react`]).
    NewLocalEvent(LocalEcho),

    /// A local echo has been cancelled (see [`SendHandle::abort`] and
    /// [`SendReactionHandle::abort`]).
    CancelledLocalEvent {
        /// Transaction id of the cancelled local echo.
        transaction_id: OwnedTransactionId,
    },

    /// The content of a local echo has been replaced (see
    /// [`SendHandle::edit_raw`] and [`SendHandle::edit_media_caption`]).
    ReplacedLocalEvent {
        /// Transaction id of the edited local echo.
        transaction_id: OwnedTransactionId,

        /// The new serialized content of the local echo.
        new_content: SerializableEventContent,
    },

    /// Sending a request failed.
    // NOTE(review): not emitted in this part of the file; semantics inferred
    // from the field names — confirm against the sending task.
    SendError {
        /// Transaction id of the request that failed to send.
        transaction_id: OwnedTransactionId,
        /// The error that happened while sending.
        error: Arc<crate::Error>,
        /// Whether the error is considered recoverable — TODO confirm what
        /// the queue does in each case.
        is_recoverable: bool,
    },

    /// Sending of a local echo is being retried (see
    /// [`SendHandle::unwedge`]).
    RetryEvent {
        /// Transaction id of the local echo being retried.
        transaction_id: OwnedTransactionId,
    },

    /// An event has been sent.
    // NOTE(review): not emitted in this part of the file — confirm against
    // the sending task.
    SentEvent {
        /// Transaction id of the local echo that was sent.
        transaction_id: OwnedTransactionId,
        /// Event id associated with the sent event.
        event_id: OwnedEventId,
    },

    /// Progress report about a media upload.
    // NOTE(review): not emitted in this part of the file; field meanings
    // below are inferred from names and types — confirm against the
    // emitter.
    MediaUpload {
        /// Transaction id of the event the upload relates to.
        related_to: OwnedTransactionId,

        /// The media source of the file, when available — presumably set
        /// once the upload has completed; verify.
        file: Option<MediaSource>,

        /// Index of the media — presumably its position within the related
        /// request; verify.
        index: u64,

        /// The progress of the upload.
        progress: AbstractProgress,
    },
}
2294
/// A [`RoomSendQueueUpdate`] paired with the id of the room it is about.
#[derive(Clone, Debug)]
pub struct SendQueueUpdate {
    /// The room the update is about.
    pub room_id: OwnedRoomId,

    /// The update itself.
    pub update: RoomSendQueueUpdate,
}
2307
/// An error reported by a room's send queue.
#[derive(Debug, thiserror::Error)]
pub enum RoomSendQueueError {
    /// The room isn't in the joined state.
    #[error("the room isn't in the joined state")]
    RoomNotJoined,

    /// The room can't be found in the client anymore.
    #[error("the room is now missing from the client")]
    RoomDisappeared,

    /// An error happened while interacting with the storage; displayed
    /// transparently as the underlying error.
    #[error(transparent)]
    StorageError(#[from] RoomSendQueueStorageError),

    /// The attachment event could not be created.
    #[error("the attachment event could not be created")]
    FailedToCreateAttachment,

    /// The gallery to send contains no items.
    #[cfg(feature = "unstable-msc4274")]
    #[error("the gallery contains no items")]
    EmptyGallery,

    /// The gallery event could not be created.
    #[cfg(feature = "unstable-msc4274")]
    #[error("the gallery event could not be created")]
    FailedToCreateGallery,
}
2339
/// An error triggered by the storage layer of the send queue.
#[derive(Debug, thiserror::Error)]
pub enum RoomSendQueueStorageError {
    /// Error raised by the state store.
    #[error(transparent)]
    StateStoreError(#[from] StoreError),

    /// Error raised by the event cache store.
    #[error(transparent)]
    EventCacheStoreError(#[from] EventCacheStoreError),

    /// Error raised when taking a store lock.
    #[error(transparent)]
    LockError(#[from] LockStoreError),

    /// Error raised while serializing to, or deserializing from, JSON.
    #[error(transparent)]
    JsonSerialization(#[from] serde_json::Error),

    /// The parent key of a dependent request wasn't of the expected kind
    /// (e.g. it couldn't be converted into an event id).
    #[error("a dependent event had an invalid parent key type")]
    InvalidParentKey,

    /// The client is shutting down.
    #[error("The client is shutting down.")]
    ClientShuttingDown,

    /// The requested operation isn't implemented for media uploads.
    #[error("This operation is not implemented for media uploads")]
    OperationNotImplementedYet,

    /// A media caption edit was requested on an event that isn't a media.
    #[error("Can't edit a media caption when the underlying event isn't a media")]
    InvalidMediaCaptionEdit,
}
2376
/// Transaction ids of the upload requests attached to a media local echo.
#[derive(Clone, Debug)]
struct MediaHandles {
    /// Transaction id of the thumbnail upload, if the media has a
    /// thumbnail.
    upload_thumbnail_txn: Option<OwnedTransactionId>,

    /// Transaction id of the file upload.
    upload_file_txn: OwnedTransactionId,
}
2388
/// A handle to manipulate a local echo that lives in a room's send queue:
/// abort it, edit it, retry it or react to it.
#[derive(Clone, Debug)]
pub struct SendHandle {
    /// The send queue of the room the local echo belongs to.
    room: RoomSendQueue,

    /// Transaction id of the local echo this handle controls.
    transaction_id: OwnedTransactionId,

    /// Upload handles attached to the local echo; empty when the echo has
    /// no media uploads.
    media_handles: Vec<MediaHandles>,

    /// The creation time associated with the local echo.
    pub created_at: MilliSecondsSinceUnixEpoch,
}
2407
2408impl SendHandle {
2409 #[cfg(test)]
2411 pub(crate) fn new(
2412 room: RoomSendQueue,
2413 transaction_id: OwnedTransactionId,
2414 created_at: MilliSecondsSinceUnixEpoch,
2415 ) -> Self {
2416 Self { room, transaction_id, media_handles: vec![], created_at }
2417 }
2418
2419 fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
2420 if !self.media_handles.is_empty() {
2421 Err(RoomSendQueueStorageError::OperationNotImplementedYet)
2422 } else {
2423 Ok(())
2424 }
2425 }
2426
2427 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2432 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2433 trace!("received an abort request");
2434
2435 let queue = &self.room.inner.queue;
2436
2437 for handles in &self.media_handles {
2438 if queue.abort_upload(&self.transaction_id, handles).await? {
2439 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2441 transaction_id: self.transaction_id.clone(),
2442 });
2443
2444 return Ok(true);
2445 }
2446
2447 }
2451
2452 if queue.cancel_event(&self.transaction_id).await? {
2453 trace!("successful abort");
2454
2455 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2457 transaction_id: self.transaction_id.clone(),
2458 });
2459
2460 Ok(true)
2461 } else {
2462 debug!("local echo didn't exist anymore, can't abort");
2463 Ok(false)
2464 }
2465 }
2466
2467 #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2472 pub async fn edit_raw(
2473 &self,
2474 new_content: Raw<AnyMessageLikeEventContent>,
2475 event_type: String,
2476 ) -> Result<bool, RoomSendQueueStorageError> {
2477 trace!("received an edit request");
2478 self.nyi_for_uploads()?;
2479
2480 let serializable = SerializableEventContent::from_raw(new_content, event_type);
2481
2482 if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
2483 trace!("successful edit");
2484
2485 self.room.inner.notifier.notify_one();
2487
2488 self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2490 transaction_id: self.transaction_id.clone(),
2491 new_content: serializable,
2492 });
2493
2494 Ok(true)
2495 } else {
2496 debug!("local echo doesn't exist anymore, can't edit");
2497 Ok(false)
2498 }
2499 }
2500
2501 pub async fn edit(
2506 &self,
2507 new_content: AnyMessageLikeEventContent,
2508 ) -> Result<bool, RoomSendQueueStorageError> {
2509 self.edit_raw(
2510 Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2511 new_content.event_type().to_string(),
2512 )
2513 .await
2514 }
2515
2516 pub async fn edit_media_caption(
2521 &self,
2522 caption: Option<String>,
2523 formatted_caption: Option<FormattedBody>,
2524 mentions: Option<Mentions>,
2525 ) -> Result<bool, RoomSendQueueStorageError> {
2526 if let Some(new_content) = self
2527 .room
2528 .inner
2529 .queue
2530 .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2531 .await?
2532 {
2533 trace!("successful edit of media caption");
2534
2535 self.room.inner.notifier.notify_one();
2537
2538 let new_content = SerializableEventContent::new(&new_content)
2539 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2540
2541 self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2543 transaction_id: self.transaction_id.clone(),
2544 new_content,
2545 });
2546
2547 Ok(true)
2548 } else {
2549 debug!("local echo doesn't exist anymore, can't edit media caption");
2550 Ok(false)
2551 }
2552 }
2553
2554 pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2557 let room = &self.room.inner;
2558 room.queue
2559 .mark_as_unwedged(&self.transaction_id)
2560 .await
2561 .map_err(RoomSendQueueError::StorageError)?;
2562
2563 for handles in &self.media_handles {
2571 room.queue
2572 .mark_as_unwedged(&handles.upload_file_txn)
2573 .await
2574 .map_err(RoomSendQueueError::StorageError)?;
2575
2576 if let Some(txn) = &handles.upload_thumbnail_txn {
2577 room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2578 }
2579 }
2580
2581 room.notifier.notify_one();
2583
2584 self.room.send_update(RoomSendQueueUpdate::RetryEvent {
2585 transaction_id: self.transaction_id.clone(),
2586 });
2587
2588 Ok(())
2589 }
2590
2591 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2596 pub async fn react(
2597 &self,
2598 key: String,
2599 ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2600 trace!("received an intent to react");
2601
2602 let created_at = MilliSecondsSinceUnixEpoch::now();
2603 if let Some(reaction_txn_id) =
2604 self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2605 {
2606 trace!("successfully queued react");
2607
2608 self.room.inner.notifier.notify_one();
2610
2611 let send_handle = SendReactionHandle {
2613 room: self.room.clone(),
2614 transaction_id: reaction_txn_id.clone(),
2615 };
2616
2617 self.room.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2618 transaction_id: reaction_txn_id.into(),
2621 content: LocalEchoContent::React {
2622 key,
2623 send_handle: send_handle.clone(),
2624 applies_to: self.transaction_id.clone(),
2625 },
2626 }));
2627
2628 Ok(Some(send_handle))
2629 } else {
2630 debug!("local echo doesn't exist anymore, can't react");
2631 Ok(None)
2632 }
2633 }
2634}
2635
/// A handle to manipulate a reaction that was queued against a local echo.
#[derive(Clone, Debug)]
pub struct SendReactionHandle {
    /// The send queue of the room the reaction belongs to.
    room: RoomSendQueue,
    /// Transaction id of the reaction's own request.
    transaction_id: ChildTransactionId,
}
2644
2645impl SendReactionHandle {
2646 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2651 if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2652 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2656 transaction_id: self.transaction_id.clone().into(),
2657 });
2658
2659 return Ok(true);
2660 }
2661
2662 let handle = SendHandle {
2665 room: self.room.clone(),
2666 transaction_id: self.transaction_id.clone().into(),
2667 media_handles: vec![],
2668 created_at: MilliSecondsSinceUnixEpoch::now(),
2669 };
2670
2671 handle.abort().await
2672 }
2673
2674 pub fn transaction_id(&self) -> &TransactionId {
2676 &self.transaction_id
2677 }
2678}
2679
2680fn canonicalize_dependent_requests(
2684 dependent: &[DependentQueuedRequest],
2685) -> Vec<DependentQueuedRequest> {
2686 let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
2687
2688 for d in dependent {
2689 let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
2690
2691 if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
2692 continue;
2695 }
2696
2697 match &d.kind {
2698 DependentQueuedRequestKind::EditEvent { .. } => {
2699 if let Some(prev_edit) = prevs
2701 .iter_mut()
2702 .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
2703 {
2704 *prev_edit = d;
2705 } else {
2706 prevs.insert(0, d);
2707 }
2708 }
2709
2710 DependentQueuedRequestKind::UploadFileOrThumbnail { .. }
2711 | DependentQueuedRequestKind::FinishUpload { .. }
2712 | DependentQueuedRequestKind::ReactEvent { .. } => {
2713 prevs.push(d);
2715 }
2716
2717 #[cfg(feature = "unstable-msc4274")]
2718 DependentQueuedRequestKind::FinishGallery { .. } => {
2719 prevs.push(d);
2721 }
2722
2723 DependentQueuedRequestKind::RedactEvent => {
2724 prevs.clear();
2726 prevs.push(d);
2727 }
2728 }
2729 }
2730
2731 by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
2732}
2733
2734#[cfg(all(test, not(target_family = "wasm")))]
2735mod tests {
2736 use std::{sync::Arc, time::Duration};
2737
2738 use assert_matches2::{assert_let, assert_matches};
2739 use matrix_sdk_base::store::{
2740 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
2741 SerializableEventContent,
2742 };
2743 use matrix_sdk_test::{async_test, JoinedRoomBuilder, SyncResponseBuilder};
2744 use ruma::{
2745 events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent},
2746 room_id, MilliSecondsSinceUnixEpoch, TransactionId,
2747 };
2748
2749 use super::canonicalize_dependent_requests;
2750 use crate::{client::WeakClient, test_utils::logged_in_client};
2751
#[test]
fn test_canonicalize_dependent_events_created_at() {
    // A lone edit must come out of the canonicalization with its content,
    // parent and creation timestamp untouched.
    let parent_txn = TransactionId::new();
    let timestamp = MilliSecondsSinceUnixEpoch::now();

    let new_content =
        SerializableEventContent::new(&RoomMessageEventContent::text_plain("edit").into())
            .unwrap();

    let inputs = [DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        parent_transaction_id: parent_txn.clone(),
        kind: DependentQueuedRequestKind::EditEvent { new_content },
        parent_key: None,
        created_at: timestamp,
    }];

    let canonical = canonicalize_dependent_requests(&inputs);

    assert_eq!(canonical.len(), 1);
    let only = &canonical[0];

    assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &only.kind);
    assert_let!(
        AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
    );
    assert_eq!(msg.body(), "edit");
    assert_eq!(only.parent_transaction_id, parent_txn);
    assert_eq!(only.created_at, timestamp);
}
2783
// Checks that using the send queue, whether it is enabled or disabled,
// doesn't keep the client alive after the last strong handle owned by the
// test has been dropped.
#[async_test]
async fn test_client_no_cycle_with_send_queue() {
    for enabled in [true, false] {
        let client = logged_in_client(None).await;
        // Only a weak reference is kept, so it can't keep the client alive
        // by itself.
        let weak_client = WeakClient::from_client(&client);

        {
            let mut sync_response_builder = SyncResponseBuilder::new();

            let room_id = room_id!("!a:b.c");

            // Make the client aware of a joined room.
            client
                .base_client()
                .receive_sync_response(
                    sync_response_builder
                        .add_joined_room(JoinedRoomBuilder::new(room_id))
                        .build_sync_response(),
                )
                .await
                .unwrap();

            let room = client.get_room(room_id).unwrap();
            let q = room.send_queue();

            let _watcher = q.subscribe().await;

            client.send_queue().set_enabled(enabled).await;

            // The room, its send queue and the watcher are all dropped at
            // the end of this scope.
        }

        drop(client);

        // Presumably gives background tasks some time to terminate before
        // checking for leftover references — confirm.
        tokio::time::sleep(Duration::from_millis(500)).await;

        // The client must be gone by now; the message's `unwrap()` is only
        // evaluated when the assertion fails, i.e. when the client is
        // `Some`.
        let client = weak_client.get();
        assert!(
            client.is_none(),
            "too many strong references to the client: {}",
            Arc::strong_count(&client.unwrap().inner)
        );
    }
}
2828
#[test]
fn test_canonicalize_dependent_events_smoke_test() {
    // A single edit, with nothing to merge it with, is returned as is.
    let parent_txn = TransactionId::new();

    let new_content =
        SerializableEventContent::new(&RoomMessageEventContent::text_plain("edit").into())
            .unwrap();

    let inputs = [DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        parent_transaction_id: parent_txn.clone(),
        kind: DependentQueuedRequestKind::EditEvent { new_content },
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    }];

    let canonical = canonicalize_dependent_requests(&inputs);

    assert_eq!(canonical.len(), 1);
    let only = &canonical[0];

    assert_matches!(&only.kind, DependentQueuedRequestKind::EditEvent { .. });
    assert_eq!(only.parent_transaction_id, parent_txn);
    assert!(only.parent_key.is_none());
}
2853
#[test]
fn test_canonicalize_dependent_events_redaction_preferred() {
    // Whatever the number of edits before and after it, a redaction is the
    // only request that survives for a given parent.
    let parent_txn = TransactionId::new();

    let redact = DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        parent_transaction_id: parent_txn.clone(),
        kind: DependentQueuedRequestKind::RedactEvent,
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    let edit_template = DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        parent_transaction_id: parent_txn.clone(),
        kind: DependentQueuedRequestKind::EditEvent {
            new_content: SerializableEventContent::new(
                &RoomMessageEventContent::text_plain("edit").into(),
            )
            .unwrap(),
        },
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    // Same edit as the template, under a fresh transaction id.
    let fresh_edit = || DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        ..edit_template.clone()
    };

    // One edit, then the redaction, then 98 more edits.
    let mut inputs = Vec::with_capacity(100);
    inputs.push(fresh_edit());
    inputs.push(redact);
    inputs.extend((0..98).map(|_| fresh_edit()));

    let canonical = canonicalize_dependent_requests(&inputs);

    assert_eq!(canonical.len(), 1);
    let only = &canonical[0];

    assert_matches!(&only.kind, DependentQueuedRequestKind::RedactEvent);
    assert_eq!(only.parent_transaction_id, parent_txn);
}
2901
#[test]
fn test_canonicalize_dependent_events_last_edit_preferred() {
    // Among several edits of the same parent, only the last one is kept.
    let parent_txn = TransactionId::new();

    let mut inputs = Vec::new();
    for i in 0..10 {
        let new_content = SerializableEventContent::new(
            &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
        )
        .unwrap();

        inputs.push(DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: parent_txn.clone(),
            kind: DependentQueuedRequestKind::EditEvent { new_content },
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        });
    }

    let expected_parent = inputs[9].parent_transaction_id.clone();

    let canonical = canonicalize_dependent_requests(&inputs);

    assert_eq!(canonical.len(), 1);
    let only = &canonical[0];

    assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &only.kind);
    assert_let!(
        AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
    );
    assert_eq!(msg.body(), "edit9");
    assert_eq!(only.parent_transaction_id, expected_parent);
}
2934
#[test]
fn test_canonicalize_multiple_local_echoes() {
    // Two dependent requests with different parents: the test expects both to
    // come out of canonicalization, each still attached to its own parent.
    let first_parent = TransactionId::new();
    let second_parent = TransactionId::new();

    let redaction_id = ChildTransactionId::new();
    let edit_id = ChildTransactionId::new();

    // A redaction of the first parent.
    let redaction = DependentQueuedRequest {
        own_transaction_id: redaction_id.clone(),
        kind: DependentQueuedRequestKind::RedactEvent,
        parent_transaction_id: first_parent.clone(),
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    // An edit of the second parent.
    let edit = DependentQueuedRequest {
        own_transaction_id: edit_id,
        kind: DependentQueuedRequestKind::EditEvent {
            new_content: SerializableEventContent::new(
                &RoomMessageEventContent::text_plain("edit").into(),
            )
            .unwrap(),
        },
        parent_transaction_id: second_parent.clone(),
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };

    let canonical = canonicalize_dependent_requests(&[redaction, edit]);

    assert_eq!(canonical.len(), 2);

    // The output order is not asserted; each entry is identified by its id.
    for request in canonical {
        if request.own_transaction_id != redaction_id {
            assert_eq!(request.parent_transaction_id, second_parent);
            assert_matches!(request.kind, DependentQueuedRequestKind::EditEvent { .. });
            continue;
        }

        assert_eq!(request.parent_transaction_id, first_parent);
        assert_matches!(request.kind, DependentQueuedRequestKind::RedactEvent);
    }
}
2982
#[test]
fn test_canonicalize_reactions_after_edits() {
    // A reaction queued before an edit of the same parent: the test expects
    // the canonical order to place the edit first and the reaction second.
    let parent_id = TransactionId::new();

    let react = DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
        parent_transaction_id: parent_id.clone(),
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };
    let react_id = react.own_transaction_id.clone();

    let edit_content =
        SerializableEventContent::new(&RoomMessageEventContent::text_plain("edit").into())
            .unwrap();
    let edit = DependentQueuedRequest {
        own_transaction_id: ChildTransactionId::new(),
        kind: DependentQueuedRequestKind::EditEvent { new_content: edit_content },
        parent_transaction_id: parent_id,
        parent_key: None,
        created_at: MilliSecondsSinceUnixEpoch::now(),
    };
    let edit_id = edit.own_transaction_id.clone();

    // Input order is reaction first, edit second.
    let canonical = canonicalize_dependent_requests(&[react, edit]);

    assert_eq!(canonical.len(), 2);
    assert_eq!(canonical[0].own_transaction_id, edit_id);
    assert_eq!(canonical[1].own_transaction_id, react_id);
}
3017}