// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A send queue facility to serialize the queuing and sending of messages.
//!
//! # [`Room`] send queue
//!
//! Each room gets its own [`RoomSendQueue`], which is available by calling
//! [`Room::send_queue()`]. The first time this method is called, it will spawn
//! a background task that's used to actually send events, in the order they
//! were passed from calls to [`RoomSendQueue::send()`].
//!
//! This queue tries to simplify error management around sending events, using
//! [`RoomSendQueue::send`] or [`RoomSendQueue::send_raw`]: by default, it will
//! retry sending the same event a few times, before automatically disabling
//! itself, and emitting a notification that can be listened to with the global
//! send queue (see paragraph below) or using [`RoomSendQueue::subscribe()`].
//!
//! It is possible to control whether a single room's queue is enabled using
//! [`RoomSendQueue::set_enabled()`].
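//!
//! As a minimal usage sketch (assuming `room` is a joined [`Room`] obtained
//! elsewhere, and that a plain text message is what should be sent; the
//! printed messages are only illustrative):
//!
//! ```no_run
//! # use matrix_sdk::{
//! #     ruma::events::room::message::RoomMessageEventContent,
//! #     send_queue::{RoomSendQueueError, RoomSendQueueUpdate},
//! #     Room,
//! # };
//! # async fn example(room: Room) -> Result<(), RoomSendQueueError> {
//! let queue = room.send_queue();
//!
//! // Get the current local echoes, and a receiver for subsequent updates.
//! let (_local_echoes, mut updates) = queue.subscribe().await?;
//!
//! // Queue the event; this returns as soon as the event has been queued.
//! let _handle = queue.send(RoomMessageEventContent::text_plain("Hello!").into()).await?;
//!
//! // This loop stops when the channel closes or when this receiver lags.
//! while let Ok(update) = updates.recv().await {
//!     match update {
//!         RoomSendQueueUpdate::SentEvent { event_id, .. } => {
//!             println!("event sent as {event_id}");
//!         }
//!         RoomSendQueueUpdate::SendError { error, is_recoverable, .. } => {
//!             println!("send error: {error} (recoverable: {is_recoverable})");
//!         }
//!         _ => {}
//!     }
//! }
//! # Ok(())
//! # }
//! ```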
//!
//! # Global [`SendQueue`] object
//!
//! The [`Client::send_queue()`] method returns an API object that allows
//! controlling all the room send queues:
//!
//! - enable/disable them all at once with [`SendQueue::set_enabled()`].
//! - get notifications about send errors with [`SendQueue::subscribe_errors`].
//! - reload all unsent events that had been persisted in storage using
//!   [`SendQueue::respawn_tasks_for_rooms_with_unsent_requests()`]. It is
//!   recommended to call this method during initialization of a client,
//!   otherwise persisted unsent events will only be re-sent after the send
//!   queue for the given room has been reopened for the first time.
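//!
//! A minimal sketch of how these could be combined at client startup
//! (assuming `client` is an already logged-in [`Client`]; the logging shown
//! here is only illustrative):
//!
//! ```no_run
//! # use matrix_sdk::Client;
//! # async fn example(client: Client) {
//! let send_queue = client.send_queue();
//!
//! // Subscribe before resuming, so that no error report is missed.
//! let mut errors = send_queue.subscribe_errors();
//!
//! // Resume sending requests persisted by a previous session.
//! send_queue.respawn_tasks_for_rooms_with_unsent_requests().await;
//!
//! // This loop stops when the channel closes or when this receiver lags.
//! while let Ok(report) = errors.recv().await {
//!     println!(
//!         "send queue error in {}: {} (recoverable: {})",
//!         report.room_id, report.error, report.is_recoverable
//!     );
//! }
//! # }
//! ```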
//!
//! # Send handle
//!
//! Just after queuing a request to send something, a [`SendHandle`] is
//! returned, which allows manipulating the in-flight request.
//!
//! For a send handle for an event, it's possible to edit the event / abort
//! sending it. If it was still in the queue (i.e. neither sent yet nor being
//! sent), then such an action would happen locally (i.e. in the database).
//! Otherwise, it is "too late": the background task may be sending
//! the event already, or has sent it; in that case, the edit/aborting must
//! happen as an actual event materializing this, on the server. To accomplish
//! this, the send queue may send such an event, using the dependency system
//! described below.
//!
//! # Dependency system
//!
//! The send queue includes a simple dependency system, where a
//! [`QueuedRequest`] can have zero or more dependents in the form of
//! [`DependentQueuedRequest`]. A dependent queued request can have at most one
//! depended-upon (parent) queued request.
//!
//! This allows implementing deferred edits/redacts, as hinted at in the
//! previous section.
//!
//! ## Media upload
//!
//! This dependency system also allows uploading media, since the media's
//! *content* must be uploaded before we send the media *event* that describes
//! it.
//!
//! In the simplest case, where only a media file and its event must be sent
//! (i.e. no thumbnails):
//!
//! - The file's content is immediately cached in the
//!   [`matrix_sdk_base::event_cache::store::EventCacheStore`], using an MXC ID
//!   that is temporary and designates a local URI without any possible doubt.
//! - An initial media event is created and uses this temporary MXC ID, and
//!   propagated as a local echo for an event.
//! - A [`QueuedRequest`] is pushed to upload the file's media
//!   ([`QueuedRequestKind::MediaUpload`]).
//! - A [`DependentQueuedRequest`] is pushed to finish the upload
//!   ([`DependentQueuedRequestKind::FinishUpload`]).
//!
//! What is expected to happen, if all goes well, is the following:
//!
//! - the media is uploaded to the media homeserver, which returns the final MXC
//!   ID.
//! - when marking the upload request as sent, the MXC ID is injected (as a
//!   [`matrix_sdk_base::store::SentRequestKey`]) into the dependent request
//!   [`DependentQueuedRequestKind::FinishUpload`] created in the last step
//!   above.
//! - next time the send queue handles dependent requests, it'll see this one
//!   is ready to be sent, and it will transform it into an event queued request
//!   ([`QueuedRequestKind::Event`]), with the event created in the local echo
//!   before, updated with the MXC ID returned from the server.
//! - this updated local echo is also propagated as an edit of the local echo to
//!   observers, who get the final version with the final MXC IDs at this point
//!   too.
//! - then the event is sent normally, as any event sent with the send queue.
//!
//! When there is a thumbnail, things behave similarly, with some tweaks:
//!
//! - the thumbnail's content is also stored into the cache store immediately,
//! - the thumbnail is sent first as a [`QueuedRequestKind::MediaUpload`]
//!   request,
//! - the file upload is pushed as a dependent request of kind
//!   [`DependentQueuedRequestKind::UploadFileOrThumbnail`] (this variant keeps
//!   the file's key used to look it up in the cache store).
//! - the media event is then sent as a dependent request as described in the
//!   previous section.
//!
//! What's expected to happen is thus the following:
//!
//! - After the thumbnail has been uploaded, the dependent request will retrieve
//!   the final MXC ID returned by the homeserver for the thumbnail, and store
//!   it into the [`QueuedRequestKind::MediaUpload`]'s `thumbnail_source` field,
//!   so that the thumbnail MXC ID is remembered when it's time to finish the
//!   upload later.
//! - The dependent request is morphed into another
//!   [`QueuedRequestKind::MediaUpload`], for the file itself.
//!
//! The rest of the process is then similar to that of uploading a file without
//! a thumbnail. The only difference is that there's a thumbnail source (MXC ID)
//! remembered and fixed up into the media event, just before sending it.

use std::{
    collections::{BTreeMap, HashMap},
    str::FromStr as _,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, RwLock,
    },
};

use as_variant::as_variant;
#[cfg(feature = "unstable-msc4274")]
use matrix_sdk_base::store::FinishGalleryItemInfo;
use matrix_sdk_base::{
    event_cache::store::EventCacheStoreError,
    media::MediaRequestParameters,
    store::{
        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore,
        FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
        SentMediaInfo, SentRequestKey, SerializableEventContent,
    },
    store_locks::LockStoreError,
    RoomState, StoreError,
};
use matrix_sdk_common::executor::{spawn, JoinHandle};
use mime::Mime;
use ruma::{
    events::{
        reaction::ReactionEventContent,
        relation::Annotation,
        room::{
            message::{FormattedBody, RoomMessageEventContent},
            MediaSource,
        },
        AnyMessageLikeEventContent, EventContent as _, Mentions,
    },
    serde::Raw,
    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
};
use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard};
use tracing::{debug, error, info, instrument, trace, warn};

#[cfg(feature = "e2e-encryption")]
use crate::crypto::{OlmError, SessionRecipientCollectionError};
use crate::{
    client::WeakClient,
    config::RequestConfig,
    error::RetryKind,
    room::{edit::EditedContent, WeakRoom},
    Client, Media, Room,
};

mod upload;

/// A client-wide send queue, for all the rooms known by a client.
pub struct SendQueue {
    client: Client,
}

#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for SendQueue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SendQueue").finish_non_exhaustive()
    }
}

impl SendQueue {
    pub(super) fn new(client: Client) -> Self {
        Self { client }
    }

    /// Reload all the rooms which had unsent requests, and respawn tasks for
    /// those rooms.
    ///
    /// This does nothing if the send queue is globally disabled.
    pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
        if !self.is_enabled() {
            return;
        }

        let room_ids =
            self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
                |err| {
                    warn!("error when loading rooms with unsent requests: {err}");
                    Vec::new()
                },
            );

        // Getting the [`RoomSendQueue`] is sufficient to spawn the task if need be.
        for room_id in room_ids {
            if let Some(room) = self.client.get_room(&room_id) {
                let _ = self.for_room(room);
            }
        }
    }

    /// Tiny helper to get the send queue's global context from the [`Client`].
    #[inline(always)]
    fn data(&self) -> &SendQueueData {
        &self.client.inner.send_queue_data
    }

    /// Get or create a new send queue for a given room, and insert it into our
    /// memoized rooms mapping.
    fn for_room(&self, room: Room) -> RoomSendQueue {
        let data = self.data();

        let mut map = data.rooms.write().unwrap();

        let room_id = room.room_id();
        if let Some(room_q) = map.get(room_id).cloned() {
            return room_q;
        }

        let owned_room_id = room_id.to_owned();
        let room_q = RoomSendQueue::new(
            self.is_enabled(),
            data.error_reporter.clone(),
            data.is_dropping.clone(),
            &self.client,
            owned_room_id.clone(),
        );

        map.insert(owned_room_id, room_q.clone());

        room_q
    }

    /// Enable or disable the send queue for the entire client, i.e. all rooms.
    ///
    /// If we're disabling the queue, and requests were being sent, they're not
    /// aborted, and will continue until a status resolves (error responses
    /// will keep the events in the buffer of events to send later). The
    /// disablement will happen before the next request is sent.
    ///
    /// This may wake up background tasks and resume sending of requests in the
    /// background.
    pub async fn set_enabled(&self, enabled: bool) {
        debug!(?enabled, "setting global send queue enablement");

        self.data().globally_enabled.store(enabled, Ordering::SeqCst);

        // Wake up individual rooms we already know about.
        for room in self.data().rooms.read().unwrap().values() {
            room.set_enabled(enabled);
        }

        // Reload some extra rooms that might not have been awakened yet, but could
        // have requests from previous sessions.
        self.respawn_tasks_for_rooms_with_unsent_requests().await;
    }

    /// Returns whether the send queue is enabled, at a client-wide
    /// granularity.
    pub fn is_enabled(&self) -> bool {
        self.data().globally_enabled.load(Ordering::SeqCst)
    }

    /// Subscribe to the errors reported by the room send queues.
    ///
    /// Each [`SendQueueRoomError`] indicates that a room's send queue ran into
    /// an error and has disabled itself as a result.
    pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
        self.data().error_reporter.subscribe()
    }
}

/// A specific room's send queue ran into an error, and it has disabled itself.
#[derive(Clone, Debug)]
pub struct SendQueueRoomError {
    /// For which room is the send queue failing?
    pub room_id: OwnedRoomId,

    /// The error the room has run into, when trying to send a request.
    pub error: Arc<crate::Error>,

    /// Whether the error is considered recoverable or not.
    ///
    /// In both cases, the room's send queue is disabled. After a recoverable
    /// error, the request is kept in the queue and can be retried once the
    /// queue is re-enabled, while a request that hit an unrecoverable error is
    /// parked (wedged), until the user decides to do something about it.
    pub is_recoverable: bool,
}

impl Client {
    /// Returns a [`SendQueue`] that handles sending, retrying and not
    /// forgetting about requests that are to be sent.
    pub fn send_queue(&self) -> SendQueue {
        SendQueue::new(self.clone())
    }
}

pub(super) struct SendQueueData {
    /// Mapping of room to their unique send queue.
    rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,

    /// Is the whole mechanism enabled or disabled?
    ///
    /// This is only kept in memory to initialize new room queues with an
    /// initial enablement state.
    globally_enabled: AtomicBool,

    /// Global error updates for the send queue.
    error_reporter: broadcast::Sender<SendQueueRoomError>,

    /// Are we currently dropping the Client?
    is_dropping: Arc<AtomicBool>,
}

impl SendQueueData {
    /// Create the data for a send queue, in the given enabled state.
    pub fn new(globally_enabled: bool) -> Self {
        let (sender, _) = broadcast::channel(32);

        Self {
            rooms: Default::default(),
            globally_enabled: AtomicBool::new(globally_enabled),
            error_reporter: sender,
            is_dropping: Arc::new(false.into()),
        }
    }
}

impl Drop for SendQueueData {
    fn drop(&mut self) {
        // Mark the whole send queue as shutting down, then wake up all the room
        // queues so they're stopped too.
        debug!("globally dropping the send queue");
        self.is_dropping.store(true, Ordering::SeqCst);

        let rooms = self.rooms.read().unwrap();
        for room in rooms.values() {
            room.inner.notifier.notify_one();
        }
    }
}

impl Room {
    /// Returns the [`RoomSendQueue`] for this specific room.
    pub fn send_queue(&self) -> RoomSendQueue {
        self.client.send_queue().for_room(self.clone())
    }
}

/// A per-room send queue.
///
/// This is cheap to clone.
#[derive(Clone)]
pub struct RoomSendQueue {
    inner: Arc<RoomSendQueueInner>,
}

#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for RoomSendQueue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RoomSendQueue").finish_non_exhaustive()
    }
}

impl RoomSendQueue {
    fn new(
        globally_enabled: bool,
        global_error_reporter: broadcast::Sender<SendQueueRoomError>,
        is_dropping: Arc<AtomicBool>,
        client: &Client,
        room_id: OwnedRoomId,
    ) -> Self {
        let (updates_sender, _) = broadcast::channel(32);

        let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
        let notifier = Arc::new(Notify::new());

        let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
        let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));

        let task = spawn(Self::sending_task(
            weak_room.clone(),
            queue.clone(),
            notifier.clone(),
            updates_sender.clone(),
            locally_enabled.clone(),
            global_error_reporter,
            is_dropping,
        ));

        Self {
            inner: Arc::new(RoomSendQueueInner {
                room: weak_room,
                updates: updates_sender,
                _task: task,
                queue,
                notifier,
                locally_enabled,
            }),
        }
    }

    /// Queues a raw event for sending it to this room.
    ///
    /// This immediately returns, and will push the event to be sent into a
    /// queue, handled in the background.
    ///
    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
    /// the [`Self::subscribe()`] method to get updates about the sending of
    /// that event.
    ///
    /// By default, if sending failed on the first attempt, it will be retried a
    /// few times. If sending failed after those retries, this room's send
    /// queue will be disabled, and it will need to be manually re-enabled by
    /// the caller (e.g. after network is back, or when something has been done
    /// about the faulty requests).
    pub async fn send_raw(
        &self,
        content: Raw<AnyMessageLikeEventContent>,
        event_type: String,
    ) -> Result<SendHandle, RoomSendQueueError> {
        let Some(room) = self.inner.room.get() else {
            return Err(RoomSendQueueError::RoomDisappeared);
        };
        if room.state() != RoomState::Joined {
            return Err(RoomSendQueueError::RoomNotJoined);
        }

        let content = SerializableEventContent::from_raw(content, event_type);

        let created_at = MilliSecondsSinceUnixEpoch::now();
        let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
        trace!(%transaction_id, "manager sends a raw event to the background task");

        self.inner.notifier.notify_one();

        let send_handle = SendHandle {
            room: self.clone(),
            transaction_id: transaction_id.clone(),
            media_handles: vec![],
            created_at,
        };

        let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
            transaction_id,
            content: LocalEchoContent::Event {
                serialized_event: content,
                send_handle: send_handle.clone(),
                send_error: None,
            },
        }));

        Ok(send_handle)
    }

    /// Queues an event for sending it to this room.
    ///
    /// This immediately returns, and will push the event to be sent into a
    /// queue, handled in the background.
    ///
    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
    /// the [`Self::subscribe()`] method to get updates about the sending of
    /// that event.
    ///
    /// By default, if sending failed on the first attempt, it will be retried a
    /// few times. If sending failed after those retries, this room's send
    /// queue will be disabled, and it will need to be manually re-enabled by
    /// the caller (e.g. after network is back, or when something has been done
    /// about the faulty requests).
    pub async fn send(
        &self,
        content: AnyMessageLikeEventContent,
    ) -> Result<SendHandle, RoomSendQueueError> {
        self.send_raw(
            Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
            content.event_type().to_string(),
        )
        .await
    }

    /// Returns the current local requests as well as a receiver to listen to
    /// the send queue updates, as defined in [`RoomSendQueueUpdate`].
    pub async fn subscribe(
        &self,
    ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
    {
        let local_echoes = self.inner.queue.local_echoes(self).await?;

        Ok((local_echoes, self.inner.updates.subscribe()))
    }

    /// A task that must be spawned in the async runtime, running in the
    /// background for each room that has a send queue.
    ///
    /// It only progresses forward: nothing can be cancelled at any point, which
    /// makes the implementation not overly complicated to follow.
    #[instrument(skip_all, fields(room_id = %room.room_id()))]
    async fn sending_task(
        room: WeakRoom,
        queue: QueueStorage,
        notifier: Arc<Notify>,
        updates: broadcast::Sender<RoomSendQueueUpdate>,
        locally_enabled: Arc<AtomicBool>,
        global_error_reporter: broadcast::Sender<SendQueueRoomError>,
        is_dropping: Arc<AtomicBool>,
    ) {
        trace!("spawned the sending task");

        loop {
            // A request to shut down should be preferred above everything else.
            if is_dropping.load(Ordering::SeqCst) {
                trace!("shutting down!");
                break;
            }

            // Try to apply dependent requests now; those applying to previously failed
            // attempts (local echoes) would succeed now.
            let mut new_updates = Vec::new();
            if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
                warn!("errors when applying dependent requests: {err}");
            }

            for up in new_updates {
                let _ = updates.send(up);
            }

            if !locally_enabled.load(Ordering::SeqCst) {
                trace!("not enabled, sleeping");
                // Wait for an explicit wakeup.
                notifier.notified().await;
                continue;
            }

            let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
                Ok(Some(request)) => request,

                Ok(None) => {
                    trace!("queue is empty, sleeping");
                    // Wait for an explicit wakeup.
                    notifier.notified().await;
                    continue;
                }

                Err(err) => {
                    warn!("error when loading next request to send: {err}");
                    continue;
                }
            };

            let txn_id = queued_request.transaction_id.clone();
            trace!(txn_id = %txn_id, "received a request to send!");

            let related_txn_id = as_variant!(&queued_request.kind, QueuedRequestKind::MediaUpload { related_to, .. } => related_to.clone());

            let Some(room) = room.get() else {
                if is_dropping.load(Ordering::SeqCst) {
                    break;
                }
                error!("the weak room couldn't be upgraded but we're not shutting down?");
                continue;
            };

            match Self::handle_request(&room, queued_request, cancel_upload_rx).await {
                Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
                {
                    Ok(()) => match parent_key {
                        SentRequestKey::Event(event_id) => {
                            let _ = updates.send(RoomSendQueueUpdate::SentEvent {
                                transaction_id: txn_id,
                                event_id,
                            });
                        }

                        SentRequestKey::Media(media_info) => {
                            let _ = updates.send(RoomSendQueueUpdate::UploadedMedia {
                                related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
                                file: media_info.file,
                            });
                        }
                    },

                    Err(err) => {
                        warn!("unable to mark queued request as sent: {err}");
                    }
                },

                Ok(None) => {
                    debug!("Request has been aborted while running, continuing.");
                }

                Err(err) => {
                    let is_recoverable = match err {
                        crate::Error::Http(ref http_err) => {
                            // All transient errors are recoverable.
                            matches!(
                                http_err.retry_kind(),
                                RetryKind::Transient { .. } | RetryKind::NetworkFailure
                            )
                        }

                        // `ConcurrentRequestFailed` typically happens because of an HTTP failure;
                        // since we don't get the underlying error, be lax and consider it
                        // recoverable, and let observers decide to retry it or not. At some point
                        // we'll get the actual underlying error.
                        crate::Error::ConcurrentRequestFailed => true,

                        // As of 2024-06-27, all other error types are considered unrecoverable.
                        _ => false,
                    };

                    // Disable the queue for this room after any kind of error happened.
                    locally_enabled.store(false, Ordering::SeqCst);

                    if is_recoverable {
                        warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");

                        // In this case, we intentionally keep the request in the queue, but mark it
                        // as not being sent anymore.
                        queue.mark_as_not_being_sent(&txn_id).await;

                        // Let observers know about a failure *after* we've
                        // marked the item as not being sent anymore. Otherwise,
                        // there's a possible race where a caller might try to
                        // remove an item, while it's still marked as being
                        // sent, resulting in a cancellation failure.
                    } else {
                        warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");

                        // Mark the request as wedged, so it's not picked at any future point.
                        if let Err(storage_error) =
                            queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
                        {
                            warn!("unable to mark request as wedged: {storage_error}");
                        }
                    }

                    let error = Arc::new(err);

                    let _ = global_error_reporter.send(SendQueueRoomError {
                        room_id: room.room_id().to_owned(),
                        error: error.clone(),
                        is_recoverable,
                    });

                    let _ = updates.send(RoomSendQueueUpdate::SendError {
                        transaction_id: related_txn_id.unwrap_or(txn_id),
                        error,
                        is_recoverable,
                    });
                }
            }
        }

        info!("exited sending task");
    }

    /// Handles a single request and returns the [`SentRequestKey`] on success
    /// (unless the request was cancelled, in which case it'll return
    /// `None`).
    async fn handle_request(
        room: &Room,
        request: QueuedRequest,
        cancel_upload_rx: Option<oneshot::Receiver<()>>,
    ) -> Result<Option<SentRequestKey>, crate::Error> {
        match request.kind {
            QueuedRequestKind::Event { content } => {
                let (event, event_type) = content.raw();

                let res = room
                    .send_raw(event_type, event)
                    .with_transaction_id(&request.transaction_id)
                    .with_request_config(RequestConfig::short_retry())
                    .await?;

                trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent");
                Ok(Some(SentRequestKey::Event(res.event_id)))
            }

            QueuedRequestKind::MediaUpload {
                content_type,
                cache_key,
                thumbnail_source,
                related_to: relates_to,
                #[cfg(feature = "unstable-msc4274")]
                accumulated,
            } => {
                trace!(%relates_to, "uploading media related to event");

                let fut = async move {
                    let data = room
                        .client()
                        .event_cache_store()
                        .lock()
                        .await?
                        .get_media_content(&cache_key)
                        .await?
                        .ok_or(crate::Error::SendQueueWedgeError(Box::new(
                            QueueWedgeError::MissingMediaContent,
                        )))?;

                    let mime = Mime::from_str(&content_type).map_err(|_| {
                        crate::Error::SendQueueWedgeError(Box::new(
                            QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
                        ))
                    })?;

                    #[cfg(feature = "e2e-encryption")]
                    let media_source = if room.latest_encryption_state().await?.is_encrypted() {
                        trace!("upload will be encrypted (encrypted room)");
                        let mut cursor = std::io::Cursor::new(data);
                        let encrypted_file = room
                            .client()
                            .upload_encrypted_file(&mut cursor)
                            .with_request_config(RequestConfig::short_retry())
                            .await?;
                        MediaSource::Encrypted(Box::new(encrypted_file))
                    } else {
                        trace!("upload will be in clear text (room without encryption)");
                        let request_config = RequestConfig::short_retry()
                            .timeout(Media::reasonable_upload_timeout(&data));
                        let res =
                            room.client().media().upload(&mime, data, Some(request_config)).await?;
                        MediaSource::Plain(res.content_uri)
                    };

                    #[cfg(not(feature = "e2e-encryption"))]
                    let media_source = {
                        let request_config = RequestConfig::short_retry()
                            .timeout(Media::reasonable_upload_timeout(&data));
                        let res =
                            room.client().media().upload(&mime, data, Some(request_config)).await?;
                        MediaSource::Plain(res.content_uri)
                    };

                    let uri = match &media_source {
                        MediaSource::Plain(uri) => uri,
                        MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
                    };
                    trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");

                    Ok(SentRequestKey::Media(SentMediaInfo {
                        file: media_source,
                        thumbnail: thumbnail_source,
                        #[cfg(feature = "unstable-msc4274")]
                        accumulated,
                    }))
                };

                let wait_for_cancel = async move {
                    if let Some(rx) = cancel_upload_rx {
                        rx.await
                    } else {
                        std::future::pending().await
                    }
                };

                tokio::select! {
                    biased;

                    _ = wait_for_cancel => {
                        Ok(None)
                    }

                    res = fut => {
                        res.map(Some)
                    }
                }
            }
        }
    }

792    /// Returns whether this room's send queue is enabled, at the room level.
793    pub fn is_enabled(&self) -> bool {
794        self.inner.locally_enabled.load(Ordering::SeqCst)
795    }
796
797    /// Set the locally enabled flag for this room queue.
798    pub fn set_enabled(&self, enabled: bool) {
799        self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
800
801        // No need to wake a task to tell it it's been disabled, so only notify if we're
802        // re-enabling the queue.
803        if enabled {
804            self.inner.notifier.notify_one();
805        }
806    }
807}
808
809impl From<&crate::Error> for QueueWedgeError {
810    fn from(value: &crate::Error) -> Self {
811        match value {
812            #[cfg(feature = "e2e-encryption")]
813            crate::Error::OlmError(error) => match &**error {
814                OlmError::SessionRecipientCollectionError(error) => match error {
815                    SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
816                        QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
817                    }
818
819                    SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
820                        QueueWedgeError::IdentityViolations { users: users.clone() }
821                    }
822
823                    SessionRecipientCollectionError::CrossSigningNotSetup
824                    | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
825                        QueueWedgeError::CrossVerificationRequired
826                    }
827                },
828                _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
829            },
830
831            // Flatten errors of `Self` type.
832            crate::Error::SendQueueWedgeError(error) => *error.clone(),
833
834            _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
835        }
836    }
837}
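// The conversion above can be illustrated in isolation. This is a minimal
// sketch, not the SDK's code: `Error` and `WedgeError` are hypothetical
// stand-ins for `crate::Error` and `QueueWedgeError`, reduced to two variants
// each. It shows the same two rules: an already-boxed wedge error is
// flattened by cloning the inner value, and anything else falls back to a
// generic variant that keeps only the display message.

```rust
use std::fmt;

// Hypothetical stand-in for `QueueWedgeError`.
#[derive(Clone, Debug, PartialEq)]
enum WedgeError {
    InvalidMimeType { mime_type: String },
    Generic { msg: String },
}

// Hypothetical stand-in for `crate::Error`.
#[derive(Debug)]
enum Error {
    // Already a wedge error, boxed to keep `Error` small.
    Wedge(Box<WedgeError>),
    // Any other failure.
    Other(String),
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::Wedge(inner) => write!(f, "wedged: {:?}", inner),
            Error::Other(msg) => write!(f, "{}", msg),
        }
    }
}

impl From<&Error> for WedgeError {
    fn from(value: &Error) -> Self {
        match value {
            // Flatten: reuse the inner error instead of stringifying it.
            Error::Wedge(inner) => (**inner).clone(),
            // Fallback: keep only the display message.
            _ => WedgeError::Generic { msg: value.to_string() },
        }
    }
}
```

// Converting from a reference lets the caller keep the original error for
// logging or propagation while still deriving a storable wedge reason.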
838
839struct RoomSendQueueInner {
840    /// The room which this send queue relates to.
841    room: WeakRoom,
842
843    /// Broadcaster for notifications about the statuses of requests to be sent.
844    ///
845    /// Can be subscribed to from the outside.
846    updates: broadcast::Sender<RoomSendQueueUpdate>,
847
848    /// Queue of requests that are either to be sent, or being sent.
849    ///
850    /// When a request has been sent to the server, it is removed from that
851    /// queue *after* being sent. That way, we will retry sending upon
852    /// failure, in the same order requests have been inserted in the first
853    /// place.
854    queue: QueueStorage,
855
856    /// A notifier that's updated any time common data is touched (stopped or
857    /// enabled statuses), or the associated room [`QueueStorage`].
858    notifier: Arc<Notify>,
859
860    /// Should the room process new requests or not (because e.g. it might be
861    /// running off the network)?
862    locally_enabled: Arc<AtomicBool>,
863
864    /// Handle to the actual sending task. Unused, but kept alive along this
865    /// data structure.
866    _task: JoinHandle<()>,
867}
868
869/// Information about a request being sent right this moment.
870struct BeingSentInfo {
871    /// Transaction id of the thing being sent.
872    transaction_id: OwnedTransactionId,
873
874    /// For an upload request, a trigger to cancel the upload before it
875    /// completes.
876    cancel_upload: Option<oneshot::Sender<()>>,
877}
878
879impl BeingSentInfo {
880    /// Aborts the upload, if a trigger is available.
881    ///
882    /// Consumes the object because the sender is a oneshot and will be consumed
883    /// upon sending.
884    fn cancel_upload(self) -> bool {
885        if let Some(cancel_upload) = self.cancel_upload {
886            let _ = cancel_upload.send(());
887            true
888        } else {
889            false
890        }
891    }
892}
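// The consume-on-cancel pattern above can be sketched with the standard
// library alone. This is an illustration under stated assumptions, not the
// SDK's code: `InFlight` and `start` are hypothetical names, and
// `std::sync::mpsc` stands in for the tokio oneshot channel used by
// `BeingSentInfo`. Only uploads get a cancellation channel; cancelling takes
// `self` by value so the trigger can fire at most once.

```rust
use std::sync::mpsc;

// Hypothetical stand-in for `BeingSentInfo`.
struct InFlight {
    cancel: Option<mpsc::Sender<()>>,
}

impl InFlight {
    // Returns `true` if a cancellation trigger was available.
    fn cancel(self) -> bool {
        match self.cancel {
            Some(tx) => {
                // The receiver may be gone if the upload already finished;
                // that failure is deliberately ignored.
                let _ = tx.send(());
                true
            }
            None => false,
        }
    }
}

// Creates the in-flight record, with a cancellation channel only for uploads.
fn start(is_upload: bool) -> (InFlight, Option<mpsc::Receiver<()>>) {
    if is_upload {
        let (tx, rx) = mpsc::channel();
        (InFlight { cancel: Some(tx) }, Some(rx))
    } else {
        (InFlight { cancel: None }, None)
    }
}
```

// The receiving half would be handed to the sending task, which polls it
// alongside the upload and stops as soon as a message arrives.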
893
894/// A specialized lock that guards access to both the state store and the
895/// [`Self::being_sent`] data.
896#[derive(Clone)]
897struct StoreLock {
898    /// Reference to the client, to get access to the underlying store.
899    client: WeakClient,
900
901    /// The one queued request that is being sent at the moment, along with
902    /// associated data that can be useful to act upon it.
903    ///
904    /// Also used as the lock to access the state store.
905    being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
906}
907
908impl StoreLock {
909    /// Gets a hold of the locked store and [`Self::being_sent`] pair.
910    async fn lock(&self) -> StoreLockGuard {
911        StoreLockGuard {
912            client: self.client.clone(),
913            being_sent: self.being_sent.clone().lock_owned().await,
914        }
915    }
916}
917
918/// A lock guard obtained through locking with [`StoreLock`], giving access to
919/// the client (and thus the state store) and the `being_sent` data.
920struct StoreLockGuard {
921    /// Reference to the client, to get access to the underlying store.
922    client: WeakClient,
923
924    /// The one queued request that is being sent at the moment, along with
925    /// associated data that can be useful to act upon it.
926    being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
927}
928
929impl StoreLockGuard {
930    /// Get a client from the locked state, useful to get a handle on a store.
931    fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
932        self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
933    }
934}
935
936#[derive(Clone)]
937struct QueueStorage {
938    /// A lock to make sure the state store is only accessed once at a time, to
939    /// make some store operations atomic.
940    store: StoreLock,
941
942    /// To which room is this storage related.
943    room_id: OwnedRoomId,
944}
945
946impl QueueStorage {
947    /// Default priority for a queued request.
948    const LOW_PRIORITY: usize = 0;
949
950    /// High priority for a queued request that must be handled before others.
951    const HIGH_PRIORITY: usize = 10;
952
953    /// Create a new queue for queuing requests to be sent later.
954    fn new(client: WeakClient, room: OwnedRoomId) -> Self {
955        Self { room_id: room, store: StoreLock { client, being_sent: Default::default() } }
956    }
957
958    /// Push a new request to the queue, with the default (low) priority.
959    ///
960    /// Returns the transaction id chosen to identify the request.
961    async fn push(
962        &self,
963        request: QueuedRequestKind,
964        created_at: MilliSecondsSinceUnixEpoch,
965    ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
966        let transaction_id = TransactionId::new();
967
968        self.store
969            .lock()
970            .await
971            .client()?
972            .state_store()
973            .save_send_queue_request(
974                &self.room_id,
975                transaction_id.clone(),
976                created_at,
977                request,
978                Self::LOW_PRIORITY,
979            )
980            .await?;
981
982        Ok(transaction_id)
983    }
984
985    /// Peeks the next request to be sent, marking it as being sent.
986    ///
987    /// It is required to call [`Self::mark_as_sent`] after it's been
988    /// effectively sent.
989    async fn peek_next_to_send(
990        &self,
991    ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
992    {
993        let mut guard = self.store.lock().await;
994        let queued_requests =
995            guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;
996
997        if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
998            let (cancel_upload_tx, cancel_upload_rx) =
999                if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
1000                    let (tx, rx) = oneshot::channel();
1001                    (Some(tx), Some(rx))
1002                } else {
1003                    Default::default()
1004                };
1005
1006            let prev = guard.being_sent.replace(BeingSentInfo {
1007                transaction_id: request.transaction_id.clone(),
1008                cancel_upload: cancel_upload_tx,
1009            });
1010
1011            if let Some(prev) = prev {
1012                error!(
1013                    prev_txn = ?prev.transaction_id,
1014                    "a previous request was still active while picking a new one"
1015                );
1016            }
1017
1018            Ok(Some((request.clone(), cancel_upload_rx)))
1019        } else {
1020            Ok(None)
1021        }
1022    }
1023
1024    /// Marks a request popped with [`Self::peek_next_to_send`] and identified
1025    /// with the given transaction id as not being sent anymore, so it can
1026    /// be removed from the queue later.
1027    async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1028        let was_being_sent = self.store.lock().await.being_sent.take();
1029
1030        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1031        if prev_txn != Some(transaction_id) {
1032            error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after transient error)");
1033        }
1034    }
1035
1036    /// Marks a request popped with [`Self::peek_next_to_send`] and identified
1037    /// with the given transaction id as being wedged (and not being sent
1038    /// anymore), so it can be removed from the queue later.
1039    async fn mark_as_wedged(
1040        &self,
1041        transaction_id: &TransactionId,
1042        reason: QueueWedgeError,
1043    ) -> Result<(), RoomSendQueueStorageError> {
1044        // Keep the lock until we're done touching the storage.
1045        let mut guard = self.store.lock().await;
1046        let was_being_sent = guard.being_sent.take();
1047
1048        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1049        if prev_txn != Some(transaction_id) {
1050            error!(
1051                ?prev_txn,
1052                "previous active request didn't match that we expect (after permanent error)",
1053            );
1054        }
1055
1056        Ok(guard
1057            .client()?
1058            .state_store()
1059            .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1060            .await?)
1061    }
1062
1063    /// Marks a request identified with the given transaction id as being now
1064    /// unwedged and adds it back to the queue.
1065    async fn mark_as_unwedged(
1066        &self,
1067        transaction_id: &TransactionId,
1068    ) -> Result<(), RoomSendQueueStorageError> {
1069        Ok(self
1070            .store
1071            .lock()
1072            .await
1073            .client()?
1074            .state_store()
1075            .update_send_queue_request_status(&self.room_id, transaction_id, None)
1076            .await?)
1077    }
1078
1079    /// Marks a request pushed with [`Self::push`] and identified with the given
1080    /// transaction id as sent, by removing it from the local queue.
1081    async fn mark_as_sent(
1082        &self,
1083        transaction_id: &TransactionId,
1084        parent_key: SentRequestKey,
1085    ) -> Result<(), RoomSendQueueStorageError> {
1086        // Keep the lock until we're done touching the storage.
1087        let mut guard = self.store.lock().await;
1088        let was_being_sent = guard.being_sent.take();
1089
1090        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1091        if prev_txn != Some(transaction_id) {
1092            error!(
1093                ?prev_txn,
1094                "previous active request didn't match that we expect (after successful send)",
1095            );
1096        }
1097
1098        let client = guard.client()?;
1099        let store = client.state_store();
1100
1101        // Update all dependent requests.
1102        store
1103            .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1104            .await?;
1105
1106        let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1107
1108        if !removed {
1109            warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1110        }
1111
1112        Ok(())
1113    }
1114
1115    /// Cancel a sending command for an event that has been queued with
1116    /// [`Self::push`] and is identified by the given transaction id.
1117    ///
1118    /// Returns whether the given transaction has been effectively removed. If
1119    /// false, this either means that the transaction id was unrelated to
1120    /// this queue, or that the request was sent before we cancelled it.
1121    async fn cancel_event(
1122        &self,
1123        transaction_id: &TransactionId,
1124    ) -> Result<bool, RoomSendQueueStorageError> {
1125        let guard = self.store.lock().await;
1126
1127        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1128            == Some(transaction_id)
1129        {
1130            // Save the intent to redact the event.
1131            guard
1132                .client()?
1133                .state_store()
1134                .save_dependent_queued_request(
1135                    &self.room_id,
1136                    transaction_id,
1137                    ChildTransactionId::new(),
1138                    MilliSecondsSinceUnixEpoch::now(),
1139                    DependentQueuedRequestKind::RedactEvent,
1140                )
1141                .await?;
1142
1143            return Ok(true);
1144        }
1145
1146        let removed = guard
1147            .client()?
1148            .state_store()
1149            .remove_send_queue_request(&self.room_id, transaction_id)
1150            .await?;
1151
1152        Ok(removed)
1153    }
1154
1155    /// Replace an event that has been queued with [`Self::push`] and is
1156    /// identified by the given transaction id, before it's been actually sent.
1157    ///
1158    /// Returns whether the given transaction has been effectively edited. If
1159    /// false, this either means that the transaction id was unrelated to
1160    /// this queue, or that the request was sent before we edited it.
1161    async fn replace_event(
1162        &self,
1163        transaction_id: &TransactionId,
1164        serializable: SerializableEventContent,
1165    ) -> Result<bool, RoomSendQueueStorageError> {
1166        let guard = self.store.lock().await;
1167
1168        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1169            == Some(transaction_id)
1170        {
1171            // Save the intent to edit the associated event.
1172            guard
1173                .client()?
1174                .state_store()
1175                .save_dependent_queued_request(
1176                    &self.room_id,
1177                    transaction_id,
1178                    ChildTransactionId::new(),
1179                    MilliSecondsSinceUnixEpoch::now(),
1180                    DependentQueuedRequestKind::EditEvent { new_content: serializable },
1181                )
1182                .await?;
1183
1184            return Ok(true);
1185        }
1186
1187        let edited = guard
1188            .client()?
1189            .state_store()
1190            .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1191            .await?;
1192
1193        Ok(edited)
1194    }
1195
1196    /// Push requests (and dependents) to upload a media.
1197    ///
1198    /// See the module-level description for details of the whole process.
1199    #[allow(clippy::too_many_arguments)]
1200    async fn push_media(
1201        &self,
1202        event: RoomMessageEventContent,
1203        content_type: Mime,
1204        send_event_txn: OwnedTransactionId,
1205        created_at: MilliSecondsSinceUnixEpoch,
1206        upload_file_txn: OwnedTransactionId,
1207        file_media_request: MediaRequestParameters,
1208        thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>,
1209    ) -> Result<(), RoomSendQueueStorageError> {
1210        let guard = self.store.lock().await;
1211        let client = guard.client()?;
1212        let store = client.state_store();
1213
1214        let thumbnail_info = self
1215            .push_thumbnail_and_media_uploads(
1216                store,
1217                &content_type,
1218                send_event_txn.clone(),
1219                created_at,
1220                upload_file_txn.clone(),
1221                file_media_request,
1222                thumbnail,
1223            )
1224            .await?;
1225
1226        // Push the dependent request for the event itself.
1227        store
1228            .save_dependent_queued_request(
1229                &self.room_id,
1230                &upload_file_txn,
1231                send_event_txn.into(),
1232                created_at,
1233                DependentQueuedRequestKind::FinishUpload {
1234                    local_echo: Box::new(event),
1235                    file_upload: upload_file_txn.clone(),
1236                    thumbnail_info,
1237                },
1238            )
1239            .await?;
1240
1241        Ok(())
1242    }
1243
1244    /// Push requests (and dependents) to upload a gallery.
1245    ///
1246    /// See the module-level description for details of the whole process.
1247    #[cfg(feature = "unstable-msc4274")]
1248    #[allow(clippy::too_many_arguments)]
1249    async fn push_gallery(
1250        &self,
1251        event: RoomMessageEventContent,
1252        send_event_txn: OwnedTransactionId,
1253        created_at: MilliSecondsSinceUnixEpoch,
1254        item_queue_infos: Vec<GalleryItemQueueInfo>,
1255    ) -> Result<(), RoomSendQueueStorageError> {
1256        let guard = self.store.lock().await;
1257        let client = guard.client()?;
1258        let store = client.state_store();
1259
1260        let mut finish_item_infos = Vec::with_capacity(item_queue_infos.len());
1261
1262        let Some((first, rest)) = item_queue_infos.split_first() else {
1263            return Ok(());
1264        };
1265
1266        let GalleryItemQueueInfo { content_type, upload_file_txn, file_media_request, thumbnail } =
1267            first;
1268
1269        let thumbnail_info = self
1270            .push_thumbnail_and_media_uploads(
1271                store,
1272                content_type,
1273                send_event_txn.clone(),
1274                created_at,
1275                upload_file_txn.clone(),
1276                file_media_request.clone(),
1277                thumbnail.clone(),
1278            )
1279            .await?;
1280
1281        finish_item_infos
1282            .push(FinishGalleryItemInfo { file_upload: upload_file_txn.clone(), thumbnail_info });
1283
1284        let mut last_upload_file_txn = upload_file_txn.clone();
1285
1286        for item_queue_info in rest {
1287            let GalleryItemQueueInfo {
1288                content_type,
1289                upload_file_txn,
1290                file_media_request,
1291                thumbnail,
1292            } = item_queue_info;
1293
1294            let thumbnail_info =
1295                if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) =
1296                    thumbnail
1297                {
1298                    let upload_thumbnail_txn = thumbnail_info.txn.clone();
1299
1300                    // Save the thumbnail upload request as a dependent request of the last file
1301                    // upload.
1302                    store
1303                        .save_dependent_queued_request(
1304                            &self.room_id,
1305                            &last_upload_file_txn,
1306                            upload_thumbnail_txn.clone().into(),
1307                            created_at,
1308                            DependentQueuedRequestKind::UploadFileOrThumbnail {
1309                                content_type: thumbnail_content_type.to_string(),
1310                                cache_key: thumbnail_media_request.clone(),
1311                                related_to: send_event_txn.clone(),
1312                                parent_is_thumbnail_upload: false,
1313                            },
1314                        )
1315                        .await?;
1316
1317                    last_upload_file_txn = upload_thumbnail_txn;
1318
1319                    Some(thumbnail_info)
1320                } else {
1321                    None
1322                };
1323
1324            // Save the file upload as a dependent request of the previous upload.
1325            store
1326                .save_dependent_queued_request(
1327                    &self.room_id,
1328                    &last_upload_file_txn,
1329                    upload_file_txn.clone().into(),
1330                    created_at,
1331                    DependentQueuedRequestKind::UploadFileOrThumbnail {
1332                        content_type: content_type.to_string(),
1333                        cache_key: file_media_request.clone(),
1334                        related_to: send_event_txn.clone(),
1335                        parent_is_thumbnail_upload: thumbnail.is_some(),
1336                    },
1337                )
1338                .await?;
1339
1340            finish_item_infos.push(FinishGalleryItemInfo {
1341                file_upload: upload_file_txn.clone(),
1342                thumbnail_info: thumbnail_info.cloned(),
1343            });
1344
1345            last_upload_file_txn = upload_file_txn.clone();
1346        }
1347
1348        // Push the request for the event itself as a dependent request of the last file
1349        // upload.
1350        store
1351            .save_dependent_queued_request(
1352                &self.room_id,
1353                &last_upload_file_txn,
1354                send_event_txn.into(),
1355                created_at,
1356                DependentQueuedRequestKind::FinishGallery {
1357                    local_echo: Box::new(event),
1358                    item_infos: finish_item_infos,
1359                },
1360            )
1361            .await?;
1362
1363        Ok(())
1364    }
1365
1366    /// If a thumbnail exists, pushes a [`QueuedRequestKind::MediaUpload`] to
1367    /// upload it, and a
1368    /// [`DependentQueuedRequestKind::UploadFileOrThumbnail`] to upload the
1369    /// media itself. Otherwise, pushes a
1370    /// [`QueuedRequestKind::MediaUpload`] to upload the media directly.
1371    #[allow(clippy::too_many_arguments)]
1372    async fn push_thumbnail_and_media_uploads(
1373        &self,
1374        store: &DynStateStore,
1375        content_type: &Mime,
1376        send_event_txn: OwnedTransactionId,
1377        created_at: MilliSecondsSinceUnixEpoch,
1378        upload_file_txn: OwnedTransactionId,
1379        file_media_request: MediaRequestParameters,
1380        thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>,
1381    ) -> Result<Option<FinishUploadThumbnailInfo>, RoomSendQueueStorageError> {
1382        if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) = thumbnail {
1383            let upload_thumbnail_txn = thumbnail_info.txn.clone();
1384
1385            // Save the thumbnail upload request.
1386            store
1387                .save_send_queue_request(
1388                    &self.room_id,
1389                    upload_thumbnail_txn.clone(),
1390                    created_at,
1391                    QueuedRequestKind::MediaUpload {
1392                        content_type: thumbnail_content_type.to_string(),
1393                        cache_key: thumbnail_media_request,
1394                        thumbnail_source: None, // the thumbnail has no thumbnails :)
1395                        related_to: send_event_txn.clone(),
1396                        #[cfg(feature = "unstable-msc4274")]
1397                        accumulated: vec![],
1398                    },
1399                    Self::LOW_PRIORITY,
1400                )
1401                .await?;
1402
1403            // Save the file upload request as a dependent request of the thumbnail upload.
1404            store
1405                .save_dependent_queued_request(
1406                    &self.room_id,
1407                    &upload_thumbnail_txn,
1408                    upload_file_txn.into(),
1409                    created_at,
1410                    DependentQueuedRequestKind::UploadFileOrThumbnail {
1411                        content_type: content_type.to_string(),
1412                        cache_key: file_media_request,
1413                        related_to: send_event_txn,
1414                        #[cfg(feature = "unstable-msc4274")]
1415                        parent_is_thumbnail_upload: true,
1416                    },
1417                )
1418                .await?;
1419
1420            Ok(Some(thumbnail_info))
1421        } else {
1422            // Save the file upload as its own request, not a dependent one.
1423            store
1424                .save_send_queue_request(
1425                    &self.room_id,
1426                    upload_file_txn,
1427                    created_at,
1428                    QueuedRequestKind::MediaUpload {
1429                        content_type: content_type.to_string(),
1430                        cache_key: file_media_request,
1431                        thumbnail_source: None,
1432                        related_to: send_event_txn,
1433                        #[cfg(feature = "unstable-msc4274")]
1434                        accumulated: vec![],
1435                    },
1436                    Self::LOW_PRIORITY,
1437                )
1438                .await?;
1439
1440            Ok(None)
1441        }
1442    }
1443
1444    /// Reacts to the given local echo of an event.
1445    #[instrument(skip(self))]
1446    async fn react(
1447        &self,
1448        transaction_id: &TransactionId,
1449        key: String,
1450        created_at: MilliSecondsSinceUnixEpoch,
1451    ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1452        let guard = self.store.lock().await;
1453        let client = guard.client()?;
1454        let store = client.state_store();
1455
1456        let requests = store.load_send_queue_requests(&self.room_id).await?;
1457
1458        // If the target event has been already sent, abort immediately.
1459        if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1460            // We didn't find it as a queued request; try to find it as a dependent queued
1461            // request.
1462            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1463            if !dependent_requests
1464                .into_iter()
1465                .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1466                .any(|child_txn| *child_txn == *transaction_id)
1467            {
1468                // We didn't find it as either a request or a dependent request, abort.
1469                return Ok(None);
1470            }
1471        }
1472
1473        // Record the dependent request.
1474        let reaction_txn_id = ChildTransactionId::new();
1475        store
1476            .save_dependent_queued_request(
1477                &self.room_id,
1478                transaction_id,
1479                reaction_txn_id.clone(),
1480                created_at,
1481                DependentQueuedRequestKind::ReactEvent { key },
1482            )
1483            .await?;
1484
1485        Ok(Some(reaction_txn_id))
1486    }
1487
1488    /// Returns a list of the local echoes, that is, all the requests that we're
1489    /// about to send but that haven't been sent yet (or are being sent).
1490    async fn local_echoes(
1491        &self,
1492        room: &RoomSendQueue,
1493    ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1494        let guard = self.store.lock().await;
1495        let client = guard.client()?;
1496        let store = client.state_store();
1497
1498        let local_requests =
1499            store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1500                Some(LocalEcho {
1501                    transaction_id: queued.transaction_id.clone(),
1502                    content: match queued.kind {
1503                        QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1504                            serialized_event: content,
1505                            send_handle: SendHandle {
1506                                room: room.clone(),
1507                                transaction_id: queued.transaction_id,
1508                                media_handles: vec![],
1509                                created_at: queued.created_at,
1510                            },
1511                            send_error: queued.error,
1512                        },
1513
1514                        QueuedRequestKind::MediaUpload { .. } => {
1515                            // Don't return uploaded medias as their own things; the accompanying
1516                            // event represented as a dependent request should be sufficient.
1517                            return None;
1518                        }
1519                    },
1520                })
1521            });
1522
1523        let reactions_and_medias = store
            .load_dependent_queued_requests(&self.room_id)
            .await?
            .into_iter()
            .filter_map(|dep| match dep.kind {
                DependentQueuedRequestKind::EditEvent { .. }
                | DependentQueuedRequestKind::RedactEvent => {
                    // TODO: reflect local edits/redacts too?
                    None
                }

                DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
                    transaction_id: dep.own_transaction_id.clone().into(),
                    content: LocalEchoContent::React {
                        key,
                        send_handle: SendReactionHandle {
                            room: room.clone(),
                            transaction_id: dep.own_transaction_id,
                        },
                        applies_to: dep.parent_transaction_id,
                    },
                }),

                DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
                    // Don't reflect these: only the associated event is interesting to observers.
                    None
                }

                DependentQueuedRequestKind::FinishUpload {
                    local_echo,
                    file_upload,
                    thumbnail_info,
                } => {
                    // Materialize as an event local echo.
                    Some(LocalEcho {
                        transaction_id: dep.own_transaction_id.clone().into(),
                        content: LocalEchoContent::Event {
                            serialized_event: SerializableEventContent::new(&(*local_echo).into())
                                .ok()?,
                            send_handle: SendHandle {
                                room: room.clone(),
                                transaction_id: dep.own_transaction_id.into(),
                                media_handles: vec![MediaHandles {
                                    upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
                                    upload_file_txn: file_upload,
                                }],
                                created_at: dep.created_at,
                            },
                            send_error: None,
                        },
                    })
                }

                #[cfg(feature = "unstable-msc4274")]
                DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
                    // Materialize as an event local echo.
                    self.create_gallery_local_echo(
                        dep.own_transaction_id,
                        room,
                        dep.created_at,
                        local_echo,
                        item_infos,
                    )
                }
            });

        Ok(local_requests.chain(reactions_and_medias).collect())
    }

    /// Create a local echo for a gallery event.
    #[cfg(feature = "unstable-msc4274")]
    fn create_gallery_local_echo(
        &self,
        transaction_id: ChildTransactionId,
        room: &RoomSendQueue,
        created_at: MilliSecondsSinceUnixEpoch,
        local_echo: Box<RoomMessageEventContent>,
        item_infos: Vec<FinishGalleryItemInfo>,
    ) -> Option<LocalEcho> {
        Some(LocalEcho {
            transaction_id: transaction_id.clone().into(),
            content: LocalEchoContent::Event {
                serialized_event: SerializableEventContent::new(&(*local_echo).into()).ok()?,
                send_handle: SendHandle {
                    room: room.clone(),
                    transaction_id: transaction_id.into(),
                    media_handles: item_infos
                        .into_iter()
                        .map(|i| MediaHandles {
                            upload_thumbnail_txn: i.thumbnail_info.map(|info| info.txn),
                            upload_file_txn: i.file_upload,
                        })
                        .collect(),
                    created_at,
                },
                send_error: None,
            },
        })
    }

    /// Try to apply a single dependent request, whether it's local or remote.
    ///
    /// This swallows errors that would retrigger every time if we retried
    /// applying the dependent request: invalid edit content, etc.
    ///
    /// Returns true if the dependent request has been sent (or should not be
    /// retried later).
    #[instrument(skip_all)]
    async fn try_apply_single_dependent_request(
        &self,
        client: &Client,
        dependent_request: DependentQueuedRequest,
        new_updates: &mut Vec<RoomSendQueueUpdate>,
    ) -> Result<bool, RoomSendQueueError> {
        let store = client.state_store();

        let parent_key = dependent_request.parent_key;

        match dependent_request.kind {
            DependentQueuedRequestKind::EditEvent { new_content } => {
                if let Some(parent_key) = parent_key {
                    let Some(event_id) = parent_key.into_event_id() else {
                        return Err(RoomSendQueueError::StorageError(
                            RoomSendQueueStorageError::InvalidParentKey,
                        ));
                    };

                    // The parent event has been sent, so send an edit event.
                    let room = client
                        .get_room(&self.room_id)
                        .ok_or(RoomSendQueueError::RoomDisappeared)?;

                    // Check the event is one we know how to edit with an edit event.

                    // It must be deserializable…
                    let edited_content = match new_content.deserialize() {
                        Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
                            // Assume no relationships.
                            EditedContent::RoomMessage(c.into())
                        }

                        Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
                            let poll_start = c.poll_start().clone();
                            EditedContent::PollStart {
                                fallback_text: poll_start.question.text.clone(),
                                new_content: poll_start,
                            }
                        }

                        Ok(c) => {
                            warn!("Unsupported edit content type: {:?}", c.event_type());
                            return Ok(true);
                        }

                        Err(err) => {
                            warn!("Unable to deserialize: {err}");
                            return Ok(true);
                        }
                    };

                    let edit_event = match room.make_edit_event(&event_id, edited_content).await {
                        Ok(e) => e,
                        Err(err) => {
                            warn!("couldn't create edited event: {err}");
                            return Ok(true);
                        }
                    };

                    // Queue the edit event in the send queue 🧠.
                    let serializable = SerializableEventContent::from_raw(
                        Raw::new(&edit_event)
                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
                        edit_event.event_type().to_string(),
                    );

                    store
                        .save_send_queue_request(
                            &self.room_id,
                            dependent_request.own_transaction_id.into(),
                            dependent_request.created_at,
                            serializable.into(),
                            Self::HIGH_PRIORITY,
                        )
                        .await
                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
                } else {
                    // The parent event is still local; update the local echo.
                    let edited = store
                        .update_send_queue_request(
                            &self.room_id,
                            &dependent_request.parent_transaction_id,
                            new_content.into(),
                        )
                        .await
                        .map_err(RoomSendQueueStorageError::StateStoreError)?;

                    if !edited {
                        warn!("missing local echo upon dependent edit");
                    }
                }
            }

            DependentQueuedRequestKind::RedactEvent => {
                if let Some(parent_key) = parent_key {
                    let Some(event_id) = parent_key.into_event_id() else {
                        return Err(RoomSendQueueError::StorageError(
                            RoomSendQueueStorageError::InvalidParentKey,
                        ));
                    };

                    // The parent event has been sent; send a redaction.
                    let room = client
                        .get_room(&self.room_id)
                        .ok_or(RoomSendQueueError::RoomDisappeared)?;

                    // Ideally we'd use the send queue to send the redaction, but the protocol has
                    // changed the shape of a room.redaction after v11, so keep it simple and try
                    // once here.

                    // Note: no reason is provided because we materialize the intent of "cancel
                    // sending the parent event".

                    if let Err(err) = room
                        .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
                        .await
                    {
                        warn!("error when sending a redact for {event_id}: {err}");
                        return Ok(false);
                    }
                } else {
                    // The parent event is still local (sending must have failed); redact the local
                    // echo.
                    let removed = store
                        .remove_send_queue_request(
                            &self.room_id,
                            &dependent_request.parent_transaction_id,
                        )
                        .await
                        .map_err(RoomSendQueueStorageError::StateStoreError)?;

                    if !removed {
                        warn!("missing local echo upon dependent redact");
                    }
                }
            }

            DependentQueuedRequestKind::ReactEvent { key } => {
                if let Some(parent_key) = parent_key {
                    let Some(parent_event_id) = parent_key.into_event_id() else {
                        return Err(RoomSendQueueError::StorageError(
                            RoomSendQueueStorageError::InvalidParentKey,
                        ));
                    };

                    // Queue the reaction event in the send queue 🧠.
                    let react_event =
                        ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
                    let serializable = SerializableEventContent::from_raw(
                        Raw::new(&react_event)
                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
                        react_event.event_type().to_string(),
                    );

                    store
                        .save_send_queue_request(
                            &self.room_id,
                            dependent_request.own_transaction_id.into(),
                            dependent_request.created_at,
                            serializable.into(),
                            Self::HIGH_PRIORITY,
                        )
                        .await
                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
                } else {
                    // Not applied yet, we should retry later => false.
                    return Ok(false);
                }
            }

            DependentQueuedRequestKind::UploadFileOrThumbnail {
                content_type,
                cache_key,
                related_to,
                #[cfg(feature = "unstable-msc4274")]
                parent_is_thumbnail_upload,
            } => {
                let Some(parent_key) = parent_key else {
                    // Not finished yet, we should retry later => false.
                    return Ok(false);
                };
                let parent_is_thumbnail_upload = {
                    cfg_if::cfg_if! {
                        if #[cfg(feature = "unstable-msc4274")] {
                            parent_is_thumbnail_upload
                        } else {
                            // Before parent_is_thumbnail_upload was introduced, the only
                            // possible usage for this request was a file upload following
                            // a thumbnail upload.
                            true
                        }
                    }
                };
                self.handle_dependent_file_or_thumbnail_upload(
                    client,
                    dependent_request.own_transaction_id.into(),
                    parent_key,
                    content_type,
                    cache_key,
                    related_to,
                    parent_is_thumbnail_upload,
                )
                .await?;
            }

            DependentQueuedRequestKind::FinishUpload {
                local_echo,
                file_upload,
                thumbnail_info,
            } => {
                let Some(parent_key) = parent_key else {
                    // Not finished yet, we should retry later => false.
                    return Ok(false);
                };
                self.handle_dependent_finish_upload(
                    client,
                    dependent_request.own_transaction_id.into(),
                    parent_key,
                    *local_echo,
                    file_upload,
                    thumbnail_info,
                    new_updates,
                )
                .await?;
            }

            #[cfg(feature = "unstable-msc4274")]
            DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
                let Some(parent_key) = parent_key else {
                    // Not finished yet, we should retry later => false.
                    return Ok(false);
                };
                self.handle_dependent_finish_gallery_upload(
                    client,
                    dependent_request.own_transaction_id.into(),
                    parent_key,
                    *local_echo,
                    item_infos,
                    new_updates,
                )
                .await?;
            }
        }

        Ok(true)
    }

    #[instrument(skip(self))]
    async fn apply_dependent_requests(
        &self,
        new_updates: &mut Vec<RoomSendQueueUpdate>,
    ) -> Result<(), RoomSendQueueError> {
        let guard = self.store.lock().await;

        let client = guard.client()?;
        let store = client.state_store();

        let dependent_requests = store
            .load_dependent_queued_requests(&self.room_id)
            .await
            .map_err(RoomSendQueueStorageError::StateStoreError)?;

        let num_initial_dependent_requests = dependent_requests.len();
        if num_initial_dependent_requests == 0 {
            // Returning early here avoids a bit of useless logging.
            return Ok(());
        }

        let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);

        // Get rid of all the non-canonical dependent requests.
        for original in &dependent_requests {
            if !canonicalized_dependent_requests
                .iter()
                .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
            {
                store
                    .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
                    .await
                    .map_err(RoomSendQueueStorageError::StateStoreError)?;
            }
        }

        let mut num_dependent_requests = canonicalized_dependent_requests.len();

        debug!(
            num_dependent_requests,
            num_initial_dependent_requests, "starting handling of dependent requests"
        );

        for dependent in canonicalized_dependent_requests {
            let dependent_id = dependent.own_transaction_id.clone();

            match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
                Ok(should_remove) => {
                    if should_remove {
                        // The dependent request has been successfully applied, forget about it.
                        store
                            .remove_dependent_queued_request(&self.room_id, &dependent_id)
                            .await
                            .map_err(RoomSendQueueStorageError::StateStoreError)?;

                        num_dependent_requests -= 1;
                    }
                }

                Err(err) => {
                    warn!("error when applying single dependent request: {err}");
                }
            }
        }

        debug!(
            leftover_dependent_requests = num_dependent_requests,
            "stopped handling dependent request"
        );

        Ok(())
    }

    /// Remove a single dependent request from storage.
    async fn remove_dependent_send_queue_request(
        &self,
        dependent_event_id: &ChildTransactionId,
    ) -> Result<bool, RoomSendQueueStorageError> {
        Ok(self
            .store
            .lock()
            .await
            .client()?
            .state_store()
            .remove_dependent_queued_request(&self.room_id, dependent_event_id)
            .await?)
    }
}

#[cfg(feature = "unstable-msc4274")]
/// Metadata needed for pushing gallery item uploads onto the send queue.
struct GalleryItemQueueInfo {
    content_type: Mime,
    upload_file_txn: OwnedTransactionId,
    file_media_request: MediaRequestParameters,
    thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>,
}

/// The content of a local echo.
#[derive(Clone, Debug)]
pub enum LocalEchoContent {
    /// The local echo contains an actual event ready to display.
    Event {
        /// Content of the event itself (along with its type) that we are about
        /// to send.
        serialized_event: SerializableEventContent,
        /// A handle to manipulate the sending of the associated event.
        send_handle: SendHandle,
        /// Set if a past attempt to send this local echo failed with an
        /// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]).
        send_error: Option<QueueWedgeError>,
    },

    /// A local echo has been reacted to.
    React {
        /// The key with which the local echo has been reacted to.
        key: String,
        /// A handle to manipulate the sending of the reaction.
        send_handle: SendReactionHandle,
        /// The local echo which has been reacted to.
        applies_to: OwnedTransactionId,
    },
}

/// A local representation for a request that hasn't been sent yet to the user's
/// homeserver.
#[derive(Clone, Debug)]
pub struct LocalEcho {
    /// Transaction id used to identify the associated request.
    pub transaction_id: OwnedTransactionId,
    /// The content for the local echo.
    pub content: LocalEchoContent,
}

/// An update to a room send queue, observable with
/// [`RoomSendQueue::subscribe`].
#[derive(Clone, Debug)]
pub enum RoomSendQueueUpdate {
    /// A new local event is being sent.
    ///
    /// The user asked to create this event, and it is being sent to the
    /// server.
    NewLocalEvent(LocalEcho),

    /// A local event that hadn't been sent to the server yet has been cancelled
    /// before sending.
    CancelledLocalEvent {
        /// Transaction id used to identify this event.
        transaction_id: OwnedTransactionId,
    },

    /// A local event's content has been replaced with something else.
    ReplacedLocalEvent {
        /// Transaction id used to identify this event.
        transaction_id: OwnedTransactionId,

        /// The new content replacing the previous one.
        new_content: SerializableEventContent,
    },

    /// An error happened when an event was being sent.
    ///
    /// The event has not been removed from the queue. All the send queues
    /// will be disabled after this happens, and must be manually re-enabled.
    SendError {
        /// Transaction id used to identify this event.
        transaction_id: OwnedTransactionId,
        /// Error received while sending the event.
        error: Arc<crate::Error>,
        /// Whether the error is considered recoverable or not.
        ///
        /// A recoverable error disables the room's send queue, while an
        /// unrecoverable error parks the event until the user decides to
        /// cancel sending it.
        is_recoverable: bool,
    },

    /// The event has been unwedged and sending is now being retried.
    RetryEvent {
        /// Transaction id used to identify this event.
        transaction_id: OwnedTransactionId,
    },

    /// The event has been sent to the server, and the query returned
    /// successfully.
    SentEvent {
        /// Transaction id used to identify this event.
        transaction_id: OwnedTransactionId,
        /// Received event id from the send response.
        event_id: OwnedEventId,
    },

    /// A media file has been successfully uploaded.
    UploadedMedia {
        /// The media event this uploaded media relates to.
        related_to: OwnedTransactionId,

        /// The final media source for the file that was just uploaded.
        file: MediaSource,
    },
}

/// An error triggered by the send queue module.
#[derive(Debug, thiserror::Error)]
pub enum RoomSendQueueError {
    /// The room isn't in the joined state.
    #[error("the room isn't in the joined state")]
    RoomNotJoined,

    /// The room is missing from the client.
    ///
    /// This only happens when the client is shutting down.
    #[error("the room is now missing from the client")]
    RoomDisappeared,

    /// Error coming from storage.
    #[error(transparent)]
    StorageError(#[from] RoomSendQueueStorageError),

    /// The attachment event failed to be created.
    #[error("the attachment event could not be created")]
    FailedToCreateAttachment,

    /// The gallery contains no items.
    #[cfg(feature = "unstable-msc4274")]
    #[error("the gallery contains no items")]
    EmptyGallery,

    /// The gallery event failed to be created.
    #[cfg(feature = "unstable-msc4274")]
    #[error("the gallery event could not be created")]
    FailedToCreateGallery,
}

/// An error triggered by the send queue storage.
#[derive(Debug, thiserror::Error)]
pub enum RoomSendQueueStorageError {
    /// Error caused by the state store.
    #[error(transparent)]
    StateStoreError(#[from] StoreError),

    /// Error caused by the event cache store.
    #[error(transparent)]
    EventCacheStoreError(#[from] EventCacheStoreError),

    /// Error caused when attempting to get a handle on the event cache store.
    #[error(transparent)]
    LockError(#[from] LockStoreError),

    /// Error caused when (de)serializing into/from json.
    #[error(transparent)]
    JsonSerialization(#[from] serde_json::Error),

    /// A parent key was expected to be of a certain type, and it was another
    /// type instead.
    #[error("a dependent event had an invalid parent key type")]
    InvalidParentKey,

    /// The client is shutting down.
    #[error("The client is shutting down.")]
    ClientShuttingDown,

    /// An operation not implemented on a send handle.
    #[error("This operation is not implemented for media uploads")]
    OperationNotImplementedYet,

    /// Trying to edit a media caption for something that's not a media.
    #[error("Can't edit a media caption when the underlying event isn't a media")]
    InvalidMediaCaptionEdit,
}

/// Extra transaction IDs useful during an upload.
#[derive(Clone, Debug)]
struct MediaHandles {
    /// Transaction id used when uploading the thumbnail.
    ///
    /// Optional because a media can be uploaded without a thumbnail.
    upload_thumbnail_txn: Option<OwnedTransactionId>,

    /// Transaction id used when uploading the media itself.
    upload_file_txn: OwnedTransactionId,
}

/// A handle to manipulate an event that was scheduled to be sent to a room.
#[derive(Clone, Debug)]
pub struct SendHandle {
    /// Link to the send queue used to send this request.
    room: RoomSendQueue,

    /// Transaction id used for the sent request.
    ///
    /// If this is a media upload, this is the "main" transaction id, i.e. the
    /// one used to send the event, and that will be seen by observers.
    transaction_id: OwnedTransactionId,

    /// Additional handles for a media upload.
    media_handles: Vec<MediaHandles>,

    /// The time at which the event to be sent has been created.
    pub created_at: MilliSecondsSinceUnixEpoch,
}

impl SendHandle {
    /// Returns an error if this handle has media handles attached, i.e. it
    /// represents a media upload, for which the calling operation is not
    /// implemented yet.
    fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
        if !self.media_handles.is_empty() {
            Err(RoomSendQueueStorageError::OperationNotImplementedYet)
        } else {
            Ok(())
        }
    }

    /// Aborts the sending of the event, if it wasn't sent yet.
    ///
    /// Returns true if the sending could be aborted, false if not (i.e. the
    /// event had already been sent).
    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
        trace!("received an abort request");

        let queue = &self.room.inner.queue;

        for handles in &self.media_handles {
            if queue.abort_upload(&self.transaction_id, handles).await? {
                // Propagate a cancelled update.
                let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
                    transaction_id: self.transaction_id.clone(),
                });

                return Ok(true);
            }

            // If it failed, it means the sending of the event is not a
            // dependent request anymore. Fall back to the regular
            // code path below, that handles aborting sending of an event.
        }

        if queue.cancel_event(&self.transaction_id).await? {
            trace!("successful abort");

            // Propagate a cancelled update too.
            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
                transaction_id: self.transaction_id.clone(),
            });

            Ok(true)
        } else {
            debug!("local echo didn't exist anymore, can't abort");
            Ok(false)
        }
    }

    /// Edits the content of a local echo with a raw event content.
    ///
    /// Returns true if the event to be sent was replaced, false if not (i.e.
    /// the event had already been sent).
    #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
    pub async fn edit_raw(
        &self,
        new_content: Raw<AnyMessageLikeEventContent>,
        event_type: String,
    ) -> Result<bool, RoomSendQueueStorageError> {
        trace!("received an edit request");
        self.nyi_for_uploads()?;

        let serializable = SerializableEventContent::from_raw(new_content, event_type);

        if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
            trace!("successful edit");

            // Wake up the queue, in case the room was asleep before the edit.
            self.room.inner.notifier.notify_one();

            // Propagate a replaced update too.
            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
                transaction_id: self.transaction_id.clone(),
                new_content: serializable,
            });

            Ok(true)
        } else {
            debug!("local echo doesn't exist anymore, can't edit");
            Ok(false)
        }
    }

    /// Edits the content of a local echo with an event content.
    ///
    /// Returns true if the event to be sent was replaced, false if not (i.e.
    /// the event had already been sent).
    pub async fn edit(
        &self,
        new_content: AnyMessageLikeEventContent,
    ) -> Result<bool, RoomSendQueueStorageError> {
        self.edit_raw(
            Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
            new_content.event_type().to_string(),
        )
        .await
    }

    /// Edits the content of a local echo with a media caption.
2280    ///
2281    /// Will fail if the event to be sent, represented by this send handle,
2282    /// wasn't a media.
2283    pub async fn edit_media_caption(
2284        &self,
2285        caption: Option<String>,
2286        formatted_caption: Option<FormattedBody>,
2287        mentions: Option<Mentions>,
2288    ) -> Result<bool, RoomSendQueueStorageError> {
2289        if let Some(new_content) = self
2290            .room
2291            .inner
2292            .queue
2293            .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2294            .await?
2295        {
2296            trace!("successful edit of media caption");
2297
2298            // Wake up the queue, in case the room was asleep before the edit.
2299            self.room.inner.notifier.notify_one();
2300
2301            let new_content = SerializableEventContent::new(&new_content)
2302                .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2303
2304            // Propagate a replaced update too.
2305            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
2306                transaction_id: self.transaction_id.clone(),
2307                new_content,
2308            });
2309
2310            Ok(true)
2311        } else {
2312            debug!("local echo doesn't exist anymore, can't edit media caption");
2313            Ok(false)
2314        }
2315    }
2316
    /// Unwedge a local echo identified by its transaction identifier and try to
    /// resend it.
    pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
        let room = &self.room.inner;
        room.queue
            .mark_as_unwedged(&self.transaction_id)
            .await
            .map_err(RoomSendQueueError::StorageError)?;

        // If we have media handles, also try to unwedge them.
        //
        // It's fine to always do it to *all* the transaction IDs at once, because only
        // one of the three requests will be active at the same time, i.e. only
        // one entry will be updated in the store. The other two are either
        // done, or dependent requests.

        for handles in &self.media_handles {
            room.queue
                .mark_as_unwedged(&handles.upload_file_txn)
                .await
                .map_err(RoomSendQueueError::StorageError)?;

            if let Some(txn) = &handles.upload_thumbnail_txn {
                room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
            }
        }

        // Wake up the queue, in case the room was asleep before unwedging the request.
        room.notifier.notify_one();

        let _ = room
            .updates
            .send(RoomSendQueueUpdate::RetryEvent { transaction_id: self.transaction_id.clone() });

        Ok(())
    }

    /// Send a reaction to the event as soon as it's sent.
    ///
    /// Returns `Ok(None)` if the reaction couldn't be sent because the event
    /// is already a remote one.
    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
    pub async fn react(
        &self,
        key: String,
    ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
        trace!("received an intent to react");

        let created_at = MilliSecondsSinceUnixEpoch::now();
        if let Some(reaction_txn_id) =
            self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
        {
            trace!("successfully queued react");

            // Wake up the queue, in case the room was asleep before the sending.
            self.room.inner.notifier.notify_one();

            // Propagate a new local event.
            let send_handle = SendReactionHandle {
                room: self.room.clone(),
                transaction_id: reaction_txn_id.clone(),
            };

            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
                // Note: we do want to use the txn_id we're going to use for the reaction, not the
                // one for the event we're reacting to.
                transaction_id: reaction_txn_id.into(),
                content: LocalEchoContent::React {
                    key,
                    send_handle: send_handle.clone(),
                    applies_to: self.transaction_id.clone(),
                },
            }));

            Ok(Some(send_handle))
        } else {
            debug!("local echo doesn't exist anymore, can't react");
            Ok(None)
        }
    }
}

/// A handle to execute actions on the sending of a reaction.
#[derive(Clone, Debug)]
pub struct SendReactionHandle {
    /// Reference to the send queue for the room where this reaction was sent.
    room: RoomSendQueue,
    /// The reaction's own transaction id.
    transaction_id: ChildTransactionId,
}

impl SendReactionHandle {
    /// Abort the sending of the reaction.
    ///
    /// Will return true if the reaction could be aborted, false if it's been
    /// sent (and there's no matching local echo anymore).
    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
        if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
            // Simple case: the reaction was found in the dependent event list.

            // Propagate a cancelled update too.
            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
                transaction_id: self.transaction_id.clone().into(),
            });

            return Ok(true);
        }

        // The reaction has already been queued for sending, try to abort it using a
        // regular abort.
        let handle = SendHandle {
            room: self.room.clone(),
            transaction_id: self.transaction_id.clone().into(),
            media_handles: vec![],
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        handle.abort().await
    }

    /// The transaction id that will be used to send this reaction later.
    pub fn transaction_id(&self) -> &TransactionId {
        &self.transaction_id
    }
}

/// From a given source of [`DependentQueuedRequest`], return only the most
/// meaningful, i.e. the ones that wouldn't be overridden after applying the
/// others.
fn canonicalize_dependent_requests(
    dependent: &[DependentQueuedRequest],
) -> Vec<DependentQueuedRequest> {
    let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();

    for d in dependent {
        let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();

        if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
            // The parent event has already been flagged for redaction, don't consider the
            // other dependent events.
            continue;
        }

        match &d.kind {
            DependentQueuedRequestKind::EditEvent { .. } => {
                // Replace any previous edit with this one.
                if let Some(prev_edit) = prevs
                    .iter_mut()
                    .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
                {
                    *prev_edit = d;
                } else {
                    prevs.insert(0, d);
                }
            }

            DependentQueuedRequestKind::UploadFileOrThumbnail { .. }
            | DependentQueuedRequestKind::FinishUpload { .. }
            | DependentQueuedRequestKind::ReactEvent { .. } => {
                // These requests can't be canonicalized, push them as is.
                prevs.push(d);
            }

            #[cfg(feature = "unstable-msc4274")]
            DependentQueuedRequestKind::FinishGallery { .. } => {
                // This request can't be canonicalized, push it as is.
                prevs.push(d);
            }

            DependentQueuedRequestKind::RedactEvent => {
                // Remove every other dependent action.
                prevs.clear();
                prevs.push(d);
            }
        }
    }

    by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
}
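
// Illustrative sketch only (not part of the SDK): the canonicalization rules
// above, restated on simplified stand-in types so they can be read and run in
// isolation. `SketchKind`, `SketchRequest` and `sketch_canonicalize` are
// hypothetical names introduced for this example; the real function works on
// `DependentQueuedRequest` and also handles upload and gallery requests.

/// Hypothetical stand-in for `DependentQueuedRequestKind`.
#[allow(dead_code)]
#[derive(Clone, Debug, PartialEq)]
enum SketchKind {
    Edit(String),
    React(String),
    Redact,
}

/// Hypothetical stand-in for a dependent request: (parent transaction id, kind).
#[allow(dead_code)]
type SketchRequest = (String, SketchKind);

#[allow(dead_code)]
fn sketch_canonicalize(requests: &[SketchRequest]) -> Vec<SketchRequest> {
    let mut by_parent = std::collections::HashMap::<&str, Vec<&SketchRequest>>::new();

    for request in requests {
        let kept = by_parent.entry(request.0.as_str()).or_default();

        // Once a parent is flagged for redaction, later requests are ignored.
        if kept.iter().any(|prev| prev.1 == SketchKind::Redact) {
            continue;
        }

        match &request.1 {
            SketchKind::Edit(_) => {
                // Keep only the latest edit; edits are ordered before reactions.
                if let Some(prev_edit) =
                    kept.iter_mut().find(|prev| matches!(prev.1, SketchKind::Edit(_)))
                {
                    *prev_edit = request;
                } else {
                    kept.insert(0, request);
                }
            }
            // Reactions can't be merged, keep them in arrival order.
            SketchKind::React(_) => kept.push(request),
            SketchKind::Redact => {
                // A redaction overrides everything queued before it.
                kept.clear();
                kept.push(request);
            }
        }
    }

    // Note: the relative order of different parents is unspecified, as it
    // follows the hash map's iteration order.
    by_parent.into_values().flatten().cloned().collect()
}

// Under these assumptions, `[react, edit "v1", edit "v2"]` on a single parent
// canonicalizes to `[edit "v2", react]`, and a list for a single parent that
// contains a redaction collapses to that redaction alone.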

#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::{sync::Arc, time::Duration};

    use assert_matches2::{assert_let, assert_matches};
    use matrix_sdk_base::store::{
        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
        SerializableEventContent,
    };
    use matrix_sdk_test::{async_test, JoinedRoomBuilder, SyncResponseBuilder};
    use ruma::{
        events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent},
        room_id, MilliSecondsSinceUnixEpoch, TransactionId,
    };

    use super::canonicalize_dependent_requests;
    use crate::{client::WeakClient, test_utils::logged_in_client};

    #[test]
    fn test_canonicalize_dependent_events_created_at() {
        // Ensure the `created_at` field (along with the content and the parent
        // transaction id) is preserved by canonicalization.
        let txn = TransactionId::new();
        let created_at = MilliSecondsSinceUnixEpoch::now();

        let edit = DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: txn.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_key: None,
            created_at,
        };

        let res = canonicalize_dependent_requests(&[edit]);

        assert_eq!(res.len(), 1);
        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
        assert_let!(
            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
        );
        assert_eq!(msg.body(), "edit");
        assert_eq!(res[0].parent_transaction_id, txn);
        assert_eq!(res[0].created_at, created_at);
    }

    #[async_test]
    async fn test_client_no_cycle_with_send_queue() {
        for enabled in [true, false] {
            let client = logged_in_client(None).await;
            let weak_client = WeakClient::from_client(&client);

            {
                let mut sync_response_builder = SyncResponseBuilder::new();

                let room_id = room_id!("!a:b.c");

                // Make sure the client knows about the room.
                client
                    .base_client()
                    .receive_sync_response(
                        sync_response_builder
                            .add_joined_room(JoinedRoomBuilder::new(room_id))
                            .build_sync_response(),
                    )
                    .await
                    .unwrap();

                let room = client.get_room(room_id).unwrap();
                let q = room.send_queue();

                let _watcher = q.subscribe().await;

                client.send_queue().set_enabled(enabled).await;
            }

            drop(client);

            // Give a bit of time for background tasks to die.
            tokio::time::sleep(Duration::from_millis(500)).await;

            // The weak client must be the last reference to the client now.
            let client = weak_client.get();
            assert!(
                client.is_none(),
                "too many strong references to the client: {}",
                Arc::strong_count(&client.unwrap().inner)
            );
        }
    }

    #[test]
    fn test_canonicalize_dependent_events_smoke_test() {
        // Smoke test: canonicalizing a single dependent event returns it.
        let txn = TransactionId::new();

        let edit = DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: txn.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };
        let res = canonicalize_dependent_requests(&[edit]);

        assert_eq!(res.len(), 1);
        assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
        assert_eq!(res[0].parent_transaction_id, txn);
        assert!(res[0].parent_key.is_none());
    }

    #[test]
    fn test_canonicalize_dependent_events_redaction_preferred() {
        // A redaction is preferred over any other kind of dependent event.
        let txn = TransactionId::new();

        let mut inputs = Vec::with_capacity(100);
        let redact = DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: txn.clone(),
            kind: DependentQueuedRequestKind::RedactEvent,
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        let edit = DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: txn.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        inputs.push({
            let mut edit = edit.clone();
            edit.own_transaction_id = ChildTransactionId::new();
            edit
        });

        inputs.push(redact);

        for _ in 0..98 {
            let mut edit = edit.clone();
            edit.own_transaction_id = ChildTransactionId::new();
            inputs.push(edit);
        }

        let res = canonicalize_dependent_requests(&inputs);

        assert_eq!(res.len(), 1);
        assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
        assert_eq!(res[0].parent_transaction_id, txn);
    }

    #[test]
    fn test_canonicalize_dependent_events_last_edit_preferred() {
        let parent_txn = TransactionId::new();

        // The latest edit of a list is always preferred.
        let inputs = (0..10)
            .map(|i| DependentQueuedRequest {
                own_transaction_id: ChildTransactionId::new(),
                parent_transaction_id: parent_txn.clone(),
                kind: DependentQueuedRequestKind::EditEvent {
                    new_content: SerializableEventContent::new(
                        &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
                    )
                    .unwrap(),
                },
                parent_key: None,
                created_at: MilliSecondsSinceUnixEpoch::now(),
            })
            .collect::<Vec<_>>();

        let txn = inputs[9].parent_transaction_id.clone();

        let res = canonicalize_dependent_requests(&inputs);

        assert_eq!(res.len(), 1);
        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
        assert_let!(
            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
        );
        assert_eq!(msg.body(), "edit9");
        assert_eq!(res[0].parent_transaction_id, txn);
    }

    #[test]
    fn test_canonicalize_multiple_local_echoes() {
        let txn1 = TransactionId::new();
        let txn2 = TransactionId::new();

        let child1 = ChildTransactionId::new();
        let child2 = ChildTransactionId::new();

        let inputs = vec![
            // This one pertains to txn1.
            DependentQueuedRequest {
                own_transaction_id: child1.clone(),
                kind: DependentQueuedRequestKind::RedactEvent,
                parent_transaction_id: txn1.clone(),
                parent_key: None,
                created_at: MilliSecondsSinceUnixEpoch::now(),
            },
            // This one pertains to txn2.
            DependentQueuedRequest {
                own_transaction_id: child2,
                kind: DependentQueuedRequestKind::EditEvent {
                    new_content: SerializableEventContent::new(
                        &RoomMessageEventContent::text_plain("edit").into(),
                    )
                    .unwrap(),
                },
                parent_transaction_id: txn2.clone(),
                parent_key: None,
                created_at: MilliSecondsSinceUnixEpoch::now(),
            },
        ];

        let res = canonicalize_dependent_requests(&inputs);

        // Canonicalization is applied per parent transaction id, so requests
        // targeting different parents are both kept.
        assert_eq!(res.len(), 2);

        for dependent in res {
            if dependent.own_transaction_id == child1 {
                assert_eq!(dependent.parent_transaction_id, txn1);
                assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
            } else {
                assert_eq!(dependent.parent_transaction_id, txn2);
                assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
            }
        }
    }

    #[test]
    fn test_canonicalize_reactions_after_edits() {
        // Sending reactions should happen after edits to a given event.
        let txn = TransactionId::new();

        let react_id = ChildTransactionId::new();
        let react = DependentQueuedRequest {
            own_transaction_id: react_id.clone(),
            kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
            parent_transaction_id: txn.clone(),
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        let edit_id = ChildTransactionId::new();
        let edit = DependentQueuedRequest {
            own_transaction_id: edit_id.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_transaction_id: txn,
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        let res = canonicalize_dependent_requests(&[react, edit]);

        assert_eq!(res.len(), 2);
        assert_eq!(res[0].own_transaction_id, edit_id);
        assert_eq!(res[1].own_transaction_id, react_id);
    }
}