1use std::{
132 collections::{BTreeMap, HashMap},
133 str::FromStr as _,
134 sync::{
135 Arc, RwLock,
136 atomic::{AtomicBool, Ordering},
137 },
138};
139
140use eyeball::SharedObservable;
141#[cfg(feature = "e2e-encryption")]
142use matrix_sdk_base::crypto::{OlmError, SessionRecipientCollectionError};
143#[cfg(feature = "unstable-msc4274")]
144use matrix_sdk_base::store::FinishGalleryItemInfo;
145use matrix_sdk_base::{
146 RoomState, StoreError,
147 cross_process_lock::CrossProcessLockError,
148 deserialized_responses::{EncryptionInfo, TimelineEvent},
149 event_cache::store::EventCacheStoreError,
150 media::{MediaRequestParameters, store::MediaStoreError},
151 store::{
152 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore,
153 FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
154 SentMediaInfo, SentRequestKey, SerializableEventContent,
155 },
156};
157use matrix_sdk_common::{
158 executor::{JoinHandle, spawn},
159 locks::Mutex as SyncMutex,
160};
161use mime::Mime;
162use ruma::{
163 MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
164 TransactionId,
165 events::{
166 AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _,
167 reaction::ReactionEventContent,
168 relation::Annotation,
169 room::{
170 MediaSource,
171 message::{FormattedBody, RoomMessageEventContent},
172 },
173 },
174 serde::Raw,
175};
176use tokio::sync::{Mutex, Notify, OwnedMutexGuard, broadcast, oneshot};
177use tracing::{debug, error, info, instrument, trace, warn};
178
179use crate::{
180 Client, Media, Room, TransmissionProgress,
181 client::WeakClient,
182 config::RequestConfig,
183 error::RetryKind,
184 room::{WeakRoom, edit::EditedContent},
185};
186
187mod progress;
188mod upload;
189
190pub use progress::AbstractProgress;
191
192pub struct SendQueue {
194 client: Client,
195}
196
197#[cfg(not(tarpaulin_include))]
198impl std::fmt::Debug for SendQueue {
199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200 f.debug_struct("SendQueue").finish_non_exhaustive()
201 }
202}
203
204impl SendQueue {
205 pub(super) fn new(client: Client) -> Self {
206 Self { client }
207 }
208
209 pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
212 if !self.is_enabled() {
213 return;
214 }
215
216 let room_ids =
217 self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
218 |err| {
219 warn!("error when loading rooms with unsent requests: {err}");
220 Vec::new()
221 },
222 );
223
224 for room_id in room_ids {
226 if let Some(room) = self.client.get_room(&room_id) {
227 let _ = self.for_room(room);
228 }
229 }
230 }
231
232 #[inline(always)]
234 fn data(&self) -> &SendQueueData {
235 &self.client.inner.send_queue_data
236 }
237
238 pub(crate) fn for_room(&self, room: Room) -> RoomSendQueue {
241 let data = self.data();
242
243 let mut map = data.rooms.write().unwrap();
244
245 let room_id = room.room_id();
246 if let Some(room_q) = map.get(room_id).cloned() {
247 return room_q;
248 }
249
250 let owned_room_id = room_id.to_owned();
251 let room_q = RoomSendQueue::new(
252 self.is_enabled(),
253 data.global_update_sender.clone(),
254 data.error_sender.clone(),
255 data.is_dropping.clone(),
256 &self.client,
257 owned_room_id.clone(),
258 data.report_media_upload_progress.clone(),
259 );
260
261 map.insert(owned_room_id, room_q.clone());
262
263 room_q
264 }
265
266 pub async fn set_enabled(&self, enabled: bool) {
276 debug!(?enabled, "setting global send queue enablement");
277
278 self.data().globally_enabled.store(enabled, Ordering::SeqCst);
279
280 for room in self.data().rooms.read().unwrap().values() {
282 room.set_enabled(enabled);
283 }
284
285 self.respawn_tasks_for_rooms_with_unsent_requests().await;
288 }
289
290 pub fn is_enabled(&self) -> bool {
293 self.data().globally_enabled.load(Ordering::SeqCst)
294 }
295
296 pub fn enable_upload_progress(&self, enabled: bool) {
298 self.data().report_media_upload_progress.store(enabled, Ordering::SeqCst);
299 }
300
301 pub fn subscribe(&self) -> broadcast::Receiver<SendQueueUpdate> {
306 self.data().global_update_sender.subscribe()
307 }
308
309 pub async fn local_echoes(
311 &self,
312 ) -> Result<BTreeMap<OwnedRoomId, Vec<LocalEcho>>, RoomSendQueueError> {
313 let room_ids =
314 self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
315 |err| {
316 warn!("error when loading rooms with unsent requests: {err}");
317 Vec::new()
318 },
319 );
320
321 let mut local_echoes: BTreeMap<OwnedRoomId, Vec<LocalEcho>> = BTreeMap::new();
322
323 for room_id in room_ids {
324 if let Some(room) = self.client.get_room(&room_id) {
325 let queue = self.for_room(room);
326 local_echoes
327 .insert(room_id.to_owned(), queue.inner.queue.local_echoes(&queue).await?);
328 }
329 }
330
331 Ok(local_echoes)
332 }
333
334 pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
337 self.data().error_sender.subscribe()
338 }
339}
340
341#[derive(Clone, Debug)]
344struct QueueThumbnailInfo {
345 finish_upload_thumbnail_info: FinishUploadThumbnailInfo,
347
348 media_request_parameters: MediaRequestParameters,
350
351 content_type: Mime,
353
354 file_size: usize,
356}
357
358#[derive(Clone, Debug)]
360pub struct SendQueueRoomError {
361 pub room_id: OwnedRoomId,
363
364 pub error: Arc<crate::Error>,
366
367 pub is_recoverable: bool,
373}
374
375impl Client {
376 pub fn send_queue(&self) -> SendQueue {
379 SendQueue::new(self.clone())
380 }
381}
382
383pub(super) struct SendQueueData {
384 rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,
386
387 globally_enabled: AtomicBool,
392
393 global_update_sender: broadcast::Sender<SendQueueUpdate>,
397
398 error_sender: broadcast::Sender<SendQueueRoomError>,
400
401 is_dropping: Arc<AtomicBool>,
403
404 report_media_upload_progress: Arc<AtomicBool>,
406}
407
408impl SendQueueData {
409 pub fn new(globally_enabled: bool) -> Self {
411 let (global_update_sender, _) = broadcast::channel(32);
412 let (error_sender, _) = broadcast::channel(32);
413
414 Self {
415 rooms: Default::default(),
416 globally_enabled: AtomicBool::new(globally_enabled),
417 global_update_sender,
418 error_sender,
419 is_dropping: Arc::new(false.into()),
420 report_media_upload_progress: Arc::new(false.into()),
421 }
422 }
423}
424
425impl Drop for SendQueueData {
426 fn drop(&mut self) {
427 debug!("globally dropping the send queue");
430 self.is_dropping.store(true, Ordering::SeqCst);
431
432 let rooms = self.rooms.read().unwrap();
433 for room in rooms.values() {
434 room.inner.notifier.notify_one();
435 }
436 }
437}
438
439impl Room {
440 pub fn send_queue(&self) -> RoomSendQueue {
442 self.client.send_queue().for_room(self.clone())
443 }
444}
445
446#[derive(Clone)]
450pub struct RoomSendQueue {
451 inner: Arc<RoomSendQueueInner>,
452}
453
454#[cfg(not(tarpaulin_include))]
455impl std::fmt::Debug for RoomSendQueue {
456 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
457 f.debug_struct("RoomSendQueue").finish_non_exhaustive()
458 }
459}
460
461impl RoomSendQueue {
462 fn new(
463 globally_enabled: bool,
464 global_update_sender: broadcast::Sender<SendQueueUpdate>,
465 global_error_sender: broadcast::Sender<SendQueueRoomError>,
466 is_dropping: Arc<AtomicBool>,
467 client: &Client,
468 room_id: OwnedRoomId,
469 report_media_upload_progress: Arc<AtomicBool>,
470 ) -> Self {
471 let (update_sender, _) = broadcast::channel(32);
472
473 let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
474 let notifier = Arc::new(Notify::new());
475
476 let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
477 let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));
478
479 let task = spawn(Self::sending_task(
480 weak_room.clone(),
481 queue.clone(),
482 notifier.clone(),
483 global_update_sender.clone(),
484 update_sender.clone(),
485 locally_enabled.clone(),
486 global_error_sender,
487 is_dropping,
488 report_media_upload_progress,
489 ));
490
491 Self {
492 inner: Arc::new(RoomSendQueueInner {
493 room: weak_room,
494 global_update_sender,
495 update_sender,
496 _task: task,
497 queue,
498 notifier,
499 locally_enabled,
500 }),
501 }
502 }
503
504 pub async fn send_raw(
519 &self,
520 content: Raw<AnyMessageLikeEventContent>,
521 event_type: String,
522 ) -> Result<SendHandle, RoomSendQueueError> {
523 let Some(room) = self.inner.room.get() else {
524 return Err(RoomSendQueueError::RoomDisappeared);
525 };
526 if room.state() != RoomState::Joined {
527 return Err(RoomSendQueueError::RoomNotJoined);
528 }
529
530 let content = SerializableEventContent::from_raw(content, event_type);
531
532 let created_at = MilliSecondsSinceUnixEpoch::now();
533 let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
534 trace!(%transaction_id, "manager sends a raw event to the background task");
535
536 self.inner.notifier.notify_one();
537
538 let send_handle = SendHandle {
539 room: self.clone(),
540 transaction_id: transaction_id.clone(),
541 media_handles: vec![],
542 created_at,
543 };
544
545 self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
546 transaction_id,
547 content: LocalEchoContent::Event {
548 serialized_event: content,
549 send_handle: send_handle.clone(),
550 send_error: None,
551 },
552 }));
553
554 Ok(send_handle)
555 }
556
557 pub async fn send(
572 &self,
573 content: AnyMessageLikeEventContent,
574 ) -> Result<SendHandle, RoomSendQueueError> {
575 self.send_raw(
576 Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
577 content.event_type().to_string(),
578 )
579 .await
580 }
581
582 pub async fn subscribe(
588 &self,
589 ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
590 {
591 let local_echoes = self.inner.queue.local_echoes(self).await?;
592
593 Ok((local_echoes, self.inner.update_sender.subscribe()))
594 }
595
596 #[allow(clippy::too_many_arguments)]
602 #[instrument(skip_all, fields(room_id = %room.room_id()))]
603 async fn sending_task(
604 room: WeakRoom,
605 queue: QueueStorage,
606 notifier: Arc<Notify>,
607 global_update_sender: broadcast::Sender<SendQueueUpdate>,
608 update_sender: broadcast::Sender<RoomSendQueueUpdate>,
609 locally_enabled: Arc<AtomicBool>,
610 global_error_sender: broadcast::Sender<SendQueueRoomError>,
611 is_dropping: Arc<AtomicBool>,
612 report_media_upload_progress: Arc<AtomicBool>,
613 ) {
614 trace!("spawned the sending task");
615
616 let room_id = room.room_id();
617
618 loop {
619 if is_dropping.load(Ordering::SeqCst) {
621 trace!("shutting down!");
622 break;
623 }
624
625 let mut new_updates = Vec::new();
628 if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
629 warn!("errors when applying dependent requests: {err}");
630 }
631
632 for up in new_updates {
633 send_update(&global_update_sender, &update_sender, room_id, up);
634 }
635
636 if !locally_enabled.load(Ordering::SeqCst) {
637 trace!("not enabled, sleeping");
638 notifier.notified().await;
640 continue;
641 }
642
643 let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
644 Ok(Some(request)) => request,
645
646 Ok(None) => {
647 trace!("queue is empty, sleeping");
648 notifier.notified().await;
650 continue;
651 }
652
653 Err(err) => {
654 warn!("error when loading next request to send: {err}");
655 continue;
656 }
657 };
658
659 let txn_id = queued_request.transaction_id.clone();
660 trace!(txn_id = %txn_id, "received a request to send!");
661
662 let Some(room) = room.get() else {
663 if is_dropping.load(Ordering::SeqCst) {
664 break;
665 }
666 error!("the weak room couldn't be upgraded but we're not shutting down?");
667 continue;
668 };
669
670 let (related_txn_id, media_upload_progress_info, http_progress) =
675 if let QueuedRequestKind::MediaUpload {
676 cache_key,
677 thumbnail_source,
678 #[cfg(feature = "unstable-msc4274")]
679 accumulated,
680 related_to,
681 ..
682 } = &queued_request.kind
683 {
684 let (media_upload_progress_info, http_progress) =
687 if report_media_upload_progress.load(Ordering::SeqCst) {
688 let media_upload_progress_info =
689 RoomSendQueue::create_media_upload_progress_info(
690 &queued_request.transaction_id,
691 related_to,
692 cache_key,
693 thumbnail_source.as_ref(),
694 #[cfg(feature = "unstable-msc4274")]
695 accumulated,
696 &room,
697 &queue,
698 )
699 .await;
700
701 let progress = RoomSendQueue::create_media_upload_progress_observable(
702 &media_upload_progress_info,
703 related_to,
704 &update_sender,
705 );
706
707 (Some(media_upload_progress_info), Some(progress))
708 } else {
709 Default::default()
710 };
711
712 (Some(related_to.clone()), media_upload_progress_info, http_progress)
713 } else {
714 Default::default()
715 };
716
717 match Self::handle_request(&room, queued_request, cancel_upload_rx, http_progress).await
718 {
719 Ok((Some(parent_key), encryption_info)) => match queue
720 .mark_as_sent(&txn_id, parent_key.clone())
721 .await
722 {
723 Ok(()) => match parent_key {
724 SentRequestKey::Event { event_id, event, event_type } => {
725 send_update(
726 &global_update_sender,
727 &update_sender,
728 room_id,
729 RoomSendQueueUpdate::SentEvent {
730 transaction_id: txn_id,
731 event_id: event_id.clone(),
732 },
733 );
734
735 if let Ok((room_event_cache, _drop_handles)) = room.event_cache().await
755 {
756 let timeline_event = match Raw::from_json_string(
757 format!(
759 "{{\
760 \"event_id\":\"{event_id}\",\
761 \"origin_server_ts\":{ts},\
762 \"sender\":\"{sender}\",\
763 \"type\":\"{type}\",\
764 \"content\":{content}\
765 }}",
766 event_id = event_id,
767 ts = MilliSecondsSinceUnixEpoch::now().get(),
768 sender = room.client().user_id().expect("Client must be logged-in"),
769 type = event_type,
770 content = event.into_json(),
771 ),
772 ) {
773 Ok(event) => match encryption_info {
774 #[cfg(feature = "e2e-encryption")]
775 Some(encryption_info) => {
776 use matrix_sdk_base::deserialized_responses::DecryptedRoomEvent;
777 let decrypted_event = DecryptedRoomEvent {
778 event: event.cast_unchecked(),
779 encryption_info: Arc::new(encryption_info),
780 unsigned_encryption_info: None,
781 };
782 Some(TimelineEvent::from_decrypted(
783 decrypted_event,
784 None,
785 ))
786 }
787 _ => Some(TimelineEvent::from_plaintext(event)),
788 },
789 Err(err) => {
790 error!(
791 ?err,
792 "Failed to build the (sync) event before the saving in the Event Cache"
793 );
794 None
795 }
796 };
797
798 if let Some(timeline_event) = timeline_event
802 && let Err(err) = room_event_cache
803 .insert_sent_event_from_send_queue(timeline_event)
804 .await
805 {
806 error!(
807 ?err,
808 "Failed to save the sent event in the Event Cache"
809 );
810 }
811 } else {
812 info!(
813 "Cannot insert the sent event in the Event Cache because \
814 either the room no longer exists, or the Room Event Cache cannot be retrieved"
815 );
816 }
817 }
818
819 SentRequestKey::Media(sent_media_info) => {
820 let index =
823 media_upload_progress_info.as_ref().map_or(0, |info| info.index);
824 let progress = media_upload_progress_info
825 .as_ref()
826 .map(|info| {
827 AbstractProgress { current: info.bytes, total: info.bytes }
828 + info.offsets
829 })
830 .unwrap_or(AbstractProgress { current: 1, total: 1 });
831
832 let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
835 related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
836 file: Some(sent_media_info.file),
837 index,
838 progress,
839 });
840 }
841 },
842
843 Err(err) => {
844 warn!("unable to mark queued request as sent: {err}");
845 }
846 },
847
848 Ok((None, _)) => {
849 debug!("Request has been aborted while running, continuing.");
850 }
851
852 Err(err) => {
853 let is_recoverable = match err {
854 crate::Error::Http(ref http_err) => {
855 matches!(
857 http_err.retry_kind(),
858 RetryKind::Transient { .. } | RetryKind::NetworkFailure
859 )
860 }
861
862 crate::Error::ConcurrentRequestFailed => true,
867
868 _ => false,
870 };
871
872 locally_enabled.store(false, Ordering::SeqCst);
874
875 if is_recoverable {
876 warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");
877
878 queue.mark_as_not_being_sent(&txn_id).await;
881
882 } else {
888 warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");
889
890 if let Err(storage_error) =
892 queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
893 {
894 warn!("unable to mark request as wedged: {storage_error}");
895 }
896 }
897
898 let error = Arc::new(err);
899
900 let _ = global_error_sender.send(SendQueueRoomError {
901 room_id: room_id.to_owned(),
902 error: error.clone(),
903 is_recoverable,
904 });
905
906 send_update(
907 &global_update_sender,
908 &update_sender,
909 room_id,
910 RoomSendQueueUpdate::SendError {
911 transaction_id: related_txn_id.unwrap_or(txn_id),
912 error,
913 is_recoverable,
914 },
915 );
916 }
917 }
918 }
919
920 info!("exited sending task");
921 }
922
923 async fn handle_request(
927 room: &Room,
928 request: QueuedRequest,
929 cancel_upload_rx: Option<oneshot::Receiver<()>>,
930 progress: Option<SharedObservable<TransmissionProgress>>,
931 ) -> Result<(Option<SentRequestKey>, Option<EncryptionInfo>), crate::Error> {
932 match request.kind {
933 QueuedRequestKind::Event { content } => {
934 let (event, event_type) = content.into_raw();
935
936 let result = room
937 .send_raw(&event_type, &event)
938 .with_transaction_id(&request.transaction_id)
939 .with_request_config(RequestConfig::short_retry())
940 .await?;
941
942 trace!(txn_id = %request.transaction_id, event_id = %result.response.event_id, "event successfully sent");
943
944 Ok((
945 Some(SentRequestKey::Event {
946 event_id: result.response.event_id,
947 event,
948 event_type,
949 }),
950 result.encryption_info,
951 ))
952 }
953
954 QueuedRequestKind::MediaUpload {
955 content_type,
956 cache_key,
957 thumbnail_source,
958 related_to: relates_to,
959 #[cfg(feature = "unstable-msc4274")]
960 accumulated,
961 } => {
962 trace!(%relates_to, "uploading media related to event");
963
964 let fut = async move {
965 let data = room
966 .client()
967 .media_store()
968 .lock()
969 .await?
970 .get_media_content(&cache_key)
971 .await?
972 .ok_or(crate::Error::SendQueueWedgeError(Box::new(
973 QueueWedgeError::MissingMediaContent,
974 )))?;
975
976 let mime = Mime::from_str(&content_type).map_err(|_| {
977 crate::Error::SendQueueWedgeError(Box::new(
978 QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
979 ))
980 })?;
981
982 #[cfg(feature = "e2e-encryption")]
983 let media_source = if room.latest_encryption_state().await?.is_encrypted() {
984 trace!("upload will be encrypted (encrypted room)");
985
986 let mut cursor = std::io::Cursor::new(data);
987 let mut req = room
988 .client
989 .upload_encrypted_file(&mut cursor)
990 .with_request_config(RequestConfig::short_retry());
991 if let Some(progress) = progress {
992 req = req.with_send_progress_observable(progress);
993 }
994 let encrypted_file = req.await?;
995
996 MediaSource::Encrypted(Box::new(encrypted_file))
997 } else {
998 trace!("upload will be in clear text (room without encryption)");
999
1000 let request_config = RequestConfig::short_retry()
1001 .timeout(Media::reasonable_upload_timeout(&data));
1002 let mut req =
1003 room.client().media().upload(&mime, data, Some(request_config));
1004 if let Some(progress) = progress {
1005 req = req.with_send_progress_observable(progress);
1006 }
1007 let res = req.await?;
1008
1009 MediaSource::Plain(res.content_uri)
1010 };
1011
1012 #[cfg(not(feature = "e2e-encryption"))]
1013 let media_source = {
1014 let request_config = RequestConfig::short_retry()
1015 .timeout(Media::reasonable_upload_timeout(&data));
1016 let mut req =
1017 room.client().media().upload(&mime, data, Some(request_config));
1018 if let Some(progress) = progress {
1019 req = req.with_send_progress_observable(progress);
1020 }
1021 let res = req.await?;
1022 MediaSource::Plain(res.content_uri)
1023 };
1024
1025 let uri = match &media_source {
1026 MediaSource::Plain(uri) => uri,
1027 MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
1028 };
1029 trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");
1030
1031 Ok((
1032 Some(SentRequestKey::Media(SentMediaInfo {
1033 file: media_source,
1034 thumbnail: thumbnail_source,
1035 #[cfg(feature = "unstable-msc4274")]
1036 accumulated,
1037 })),
1038 None,
1039 ))
1040 };
1041
1042 let wait_for_cancel = async move {
1043 if let Some(rx) = cancel_upload_rx {
1044 rx.await
1045 } else {
1046 std::future::pending().await
1047 }
1048 };
1049
1050 tokio::select! {
1051 biased;
1052
1053 _ = wait_for_cancel => {
1054 Ok((None, None))
1055 }
1056
1057 res = fut => {
1058 res
1059 }
1060 }
1061 }
1062 }
1063 }
1064
1065 pub fn is_enabled(&self) -> bool {
1067 self.inner.locally_enabled.load(Ordering::SeqCst)
1068 }
1069
1070 pub fn set_enabled(&self, enabled: bool) {
1072 self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
1073
1074 if enabled {
1077 self.inner.notifier.notify_one();
1078 }
1079 }
1080
1081 fn send_update(&self, update: RoomSendQueueUpdate) {
1085 let _ = self.inner.update_sender.send(update.clone());
1086 let _ = self
1087 .inner
1088 .global_update_sender
1089 .send(SendQueueUpdate { room_id: self.inner.room.room_id().to_owned(), update });
1090 }
1091}
1092
1093fn send_update(
1094 global_update_sender: &broadcast::Sender<SendQueueUpdate>,
1095 update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
1096 room_id: &RoomId,
1097 update: RoomSendQueueUpdate,
1098) {
1099 let _ = update_sender.send(update.clone());
1100 let _ = global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update });
1101}
1102
1103impl From<&crate::Error> for QueueWedgeError {
1104 fn from(value: &crate::Error) -> Self {
1105 match value {
1106 #[cfg(feature = "e2e-encryption")]
1107 crate::Error::OlmError(error) => match &**error {
1108 OlmError::SessionRecipientCollectionError(error) => match error {
1109 SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
1110 QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
1111 }
1112
1113 SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
1114 QueueWedgeError::IdentityViolations { users: users.clone() }
1115 }
1116
1117 SessionRecipientCollectionError::CrossSigningNotSetup
1118 | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
1119 QueueWedgeError::CrossVerificationRequired
1120 }
1121 },
1122 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1123 },
1124
1125 crate::Error::SendQueueWedgeError(error) => *error.clone(),
1127
1128 _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1129 }
1130 }
1131}
1132
1133struct RoomSendQueueInner {
1134 room: WeakRoom,
1136
1137 global_update_sender: broadcast::Sender<SendQueueUpdate>,
1141
1142 update_sender: broadcast::Sender<RoomSendQueueUpdate>,
1148
1149 queue: QueueStorage,
1156
1157 notifier: Arc<Notify>,
1160
1161 locally_enabled: Arc<AtomicBool>,
1164
1165 _task: JoinHandle<()>,
1168}
1169
1170struct BeingSentInfo {
1172 transaction_id: OwnedTransactionId,
1174
1175 cancel_upload: Option<oneshot::Sender<()>>,
1178}
1179
1180impl BeingSentInfo {
1181 fn cancel_upload(self) -> bool {
1186 if let Some(cancel_upload) = self.cancel_upload {
1187 let _ = cancel_upload.send(());
1188 true
1189 } else {
1190 false
1191 }
1192 }
1193}
1194
1195#[derive(Clone)]
1198struct StoreLock {
1199 client: WeakClient,
1201
1202 being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
1207}
1208
1209impl StoreLock {
1210 async fn lock(&self) -> StoreLockGuard {
1212 StoreLockGuard {
1213 client: self.client.clone(),
1214 being_sent: self.being_sent.clone().lock_owned().await,
1215 }
1216 }
1217}
1218
1219struct StoreLockGuard {
1222 client: WeakClient,
1224
1225 being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
1228}
1229
1230impl StoreLockGuard {
1231 fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
1233 self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
1234 }
1235}
1236
1237#[derive(Clone)]
1238struct QueueStorage {
1239 store: StoreLock,
1242
1243 room_id: OwnedRoomId,
1245
1246 thumbnail_file_sizes: Arc<SyncMutex<HashMap<OwnedTransactionId, Vec<Option<usize>>>>>,
1259}
1260
1261impl QueueStorage {
1262 const LOW_PRIORITY: usize = 0;
1264
1265 const HIGH_PRIORITY: usize = 10;
1267
1268 fn new(client: WeakClient, room: OwnedRoomId) -> Self {
1270 Self {
1271 room_id: room,
1272 store: StoreLock { client, being_sent: Default::default() },
1273 thumbnail_file_sizes: Default::default(),
1274 }
1275 }
1276
1277 async fn push(
1281 &self,
1282 request: QueuedRequestKind,
1283 created_at: MilliSecondsSinceUnixEpoch,
1284 ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
1285 let transaction_id = TransactionId::new();
1286
1287 self.store
1288 .lock()
1289 .await
1290 .client()?
1291 .state_store()
1292 .save_send_queue_request(
1293 &self.room_id,
1294 transaction_id.clone(),
1295 created_at,
1296 request,
1297 Self::LOW_PRIORITY,
1298 )
1299 .await?;
1300
1301 Ok(transaction_id)
1302 }
1303
1304 async fn peek_next_to_send(
1309 &self,
1310 ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
1311 {
1312 let mut guard = self.store.lock().await;
1313 let queued_requests =
1314 guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;
1315
1316 if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
1317 let (cancel_upload_tx, cancel_upload_rx) =
1318 if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
1319 let (tx, rx) = oneshot::channel();
1320 (Some(tx), Some(rx))
1321 } else {
1322 Default::default()
1323 };
1324
1325 let prev = guard.being_sent.replace(BeingSentInfo {
1326 transaction_id: request.transaction_id.clone(),
1327 cancel_upload: cancel_upload_tx,
1328 });
1329
1330 if let Some(prev) = prev {
1331 error!(
1332 prev_txn = ?prev.transaction_id,
1333 "a previous request was still active while picking a new one"
1334 );
1335 }
1336
1337 Ok(Some((request.clone(), cancel_upload_rx)))
1338 } else {
1339 Ok(None)
1340 }
1341 }
1342
1343 async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1347 let was_being_sent = self.store.lock().await.being_sent.take();
1348
1349 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1350 if prev_txn != Some(transaction_id) {
1351 error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after transient error)");
1352 }
1353 }
1354
1355 async fn mark_as_wedged(
1359 &self,
1360 transaction_id: &TransactionId,
1361 reason: QueueWedgeError,
1362 ) -> Result<(), RoomSendQueueStorageError> {
1363 let mut guard = self.store.lock().await;
1365 let was_being_sent = guard.being_sent.take();
1366
1367 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1368 if prev_txn != Some(transaction_id) {
1369 error!(
1370 ?prev_txn,
1371 "previous active request didn't match that we expect (after permanent error)",
1372 );
1373 }
1374
1375 Ok(guard
1376 .client()?
1377 .state_store()
1378 .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1379 .await?)
1380 }
1381
1382 async fn mark_as_unwedged(
1385 &self,
1386 transaction_id: &TransactionId,
1387 ) -> Result<(), RoomSendQueueStorageError> {
1388 Ok(self
1389 .store
1390 .lock()
1391 .await
1392 .client()?
1393 .state_store()
1394 .update_send_queue_request_status(&self.room_id, transaction_id, None)
1395 .await?)
1396 }
1397
1398 async fn mark_as_sent(
1401 &self,
1402 transaction_id: &TransactionId,
1403 parent_key: SentRequestKey,
1404 ) -> Result<(), RoomSendQueueStorageError> {
1405 let mut guard = self.store.lock().await;
1407 let was_being_sent = guard.being_sent.take();
1408
1409 let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1410 if prev_txn != Some(transaction_id) {
1411 error!(
1412 ?prev_txn,
1413 "previous active request didn't match that we expect (after successful send)",
1414 );
1415 }
1416
1417 let client = guard.client()?;
1418 let store = client.state_store();
1419
1420 store
1422 .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1423 .await?;
1424
1425 let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1426
1427 if !removed {
1428 warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1429 }
1430
1431 self.thumbnail_file_sizes.lock().remove(transaction_id);
1432
1433 Ok(())
1434 }
1435
1436 async fn cancel_event(
1443 &self,
1444 transaction_id: &TransactionId,
1445 ) -> Result<bool, RoomSendQueueStorageError> {
1446 let guard = self.store.lock().await;
1447
1448 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1449 == Some(transaction_id)
1450 {
1451 guard
1453 .client()?
1454 .state_store()
1455 .save_dependent_queued_request(
1456 &self.room_id,
1457 transaction_id,
1458 ChildTransactionId::new(),
1459 MilliSecondsSinceUnixEpoch::now(),
1460 DependentQueuedRequestKind::RedactEvent,
1461 )
1462 .await?;
1463
1464 return Ok(true);
1465 }
1466
1467 let removed = guard
1468 .client()?
1469 .state_store()
1470 .remove_send_queue_request(&self.room_id, transaction_id)
1471 .await?;
1472
1473 self.thumbnail_file_sizes.lock().remove(transaction_id);
1474
1475 Ok(removed)
1476 }
1477
1478 async fn replace_event(
1485 &self,
1486 transaction_id: &TransactionId,
1487 serializable: SerializableEventContent,
1488 ) -> Result<bool, RoomSendQueueStorageError> {
1489 let guard = self.store.lock().await;
1490
1491 if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1492 == Some(transaction_id)
1493 {
1494 guard
1496 .client()?
1497 .state_store()
1498 .save_dependent_queued_request(
1499 &self.room_id,
1500 transaction_id,
1501 ChildTransactionId::new(),
1502 MilliSecondsSinceUnixEpoch::now(),
1503 DependentQueuedRequestKind::EditEvent { new_content: serializable },
1504 )
1505 .await?;
1506
1507 return Ok(true);
1508 }
1509
1510 let edited = guard
1511 .client()?
1512 .state_store()
1513 .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1514 .await?;
1515
1516 Ok(edited)
1517 }
1518
1519 #[allow(clippy::too_many_arguments)]
1523 async fn push_media(
1524 &self,
1525 event: RoomMessageEventContent,
1526 content_type: Mime,
1527 send_event_txn: OwnedTransactionId,
1528 created_at: MilliSecondsSinceUnixEpoch,
1529 upload_file_txn: OwnedTransactionId,
1530 file_media_request: MediaRequestParameters,
1531 thumbnail: Option<QueueThumbnailInfo>,
1532 ) -> Result<(), RoomSendQueueStorageError> {
1533 let guard = self.store.lock().await;
1534 let client = guard.client()?;
1535 let store = client.state_store();
1536
1537 let thumbnail_file_sizes = vec![thumbnail.as_ref().map(|t| t.file_size)];
1539
1540 let thumbnail_info = self
1541 .push_thumbnail_and_media_uploads(
1542 store,
1543 &content_type,
1544 send_event_txn.clone(),
1545 created_at,
1546 upload_file_txn.clone(),
1547 file_media_request,
1548 thumbnail,
1549 )
1550 .await?;
1551
1552 store
1554 .save_dependent_queued_request(
1555 &self.room_id,
1556 &upload_file_txn,
1557 send_event_txn.clone().into(),
1558 created_at,
1559 DependentQueuedRequestKind::FinishUpload {
1560 local_echo: Box::new(event),
1561 file_upload: upload_file_txn.clone(),
1562 thumbnail_info,
1563 },
1564 )
1565 .await?;
1566
1567 self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1568
1569 Ok(())
1570 }
1571
1572 #[cfg(feature = "unstable-msc4274")]
1576 #[allow(clippy::too_many_arguments)]
1577 async fn push_gallery(
1578 &self,
1579 event: RoomMessageEventContent,
1580 send_event_txn: OwnedTransactionId,
1581 created_at: MilliSecondsSinceUnixEpoch,
1582 item_queue_infos: Vec<GalleryItemQueueInfo>,
1583 ) -> Result<(), RoomSendQueueStorageError> {
1584 let guard = self.store.lock().await;
1585 let client = guard.client()?;
1586 let store = client.state_store();
1587
1588 let mut finish_item_infos = Vec::with_capacity(item_queue_infos.len());
1589 let mut thumbnail_file_sizes = Vec::with_capacity(item_queue_infos.len());
1590
1591 let Some((first, rest)) = item_queue_infos.split_first() else {
1592 return Ok(());
1593 };
1594
1595 let GalleryItemQueueInfo { content_type, upload_file_txn, file_media_request, thumbnail } =
1596 first;
1597
1598 let thumbnail_info = self
1599 .push_thumbnail_and_media_uploads(
1600 store,
1601 content_type,
1602 send_event_txn.clone(),
1603 created_at,
1604 upload_file_txn.clone(),
1605 file_media_request.clone(),
1606 thumbnail.clone(),
1607 )
1608 .await?;
1609
1610 finish_item_infos
1611 .push(FinishGalleryItemInfo { file_upload: upload_file_txn.clone(), thumbnail_info });
1612 thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));
1613
1614 let mut last_upload_file_txn = upload_file_txn.clone();
1615
1616 for item_queue_info in rest {
1617 let GalleryItemQueueInfo {
1618 content_type,
1619 upload_file_txn,
1620 file_media_request,
1621 thumbnail,
1622 } = item_queue_info;
1623
1624 let thumbnail_info = if let Some(QueueThumbnailInfo {
1625 finish_upload_thumbnail_info: thumbnail_info,
1626 media_request_parameters: thumbnail_media_request,
1627 content_type: thumbnail_content_type,
1628 ..
1629 }) = thumbnail
1630 {
1631 let upload_thumbnail_txn = thumbnail_info.txn.clone();
1632
1633 store
1636 .save_dependent_queued_request(
1637 &self.room_id,
1638 &last_upload_file_txn,
1639 upload_thumbnail_txn.clone().into(),
1640 created_at,
1641 DependentQueuedRequestKind::UploadFileOrThumbnail {
1642 content_type: thumbnail_content_type.to_string(),
1643 cache_key: thumbnail_media_request.clone(),
1644 related_to: send_event_txn.clone(),
1645 parent_is_thumbnail_upload: false,
1646 },
1647 )
1648 .await?;
1649
1650 last_upload_file_txn = upload_thumbnail_txn;
1651
1652 Some(thumbnail_info)
1653 } else {
1654 None
1655 };
1656
1657 store
1659 .save_dependent_queued_request(
1660 &self.room_id,
1661 &last_upload_file_txn,
1662 upload_file_txn.clone().into(),
1663 created_at,
1664 DependentQueuedRequestKind::UploadFileOrThumbnail {
1665 content_type: content_type.to_string(),
1666 cache_key: file_media_request.clone(),
1667 related_to: send_event_txn.clone(),
1668 parent_is_thumbnail_upload: thumbnail.is_some(),
1669 },
1670 )
1671 .await?;
1672
1673 finish_item_infos.push(FinishGalleryItemInfo {
1674 file_upload: upload_file_txn.clone(),
1675 thumbnail_info: thumbnail_info.cloned(),
1676 });
1677 thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));
1678
1679 last_upload_file_txn = upload_file_txn.clone();
1680 }
1681
1682 store
1685 .save_dependent_queued_request(
1686 &self.room_id,
1687 &last_upload_file_txn,
1688 send_event_txn.clone().into(),
1689 created_at,
1690 DependentQueuedRequestKind::FinishGallery {
1691 local_echo: Box::new(event),
1692 item_infos: finish_item_infos,
1693 },
1694 )
1695 .await?;
1696
1697 self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1698
1699 Ok(())
1700 }
1701
1702 #[allow(clippy::too_many_arguments)]
1708 async fn push_thumbnail_and_media_uploads(
1709 &self,
1710 store: &DynStateStore,
1711 content_type: &Mime,
1712 send_event_txn: OwnedTransactionId,
1713 created_at: MilliSecondsSinceUnixEpoch,
1714 upload_file_txn: OwnedTransactionId,
1715 file_media_request: MediaRequestParameters,
1716 thumbnail: Option<QueueThumbnailInfo>,
1717 ) -> Result<Option<FinishUploadThumbnailInfo>, RoomSendQueueStorageError> {
1718 if let Some(QueueThumbnailInfo {
1719 finish_upload_thumbnail_info: thumbnail_info,
1720 media_request_parameters: thumbnail_media_request,
1721 content_type: thumbnail_content_type,
1722 ..
1723 }) = thumbnail
1724 {
1725 let upload_thumbnail_txn = thumbnail_info.txn.clone();
1726
1727 store
1729 .save_send_queue_request(
1730 &self.room_id,
1731 upload_thumbnail_txn.clone(),
1732 created_at,
1733 QueuedRequestKind::MediaUpload {
1734 content_type: thumbnail_content_type.to_string(),
1735 cache_key: thumbnail_media_request,
1736 thumbnail_source: None, related_to: send_event_txn.clone(),
1738 #[cfg(feature = "unstable-msc4274")]
1739 accumulated: vec![],
1740 },
1741 Self::LOW_PRIORITY,
1742 )
1743 .await?;
1744
1745 store
1747 .save_dependent_queued_request(
1748 &self.room_id,
1749 &upload_thumbnail_txn,
1750 upload_file_txn.into(),
1751 created_at,
1752 DependentQueuedRequestKind::UploadFileOrThumbnail {
1753 content_type: content_type.to_string(),
1754 cache_key: file_media_request,
1755 related_to: send_event_txn,
1756 parent_is_thumbnail_upload: true,
1757 },
1758 )
1759 .await?;
1760
1761 Ok(Some(thumbnail_info))
1762 } else {
1763 store
1765 .save_send_queue_request(
1766 &self.room_id,
1767 upload_file_txn,
1768 created_at,
1769 QueuedRequestKind::MediaUpload {
1770 content_type: content_type.to_string(),
1771 cache_key: file_media_request,
1772 thumbnail_source: None,
1773 related_to: send_event_txn,
1774 #[cfg(feature = "unstable-msc4274")]
1775 accumulated: vec![],
1776 },
1777 Self::LOW_PRIORITY,
1778 )
1779 .await?;
1780
1781 Ok(None)
1782 }
1783 }
1784
1785 #[instrument(skip(self))]
1787 async fn react(
1788 &self,
1789 transaction_id: &TransactionId,
1790 key: String,
1791 created_at: MilliSecondsSinceUnixEpoch,
1792 ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1793 let guard = self.store.lock().await;
1794 let client = guard.client()?;
1795 let store = client.state_store();
1796
1797 let requests = store.load_send_queue_requests(&self.room_id).await?;
1798
1799 if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1801 let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1804 if !dependent_requests
1805 .into_iter()
1806 .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1807 .any(|child_txn| *child_txn == *transaction_id)
1808 {
1809 return Ok(None);
1811 }
1812 }
1813
1814 let reaction_txn_id = ChildTransactionId::new();
1816 store
1817 .save_dependent_queued_request(
1818 &self.room_id,
1819 transaction_id,
1820 reaction_txn_id.clone(),
1821 created_at,
1822 DependentQueuedRequestKind::ReactEvent { key },
1823 )
1824 .await?;
1825
1826 Ok(Some(reaction_txn_id))
1827 }
1828
1829 async fn local_echoes(
1832 &self,
1833 room: &RoomSendQueue,
1834 ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1835 let guard = self.store.lock().await;
1836 let client = guard.client()?;
1837 let store = client.state_store();
1838
1839 let local_requests =
1840 store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1841 Some(LocalEcho {
1842 transaction_id: queued.transaction_id.clone(),
1843 content: match queued.kind {
1844 QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1845 serialized_event: content,
1846 send_handle: SendHandle {
1847 room: room.clone(),
1848 transaction_id: queued.transaction_id,
1849 media_handles: vec![],
1850 created_at: queued.created_at,
1851 },
1852 send_error: queued.error,
1853 },
1854
1855 QueuedRequestKind::MediaUpload { .. } => {
1856 return None;
1859 }
1860 },
1861 })
1862 });
1863
1864 let reactions_and_medias = store
1865 .load_dependent_queued_requests(&self.room_id)
1866 .await?
1867 .into_iter()
1868 .filter_map(|dep| match dep.kind {
1869 DependentQueuedRequestKind::EditEvent { .. }
1870 | DependentQueuedRequestKind::RedactEvent => {
1871 None
1873 }
1874
1875 DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
1876 transaction_id: dep.own_transaction_id.clone().into(),
1877 content: LocalEchoContent::React {
1878 key,
1879 send_handle: SendReactionHandle {
1880 room: room.clone(),
1881 transaction_id: dep.own_transaction_id,
1882 },
1883 applies_to: dep.parent_transaction_id,
1884 },
1885 }),
1886
1887 DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
1888 None
1890 }
1891
1892 DependentQueuedRequestKind::FinishUpload {
1893 local_echo,
1894 file_upload,
1895 thumbnail_info,
1896 } => {
1897 Some(LocalEcho {
1899 transaction_id: dep.own_transaction_id.clone().into(),
1900 content: LocalEchoContent::Event {
1901 serialized_event: SerializableEventContent::new(&(*local_echo).into())
1902 .ok()?,
1903 send_handle: SendHandle {
1904 room: room.clone(),
1905 transaction_id: dep.own_transaction_id.into(),
1906 media_handles: vec![MediaHandles {
1907 upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
1908 upload_file_txn: file_upload,
1909 }],
1910 created_at: dep.created_at,
1911 },
1912 send_error: None,
1913 },
1914 })
1915 }
1916
1917 #[cfg(feature = "unstable-msc4274")]
1918 DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
1919 self.create_gallery_local_echo(
1921 dep.own_transaction_id,
1922 room,
1923 dep.created_at,
1924 local_echo,
1925 item_infos,
1926 )
1927 }
1928 });
1929
1930 Ok(local_requests.chain(reactions_and_medias).collect())
1931 }
1932
1933 #[cfg(feature = "unstable-msc4274")]
1935 fn create_gallery_local_echo(
1936 &self,
1937 transaction_id: ChildTransactionId,
1938 room: &RoomSendQueue,
1939 created_at: MilliSecondsSinceUnixEpoch,
1940 local_echo: Box<RoomMessageEventContent>,
1941 item_infos: Vec<FinishGalleryItemInfo>,
1942 ) -> Option<LocalEcho> {
1943 Some(LocalEcho {
1944 transaction_id: transaction_id.clone().into(),
1945 content: LocalEchoContent::Event {
1946 serialized_event: SerializableEventContent::new(&(*local_echo).into()).ok()?,
1947 send_handle: SendHandle {
1948 room: room.clone(),
1949 transaction_id: transaction_id.into(),
1950 media_handles: item_infos
1951 .into_iter()
1952 .map(|i| MediaHandles {
1953 upload_thumbnail_txn: i.thumbnail_info.map(|info| info.txn),
1954 upload_file_txn: i.file_upload,
1955 })
1956 .collect(),
1957 created_at,
1958 },
1959 send_error: None,
1960 },
1961 })
1962 }
1963
1964 #[instrument(skip_all)]
1972 async fn try_apply_single_dependent_request(
1973 &self,
1974 client: &Client,
1975 dependent_request: DependentQueuedRequest,
1976 new_updates: &mut Vec<RoomSendQueueUpdate>,
1977 ) -> Result<bool, RoomSendQueueError> {
1978 let store = client.state_store();
1979
1980 let parent_key = dependent_request.parent_key;
1981
1982 match dependent_request.kind {
1983 DependentQueuedRequestKind::EditEvent { new_content } => {
1984 if let Some(parent_key) = parent_key {
1985 let Some(event_id) = parent_key.into_event_id() else {
1986 return Err(RoomSendQueueError::StorageError(
1987 RoomSendQueueStorageError::InvalidParentKey,
1988 ));
1989 };
1990
1991 let room = client
1993 .get_room(&self.room_id)
1994 .ok_or(RoomSendQueueError::RoomDisappeared)?;
1995
1996 let edited_content = match new_content.deserialize() {
2000 Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
2001 EditedContent::RoomMessage(c.into())
2003 }
2004
2005 Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
2006 let poll_start = c.poll_start().clone();
2007 EditedContent::PollStart {
2008 fallback_text: poll_start.question.text.clone(),
2009 new_content: poll_start,
2010 }
2011 }
2012
2013 Ok(c) => {
2014 warn!("Unsupported edit content type: {:?}", c.event_type());
2015 return Ok(true);
2016 }
2017
2018 Err(err) => {
2019 warn!("Unable to deserialize: {err}");
2020 return Ok(true);
2021 }
2022 };
2023
2024 let edit_event = match room.make_edit_event(&event_id, edited_content).await {
2025 Ok(e) => e,
2026 Err(err) => {
2027 warn!("couldn't create edited event: {err}");
2028 return Ok(true);
2029 }
2030 };
2031
2032 let serializable = SerializableEventContent::from_raw(
2034 Raw::new(&edit_event)
2035 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
2036 edit_event.event_type().to_string(),
2037 );
2038
2039 store
2040 .save_send_queue_request(
2041 &self.room_id,
2042 dependent_request.own_transaction_id.into(),
2043 dependent_request.created_at,
2044 serializable.into(),
2045 Self::HIGH_PRIORITY,
2046 )
2047 .await
2048 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2049 } else {
2050 let edited = store
2052 .update_send_queue_request(
2053 &self.room_id,
2054 &dependent_request.parent_transaction_id,
2055 new_content.into(),
2056 )
2057 .await
2058 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2059
2060 if !edited {
2061 warn!("missing local echo upon dependent edit");
2062 }
2063 }
2064 }
2065
2066 DependentQueuedRequestKind::RedactEvent => {
2067 if let Some(parent_key) = parent_key {
2068 let Some(event_id) = parent_key.into_event_id() else {
2069 return Err(RoomSendQueueError::StorageError(
2070 RoomSendQueueStorageError::InvalidParentKey,
2071 ));
2072 };
2073
2074 let room = client
2076 .get_room(&self.room_id)
2077 .ok_or(RoomSendQueueError::RoomDisappeared)?;
2078
2079 if let Err(err) = room
2087 .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
2088 .await
2089 {
2090 warn!("error when sending a redact for {event_id}: {err}");
2091 return Ok(false);
2092 }
2093 } else {
2094 let removed = store
2097 .remove_send_queue_request(
2098 &self.room_id,
2099 &dependent_request.parent_transaction_id,
2100 )
2101 .await
2102 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2103
2104 if !removed {
2105 warn!("missing local echo upon dependent redact");
2106 }
2107 }
2108 }
2109
2110 DependentQueuedRequestKind::ReactEvent { key } => {
2111 if let Some(parent_key) = parent_key {
2112 let Some(parent_event_id) = parent_key.into_event_id() else {
2113 return Err(RoomSendQueueError::StorageError(
2114 RoomSendQueueStorageError::InvalidParentKey,
2115 ));
2116 };
2117
2118 let react_event =
2120 ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
2121 let serializable = SerializableEventContent::from_raw(
2122 Raw::new(&react_event)
2123 .map_err(RoomSendQueueStorageError::JsonSerialization)?,
2124 react_event.event_type().to_string(),
2125 );
2126
2127 store
2128 .save_send_queue_request(
2129 &self.room_id,
2130 dependent_request.own_transaction_id.into(),
2131 dependent_request.created_at,
2132 serializable.into(),
2133 Self::HIGH_PRIORITY,
2134 )
2135 .await
2136 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2137 } else {
2138 return Ok(false);
2140 }
2141 }
2142
2143 DependentQueuedRequestKind::UploadFileOrThumbnail {
2144 content_type,
2145 cache_key,
2146 related_to,
2147 parent_is_thumbnail_upload,
2148 } => {
2149 let Some(parent_key) = parent_key else {
2150 return Ok(false);
2152 };
2153 self.handle_dependent_file_or_thumbnail_upload(
2154 client,
2155 dependent_request.own_transaction_id.into(),
2156 parent_key,
2157 content_type,
2158 cache_key,
2159 related_to,
2160 parent_is_thumbnail_upload,
2161 )
2162 .await?;
2163 }
2164
2165 DependentQueuedRequestKind::FinishUpload {
2166 local_echo,
2167 file_upload,
2168 thumbnail_info,
2169 } => {
2170 let Some(parent_key) = parent_key else {
2171 return Ok(false);
2173 };
2174 self.handle_dependent_finish_upload(
2175 client,
2176 dependent_request.own_transaction_id.into(),
2177 parent_key,
2178 *local_echo,
2179 file_upload,
2180 thumbnail_info,
2181 new_updates,
2182 )
2183 .await?;
2184 }
2185
2186 #[cfg(feature = "unstable-msc4274")]
2187 DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
2188 let Some(parent_key) = parent_key else {
2189 return Ok(false);
2191 };
2192 self.handle_dependent_finish_gallery_upload(
2193 client,
2194 dependent_request.own_transaction_id.into(),
2195 parent_key,
2196 *local_echo,
2197 item_infos,
2198 new_updates,
2199 )
2200 .await?;
2201 }
2202 }
2203
2204 Ok(true)
2205 }
2206
2207 #[instrument(skip(self))]
2208 async fn apply_dependent_requests(
2209 &self,
2210 new_updates: &mut Vec<RoomSendQueueUpdate>,
2211 ) -> Result<(), RoomSendQueueError> {
2212 let guard = self.store.lock().await;
2213
2214 let client = guard.client()?;
2215 let store = client.state_store();
2216
2217 let dependent_requests = store
2218 .load_dependent_queued_requests(&self.room_id)
2219 .await
2220 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2221
2222 let num_initial_dependent_requests = dependent_requests.len();
2223 if num_initial_dependent_requests == 0 {
2224 return Ok(());
2226 }
2227
2228 let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
2229
2230 for original in &dependent_requests {
2232 if !canonicalized_dependent_requests
2233 .iter()
2234 .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
2235 {
2236 store
2237 .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
2238 .await
2239 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2240 }
2241 }
2242
2243 let mut num_dependent_requests = canonicalized_dependent_requests.len();
2244
2245 debug!(
2246 num_dependent_requests,
2247 num_initial_dependent_requests, "starting handling of dependent requests"
2248 );
2249
2250 for dependent in canonicalized_dependent_requests {
2251 let dependent_id = dependent.own_transaction_id.clone();
2252
2253 match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
2254 Ok(should_remove) => {
2255 if should_remove {
2256 store
2258 .remove_dependent_queued_request(&self.room_id, &dependent_id)
2259 .await
2260 .map_err(RoomSendQueueStorageError::StateStoreError)?;
2261
2262 num_dependent_requests -= 1;
2263 }
2264 }
2265
2266 Err(err) => {
2267 warn!("error when applying single dependent request: {err}");
2268 }
2269 }
2270 }
2271
2272 debug!(
2273 leftover_dependent_requests = num_dependent_requests,
2274 "stopped handling dependent request"
2275 );
2276
2277 Ok(())
2278 }
2279
2280 async fn remove_dependent_send_queue_request(
2282 &self,
2283 dependent_event_id: &ChildTransactionId,
2284 ) -> Result<bool, RoomSendQueueStorageError> {
2285 Ok(self
2286 .store
2287 .lock()
2288 .await
2289 .client()?
2290 .state_store()
2291 .remove_dependent_queued_request(&self.room_id, dependent_event_id)
2292 .await?)
2293 }
2294}
2295
2296#[cfg(feature = "unstable-msc4274")]
2297struct GalleryItemQueueInfo {
2299 content_type: Mime,
2300 upload_file_txn: OwnedTransactionId,
2301 file_media_request: MediaRequestParameters,
2302 thumbnail: Option<QueueThumbnailInfo>,
2303}
2304
2305#[derive(Clone, Debug)]
2307pub enum LocalEchoContent {
2308 Event {
2310 serialized_event: SerializableEventContent,
2313 send_handle: SendHandle,
2315 send_error: Option<QueueWedgeError>,
2318 },
2319
2320 React {
2322 key: String,
2324 send_handle: SendReactionHandle,
2326 applies_to: OwnedTransactionId,
2328 },
2329}
2330
2331#[derive(Clone, Debug)]
2334pub struct LocalEcho {
2335 pub transaction_id: OwnedTransactionId,
2337 pub content: LocalEchoContent,
2339}
2340
2341#[derive(Clone, Debug)]
2344pub enum RoomSendQueueUpdate {
2345 NewLocalEvent(LocalEcho),
2350
2351 CancelledLocalEvent {
2354 transaction_id: OwnedTransactionId,
2356 },
2357
2358 ReplacedLocalEvent {
2360 transaction_id: OwnedTransactionId,
2362
2363 new_content: SerializableEventContent,
2365 },
2366
2367 SendError {
2372 transaction_id: OwnedTransactionId,
2374 error: Arc<crate::Error>,
2376 is_recoverable: bool,
2382 },
2383
2384 RetryEvent {
2386 transaction_id: OwnedTransactionId,
2388 },
2389
2390 SentEvent {
2393 transaction_id: OwnedTransactionId,
2395 event_id: OwnedEventId,
2397 },
2398
2399 MediaUpload {
2402 related_to: OwnedTransactionId,
2404
2405 file: Option<MediaSource>,
2407
2408 index: u64,
2412
2413 progress: AbstractProgress,
2417 },
2418}
2419
2420#[derive(Clone, Debug)]
2425pub struct SendQueueUpdate {
2426 pub room_id: OwnedRoomId,
2428
2429 pub update: RoomSendQueueUpdate,
2431}
2432
2433#[derive(Debug, thiserror::Error)]
2435pub enum RoomSendQueueError {
2436 #[error("the room isn't in the joined state")]
2438 RoomNotJoined,
2439
2440 #[error("the room is now missing from the client")]
2444 RoomDisappeared,
2445
2446 #[error(transparent)]
2448 StorageError(#[from] RoomSendQueueStorageError),
2449
2450 #[error("the attachment event could not be created")]
2452 FailedToCreateAttachment,
2453
2454 #[cfg(feature = "unstable-msc4274")]
2456 #[error("the gallery contains no items")]
2457 EmptyGallery,
2458
2459 #[cfg(feature = "unstable-msc4274")]
2461 #[error("the gallery event could not be created")]
2462 FailedToCreateGallery,
2463}
2464
2465#[derive(Debug, thiserror::Error)]
2467pub enum RoomSendQueueStorageError {
2468 #[error(transparent)]
2470 StateStoreError(#[from] StoreError),
2471
2472 #[error(transparent)]
2474 EventCacheStoreError(#[from] EventCacheStoreError),
2475
2476 #[error(transparent)]
2478 MediaStoreError(#[from] MediaStoreError),
2479
2480 #[error(transparent)]
2482 LockError(#[from] CrossProcessLockError),
2483
2484 #[error(transparent)]
2486 JsonSerialization(#[from] serde_json::Error),
2487
2488 #[error("a dependent event had an invalid parent key type")]
2491 InvalidParentKey,
2492
2493 #[error("The client is shutting down.")]
2495 ClientShuttingDown,
2496
2497 #[error("This operation is not implemented for media uploads")]
2499 OperationNotImplementedYet,
2500
2501 #[error("Can't edit a media caption when the underlying event isn't a media")]
2503 InvalidMediaCaptionEdit,
2504}
2505
2506#[derive(Clone, Debug)]
2508struct MediaHandles {
2509 upload_thumbnail_txn: Option<OwnedTransactionId>,
2513
2514 upload_file_txn: OwnedTransactionId,
2516}
2517
2518#[derive(Clone, Debug)]
2520pub struct SendHandle {
2521 room: RoomSendQueue,
2523
2524 transaction_id: OwnedTransactionId,
2529
2530 media_handles: Vec<MediaHandles>,
2532
2533 pub created_at: MilliSecondsSinceUnixEpoch,
2535}
2536
2537impl SendHandle {
2538 #[cfg(test)]
2540 pub(crate) fn new(
2541 room: RoomSendQueue,
2542 transaction_id: OwnedTransactionId,
2543 created_at: MilliSecondsSinceUnixEpoch,
2544 ) -> Self {
2545 Self { room, transaction_id, media_handles: vec![], created_at }
2546 }
2547
2548 fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
2549 if !self.media_handles.is_empty() {
2550 Err(RoomSendQueueStorageError::OperationNotImplementedYet)
2551 } else {
2552 Ok(())
2553 }
2554 }
2555
2556 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2561 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2562 trace!("received an abort request");
2563
2564 let queue = &self.room.inner.queue;
2565
2566 for handles in &self.media_handles {
2567 if queue.abort_upload(&self.transaction_id, handles).await? {
2568 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2570 transaction_id: self.transaction_id.clone(),
2571 });
2572
2573 return Ok(true);
2574 }
2575
2576 }
2580
2581 if queue.cancel_event(&self.transaction_id).await? {
2582 trace!("successful abort");
2583
2584 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2586 transaction_id: self.transaction_id.clone(),
2587 });
2588
2589 Ok(true)
2590 } else {
2591 debug!("local echo didn't exist anymore, can't abort");
2592 Ok(false)
2593 }
2594 }
2595
2596 #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2601 pub async fn edit_raw(
2602 &self,
2603 new_content: Raw<AnyMessageLikeEventContent>,
2604 event_type: String,
2605 ) -> Result<bool, RoomSendQueueStorageError> {
2606 trace!("received an edit request");
2607 self.nyi_for_uploads()?;
2608
2609 let serializable = SerializableEventContent::from_raw(new_content, event_type);
2610
2611 if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
2612 trace!("successful edit");
2613
2614 self.room.inner.notifier.notify_one();
2616
2617 self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2619 transaction_id: self.transaction_id.clone(),
2620 new_content: serializable,
2621 });
2622
2623 Ok(true)
2624 } else {
2625 debug!("local echo doesn't exist anymore, can't edit");
2626 Ok(false)
2627 }
2628 }
2629
2630 pub async fn edit(
2635 &self,
2636 new_content: AnyMessageLikeEventContent,
2637 ) -> Result<bool, RoomSendQueueStorageError> {
2638 self.edit_raw(
2639 Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2640 new_content.event_type().to_string(),
2641 )
2642 .await
2643 }
2644
2645 pub async fn edit_media_caption(
2650 &self,
2651 caption: Option<String>,
2652 formatted_caption: Option<FormattedBody>,
2653 mentions: Option<Mentions>,
2654 ) -> Result<bool, RoomSendQueueStorageError> {
2655 if let Some(new_content) = self
2656 .room
2657 .inner
2658 .queue
2659 .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2660 .await?
2661 {
2662 trace!("successful edit of media caption");
2663
2664 self.room.inner.notifier.notify_one();
2666
2667 let new_content = SerializableEventContent::new(&new_content)
2668 .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2669
2670 self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2672 transaction_id: self.transaction_id.clone(),
2673 new_content,
2674 });
2675
2676 Ok(true)
2677 } else {
2678 debug!("local echo doesn't exist anymore, can't edit media caption");
2679 Ok(false)
2680 }
2681 }
2682
2683 pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2686 let room = &self.room.inner;
2687 room.queue
2688 .mark_as_unwedged(&self.transaction_id)
2689 .await
2690 .map_err(RoomSendQueueError::StorageError)?;
2691
2692 for handles in &self.media_handles {
2700 room.queue
2701 .mark_as_unwedged(&handles.upload_file_txn)
2702 .await
2703 .map_err(RoomSendQueueError::StorageError)?;
2704
2705 if let Some(txn) = &handles.upload_thumbnail_txn {
2706 room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2707 }
2708 }
2709
2710 room.notifier.notify_one();
2712
2713 self.room.send_update(RoomSendQueueUpdate::RetryEvent {
2714 transaction_id: self.transaction_id.clone(),
2715 });
2716
2717 Ok(())
2718 }
2719
2720 #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2725 pub async fn react(
2726 &self,
2727 key: String,
2728 ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2729 trace!("received an intent to react");
2730
2731 let created_at = MilliSecondsSinceUnixEpoch::now();
2732 if let Some(reaction_txn_id) =
2733 self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2734 {
2735 trace!("successfully queued react");
2736
2737 self.room.inner.notifier.notify_one();
2739
2740 let send_handle = SendReactionHandle {
2742 room: self.room.clone(),
2743 transaction_id: reaction_txn_id.clone(),
2744 };
2745
2746 self.room.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2747 transaction_id: reaction_txn_id.into(),
2750 content: LocalEchoContent::React {
2751 key,
2752 send_handle: send_handle.clone(),
2753 applies_to: self.transaction_id.clone(),
2754 },
2755 }));
2756
2757 Ok(Some(send_handle))
2758 } else {
2759 debug!("local echo doesn't exist anymore, can't react");
2760 Ok(None)
2761 }
2762 }
2763}
2764
2765#[derive(Clone, Debug)]
2767pub struct SendReactionHandle {
2768 room: RoomSendQueue,
2770 transaction_id: ChildTransactionId,
2772}
2773
2774impl SendReactionHandle {
2775 pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2780 if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2781 self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2785 transaction_id: self.transaction_id.clone().into(),
2786 });
2787
2788 return Ok(true);
2789 }
2790
2791 let handle = SendHandle {
2794 room: self.room.clone(),
2795 transaction_id: self.transaction_id.clone().into(),
2796 media_handles: vec![],
2797 created_at: MilliSecondsSinceUnixEpoch::now(),
2798 };
2799
2800 handle.abort().await
2801 }
2802
2803 pub fn transaction_id(&self) -> &TransactionId {
2805 &self.transaction_id
2806 }
2807}
2808
2809fn canonicalize_dependent_requests(
2813 dependent: &[DependentQueuedRequest],
2814) -> Vec<DependentQueuedRequest> {
2815 let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
2816
2817 for d in dependent {
2818 let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
2819
2820 if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
2821 continue;
2824 }
2825
2826 match &d.kind {
2827 DependentQueuedRequestKind::EditEvent { .. } => {
2828 if let Some(prev_edit) = prevs
2830 .iter_mut()
2831 .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
2832 {
2833 *prev_edit = d;
2834 } else {
2835 prevs.insert(0, d);
2836 }
2837 }
2838
2839 DependentQueuedRequestKind::UploadFileOrThumbnail { .. }
2840 | DependentQueuedRequestKind::FinishUpload { .. }
2841 | DependentQueuedRequestKind::ReactEvent { .. } => {
2842 prevs.push(d);
2844 }
2845
2846 #[cfg(feature = "unstable-msc4274")]
2847 DependentQueuedRequestKind::FinishGallery { .. } => {
2848 prevs.push(d);
2850 }
2851
2852 DependentQueuedRequestKind::RedactEvent => {
2853 prevs.clear();
2855 prevs.push(d);
2856 }
2857 }
2858 }
2859
2860 by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
2861}
2862
2863#[cfg(all(test, not(target_family = "wasm")))]
2864mod tests {
2865 use std::{sync::Arc, time::Duration};
2866
2867 use assert_matches2::{assert_let, assert_matches};
2868 use matrix_sdk_base::store::{
2869 ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
2870 SerializableEventContent,
2871 };
2872 use matrix_sdk_test::{JoinedRoomBuilder, SyncResponseBuilder, async_test};
2873 use ruma::{
2874 MilliSecondsSinceUnixEpoch, TransactionId,
2875 events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
2876 room_id,
2877 };
2878
2879 use super::canonicalize_dependent_requests;
2880 use crate::{client::WeakClient, test_utils::logged_in_client};
2881
2882 #[test]
2883 fn test_canonicalize_dependent_events_created_at() {
2884 let txn = TransactionId::new();
2887 let created_at = MilliSecondsSinceUnixEpoch::now();
2888
2889 let edit = DependentQueuedRequest {
2890 own_transaction_id: ChildTransactionId::new(),
2891 parent_transaction_id: txn.clone(),
2892 kind: DependentQueuedRequestKind::EditEvent {
2893 new_content: SerializableEventContent::new(
2894 &RoomMessageEventContent::text_plain("edit").into(),
2895 )
2896 .unwrap(),
2897 },
2898 parent_key: None,
2899 created_at,
2900 };
2901
2902 let res = canonicalize_dependent_requests(&[edit]);
2903
2904 assert_eq!(res.len(), 1);
2905 assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2906 assert_let!(
2907 AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2908 );
2909 assert_eq!(msg.body(), "edit");
2910 assert_eq!(res[0].parent_transaction_id, txn);
2911 assert_eq!(res[0].created_at, created_at);
2912 }
2913
2914 #[async_test]
2915 async fn test_client_no_cycle_with_send_queue() {
2916 for enabled in [true, false] {
2917 let client = logged_in_client(None).await;
2918 let weak_client = WeakClient::from_client(&client);
2919
2920 {
2921 let mut sync_response_builder = SyncResponseBuilder::new();
2922
2923 let room_id = room_id!("!a:b.c");
2924
2925 client
2927 .base_client()
2928 .receive_sync_response(
2929 sync_response_builder
2930 .add_joined_room(JoinedRoomBuilder::new(room_id))
2931 .build_sync_response(),
2932 )
2933 .await
2934 .unwrap();
2935
2936 let room = client.get_room(room_id).unwrap();
2937 let q = room.send_queue();
2938
2939 let _watcher = q.subscribe().await;
2940
2941 client.send_queue().set_enabled(enabled).await;
2942 }
2943
2944 drop(client);
2945
2946 tokio::time::sleep(Duration::from_millis(500)).await;
2948
2949 let client = weak_client.get();
2951 assert!(
2952 client.is_none(),
2953 "too many strong references to the client: {}",
2954 Arc::strong_count(&client.unwrap().inner)
2955 );
2956 }
2957 }
2958
2959 #[test]
2960 fn test_canonicalize_dependent_events_smoke_test() {
2961 let txn = TransactionId::new();
2963
2964 let edit = DependentQueuedRequest {
2965 own_transaction_id: ChildTransactionId::new(),
2966 parent_transaction_id: txn.clone(),
2967 kind: DependentQueuedRequestKind::EditEvent {
2968 new_content: SerializableEventContent::new(
2969 &RoomMessageEventContent::text_plain("edit").into(),
2970 )
2971 .unwrap(),
2972 },
2973 parent_key: None,
2974 created_at: MilliSecondsSinceUnixEpoch::now(),
2975 };
2976 let res = canonicalize_dependent_requests(&[edit]);
2977
2978 assert_eq!(res.len(), 1);
2979 assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
2980 assert_eq!(res[0].parent_transaction_id, txn);
2981 assert!(res[0].parent_key.is_none());
2982 }
2983
2984 #[test]
2985 fn test_canonicalize_dependent_events_redaction_preferred() {
2986 let txn = TransactionId::new();
2988
2989 let mut inputs = Vec::with_capacity(100);
2990 let redact = DependentQueuedRequest {
2991 own_transaction_id: ChildTransactionId::new(),
2992 parent_transaction_id: txn.clone(),
2993 kind: DependentQueuedRequestKind::RedactEvent,
2994 parent_key: None,
2995 created_at: MilliSecondsSinceUnixEpoch::now(),
2996 };
2997
2998 let edit = DependentQueuedRequest {
2999 own_transaction_id: ChildTransactionId::new(),
3000 parent_transaction_id: txn.clone(),
3001 kind: DependentQueuedRequestKind::EditEvent {
3002 new_content: SerializableEventContent::new(
3003 &RoomMessageEventContent::text_plain("edit").into(),
3004 )
3005 .unwrap(),
3006 },
3007 parent_key: None,
3008 created_at: MilliSecondsSinceUnixEpoch::now(),
3009 };
3010
3011 inputs.push({
3012 let mut edit = edit.clone();
3013 edit.own_transaction_id = ChildTransactionId::new();
3014 edit
3015 });
3016
3017 inputs.push(redact);
3018
3019 for _ in 0..98 {
3020 let mut edit = edit.clone();
3021 edit.own_transaction_id = ChildTransactionId::new();
3022 inputs.push(edit);
3023 }
3024
3025 let res = canonicalize_dependent_requests(&inputs);
3026
3027 assert_eq!(res.len(), 1);
3028 assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
3029 assert_eq!(res[0].parent_transaction_id, txn);
3030 }
3031
3032 #[test]
3033 fn test_canonicalize_dependent_events_last_edit_preferred() {
3034 let parent_txn = TransactionId::new();
3035
3036 let inputs = (0..10)
3038 .map(|i| DependentQueuedRequest {
3039 own_transaction_id: ChildTransactionId::new(),
3040 parent_transaction_id: parent_txn.clone(),
3041 kind: DependentQueuedRequestKind::EditEvent {
3042 new_content: SerializableEventContent::new(
3043 &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
3044 )
3045 .unwrap(),
3046 },
3047 parent_key: None,
3048 created_at: MilliSecondsSinceUnixEpoch::now(),
3049 })
3050 .collect::<Vec<_>>();
3051
3052 let txn = inputs[9].parent_transaction_id.clone();
3053
3054 let res = canonicalize_dependent_requests(&inputs);
3055
3056 assert_eq!(res.len(), 1);
3057 assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
3058 assert_let!(
3059 AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
3060 );
3061 assert_eq!(msg.body(), "edit9");
3062 assert_eq!(res[0].parent_transaction_id, txn);
3063 }
3064
3065 #[test]
3066 fn test_canonicalize_multiple_local_echoes() {
3067 let txn1 = TransactionId::new();
3068 let txn2 = TransactionId::new();
3069
3070 let child1 = ChildTransactionId::new();
3071 let child2 = ChildTransactionId::new();
3072
3073 let inputs = vec![
3074 DependentQueuedRequest {
3076 own_transaction_id: child1.clone(),
3077 kind: DependentQueuedRequestKind::RedactEvent,
3078 parent_transaction_id: txn1.clone(),
3079 parent_key: None,
3080 created_at: MilliSecondsSinceUnixEpoch::now(),
3081 },
3082 DependentQueuedRequest {
3084 own_transaction_id: child2,
3085 kind: DependentQueuedRequestKind::EditEvent {
3086 new_content: SerializableEventContent::new(
3087 &RoomMessageEventContent::text_plain("edit").into(),
3088 )
3089 .unwrap(),
3090 },
3091 parent_transaction_id: txn2.clone(),
3092 parent_key: None,
3093 created_at: MilliSecondsSinceUnixEpoch::now(),
3094 },
3095 ];
3096
3097 let res = canonicalize_dependent_requests(&inputs);
3098
3099 assert_eq!(res.len(), 2);
3101
3102 for dependent in res {
3103 if dependent.own_transaction_id == child1 {
3104 assert_eq!(dependent.parent_transaction_id, txn1);
3105 assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
3106 } else {
3107 assert_eq!(dependent.parent_transaction_id, txn2);
3108 assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
3109 }
3110 }
3111 }
3112
3113 #[test]
3114 fn test_canonicalize_reactions_after_edits() {
3115 let txn = TransactionId::new();
3117
3118 let react_id = ChildTransactionId::new();
3119 let react = DependentQueuedRequest {
3120 own_transaction_id: react_id.clone(),
3121 kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
3122 parent_transaction_id: txn.clone(),
3123 parent_key: None,
3124 created_at: MilliSecondsSinceUnixEpoch::now(),
3125 };
3126
3127 let edit_id = ChildTransactionId::new();
3128 let edit = DependentQueuedRequest {
3129 own_transaction_id: edit_id.clone(),
3130 kind: DependentQueuedRequestKind::EditEvent {
3131 new_content: SerializableEventContent::new(
3132 &RoomMessageEventContent::text_plain("edit").into(),
3133 )
3134 .unwrap(),
3135 },
3136 parent_transaction_id: txn,
3137 parent_key: None,
3138 created_at: MilliSecondsSinceUnixEpoch::now(),
3139 };
3140
3141 let res = canonicalize_dependent_requests(&[react, edit]);
3142
3143 assert_eq!(res.len(), 2);
3144 assert_eq!(res[0].own_transaction_id, edit_id);
3145 assert_eq!(res[1].own_transaction_id, react_id);
3146 }
3147}