matrix_sdk/send_queue/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A send queue facility to serializing queuing and sending of messages.
16//!
17//! # [`Room`] send queue
18//!
19//! Each room gets its own [`RoomSendQueue`], that's available by calling
20//! [`Room::send_queue()`]. The first time this method is called, it will spawn
21//! a background task that's used to actually send events, in the order they
22//! were passed from calls to [`RoomSendQueue::send()`].
23//!
24//! This queue tries to simplify error management around sending events, using
25//! [`RoomSendQueue::send`] or [`RoomSendQueue::send_raw`]: by default, it will retry to send the
26//! same event a few times, before automatically disabling itself, and emitting
27//! a notification that can be listened to with the global send queue (see
28//! paragraph below) or using [`RoomSendQueue::subscribe()`].
29//!
30//! It is possible to control whether a single room is enabled using
31//! [`RoomSendQueue::set_enabled()`].
32//!
33//! # Global [`SendQueue`] object
34//!
35//! The [`Client::send_queue()`] method returns an API object allowing to
36//! control all the room send queues:
37//!
38//! - enable/disable them all at once with [`SendQueue::set_enabled()`].
39//! - get notifications about send errors with [`SendQueue::subscribe_errors`].
40//! - reload all unsent events that had been persisted in storage using
41//!   [`SendQueue::respawn_tasks_for_rooms_with_unsent_requests()`]. It is
42//!   recommended to call this method during initialization of a client,
43//!   otherwise persisted unsent events will only be re-sent after the send
44//!   queue for the given room has been reopened for the first time.
45//!
46//! # Send handle
47//!
48//! Just after queuing a request to send something, a [`SendHandle`] is
49//! returned, allowing manipulating the inflight request.
50//!
51//! For a send handle for an event, it's possible to edit the event / abort
52//! sending it. If it was still in the queue (i.e. not sent yet, or not being
53//! sent), then such an action would happen locally (i.e. in the database).
54//! Otherwise, it is "too late": the background task may be sending
55//! the event already, or has sent it; in that case, the edit/aborting must
56//! happen as an actual event materializing this, on the server. To accomplish
57//! this, the send queue may send such an event, using the dependency system
58//! described below.
59//!
60//! # Dependency system
61//!
62//! The send queue includes a simple dependency system, where a
63//! [`QueuedRequest`] can have zero or more dependents in the form of
64//! [`DependentQueuedRequest`]. A dependent queued request can have at most one
65//! depended-upon (parent) queued request.
66//!
67//! This allows implementing deferred edits/redacts, as hinted to in the
68//! previous section.
69//!
70//! ## Media upload
71//!
72//! This dependency system also allows uploading medias, since the media's
73//! *content* must be uploaded before we send the media *event* that describes
74//! it.
75//!
76//! In the simplest case, that is, a media file and its event must be sent (i.e.
77//! no thumbnails):
78//!
79//! - The file's content is immediately cached in the
80//!   [`matrix_sdk_base::event_cache::store::EventCacheStore`], using an MXC ID
81//!   that is temporary and designates a local URI without any possible doubt.
82//! - An initial media event is created and uses this temporary MXC ID, and
83//!   propagated as a local echo for an event.
84//! - A [`QueuedRequest`] is pushed to upload the file's media
85//!   ([`QueuedRequestKind::MediaUpload`]).
86//! - A [`DependentQueuedRequest`] is pushed to finish the upload
87//!   ([`DependentQueuedRequestKind::FinishUpload`]).
88//!
89//! What is expected to happen, if all goes well, is the following:
90//!
91//! - the media is uploaded to the media homeserver, which returns the final MXC
92//!   ID.
93//! - when marking the upload request as sent, the MXC ID is injected (as a
94//!   [`matrix_sdk_base::store::SentRequestKey`]) into the dependent request
95//!   [`DependentQueuedRequestKind::FinishUpload`] created in the last step
96//!   above.
97//! - next time the send queue handles dependent queries, it'll see this one is
98//!   ready to be sent, and it will transform it into an event queued request
99//!   ([`QueuedRequestKind::Event`]), with the event created in the local echo
100//!   before, updated with the MXC ID returned from the server.
101//! - this updated local echo is also propagated as an edit of the local echo to
102//!   observers, who get the final version with the final MXC IDs at this point
103//!   too.
104//! - then the event is sent normally, as any event sent with the send queue.
105//!
106//! When there is a thumbnail, things behave similarly, with some tweaks:
107//!
108//! - the thumbnail's content is also stored into the cache store immediately,
109//! - the thumbnail is sent first as an [`QueuedRequestKind::MediaUpload`]
110//!   request,
111//! - the file upload is pushed as a dependent request of kind
112//!   [`DependentQueuedRequestKind::UploadFileOrThumbnail`] (this variant keeps
113//!   the file's key used to look it up in the cache store).
114//! - the media event is then sent as a dependent request as described in the
115//!   previous section.
116//!
117//! What's expected to happen is thus the following:
118//!
119//! - After the thumbnail has been uploaded, the dependent query will retrieve
120//!   the final MXC ID returned by the homeserver for the thumbnail, and store
121//!   it into the [`QueuedRequestKind::MediaUpload`]'s `thumbnail_source` field,
122//!   allowing to remember the thumbnail MXC ID when it's time to finish the
123//!   upload later.
124//! - The dependent request is morphed into another
125//!   [`QueuedRequestKind::MediaUpload`], for the file itself.
126//!
127//! The rest of the process is then similar to that of uploading a file without
128//! a thumbnail. The only difference is that there's a thumbnail source (MXC ID)
129//! remembered and fixed up into the media event, just before sending it.
130
131use std::{
132    collections::{BTreeMap, HashMap},
133    str::FromStr as _,
134    sync::{
135        atomic::{AtomicBool, Ordering},
136        Arc, RwLock,
137    },
138};
139
140use eyeball::SharedObservable;
141#[cfg(feature = "unstable-msc4274")]
142use matrix_sdk_base::store::FinishGalleryItemInfo;
143use matrix_sdk_base::{
144    event_cache::store::EventCacheStoreError,
145    media::MediaRequestParameters,
146    store::{
147        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore,
148        FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
149        SentMediaInfo, SentRequestKey, SerializableEventContent,
150    },
151    store_locks::LockStoreError,
152    RoomState, StoreError,
153};
154use matrix_sdk_common::{
155    executor::{spawn, JoinHandle},
156    locks::Mutex as SyncMutex,
157};
158use mime::Mime;
159use ruma::{
160    events::{
161        reaction::ReactionEventContent,
162        relation::Annotation,
163        room::{
164            message::{FormattedBody, RoomMessageEventContent},
165            MediaSource,
166        },
167        AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _,
168    },
169    serde::Raw,
170    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
171    TransactionId,
172};
173use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard};
174use tracing::{debug, error, info, instrument, trace, warn};
175
176#[cfg(feature = "e2e-encryption")]
177use crate::crypto::{OlmError, SessionRecipientCollectionError};
178use crate::{
179    client::WeakClient,
180    config::RequestConfig,
181    error::RetryKind,
182    room::{edit::EditedContent, WeakRoom},
183    Client, Media, Room, TransmissionProgress,
184};
185
186mod progress;
187mod upload;
188
189pub use progress::AbstractProgress;
190
191/// A client-wide send queue, for all the rooms known by a client.
192pub struct SendQueue {
193    client: Client,
194}
195
196#[cfg(not(tarpaulin_include))]
197impl std::fmt::Debug for SendQueue {
198    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
199        f.debug_struct("SendQueue").finish_non_exhaustive()
200    }
201}
202
203impl SendQueue {
204    pub(super) fn new(client: Client) -> Self {
205        Self { client }
206    }
207
208    /// Reload all the rooms which had unsent requests, and respawn tasks for
209    /// those rooms.
210    pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
211        if !self.is_enabled() {
212            return;
213        }
214
215        let room_ids =
216            self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
217                |err| {
218                    warn!("error when loading rooms with unsent requests: {err}");
219                    Vec::new()
220                },
221            );
222
223        // Getting the [`RoomSendQueue`] is sufficient to spawn the task if needs be.
224        for room_id in room_ids {
225            if let Some(room) = self.client.get_room(&room_id) {
226                let _ = self.for_room(room);
227            }
228        }
229    }
230
231    /// Tiny helper to get the send queue's global context from the [`Client`].
232    #[inline(always)]
233    fn data(&self) -> &SendQueueData {
234        &self.client.inner.send_queue_data
235    }
236
237    /// Get or create a new send queue for a given room, and insert it into our
238    /// memoized rooms mapping.
239    pub(crate) fn for_room(&self, room: Room) -> RoomSendQueue {
240        let data = self.data();
241
242        let mut map = data.rooms.write().unwrap();
243
244        let room_id = room.room_id();
245        if let Some(room_q) = map.get(room_id).cloned() {
246            return room_q;
247        }
248
249        let owned_room_id = room_id.to_owned();
250        let room_q = RoomSendQueue::new(
251            self.is_enabled(),
252            data.global_update_sender.clone(),
253            data.error_sender.clone(),
254            data.is_dropping.clone(),
255            &self.client,
256            owned_room_id.clone(),
257            data.report_media_upload_progress.clone(),
258        );
259
260        map.insert(owned_room_id, room_q.clone());
261
262        room_q
263    }
264
265    /// Enable or disable the send queue for the entire client, i.e. all rooms.
266    ///
267    /// If we're disabling the queue, and requests were being sent, they're not
268    /// aborted, and will continue until a status resolves (error responses
269    /// will keep the events in the buffer of events to send later). The
270    /// disablement will happen before the next request is sent.
271    ///
272    /// This may wake up background tasks and resume sending of requests in the
273    /// background.
274    pub async fn set_enabled(&self, enabled: bool) {
275        debug!(?enabled, "setting global send queue enablement");
276
277        self.data().globally_enabled.store(enabled, Ordering::SeqCst);
278
279        // Wake up individual rooms we already know about.
280        for room in self.data().rooms.read().unwrap().values() {
281            room.set_enabled(enabled);
282        }
283
284        // Reload some extra rooms that might not have been awaken yet, but could have
285        // requests from previous sessions.
286        self.respawn_tasks_for_rooms_with_unsent_requests().await;
287    }
288
289    /// Returns whether the send queue is enabled, at a client-wide
290    /// granularity.
291    pub fn is_enabled(&self) -> bool {
292        self.data().globally_enabled.load(Ordering::SeqCst)
293    }
294
295    /// Enable or disable progress reporting for media uploads.
296    pub fn enable_upload_progress(&self, enabled: bool) {
297        self.data().report_media_upload_progress.store(enabled, Ordering::SeqCst);
298    }
299
300    /// Subscribe to all updates for all rooms.
301    ///
302    /// Use [`RoomSendQueue::subscribe`] to subscribe to update for a _specific
303    /// room_.
304    pub fn subscribe(&self) -> broadcast::Receiver<SendQueueUpdate> {
305        self.data().global_update_sender.subscribe()
306    }
307
308    /// A subscriber to the enablement status (enabled or disabled) of the
309    /// send queue, along with useful errors.
310    pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
311        self.data().error_sender.subscribe()
312    }
313}
314
315/// Metadata about a thumbnail needed when pushing media uploads to the send
316/// queue.
317#[derive(Clone, Debug)]
318struct QueueThumbnailInfo {
319    /// Metadata about the thumbnail needed when finishing a media upload.
320    finish_upload_thumbnail_info: FinishUploadThumbnailInfo,
321
322    /// The parameters for the request to retrieve the thumbnail data.
323    media_request_parameters: MediaRequestParameters,
324
325    /// The thumbnail's mime type.
326    content_type: Mime,
327
328    /// The thumbnail's file size in bytes.
329    file_size: usize,
330}
331
332/// A specific room's send queue ran into an error, and it has disabled itself.
333#[derive(Clone, Debug)]
334pub struct SendQueueRoomError {
335    /// For which room is the send queue failing?
336    pub room_id: OwnedRoomId,
337
338    /// The error the room has ran into, when trying to send a request.
339    pub error: Arc<crate::Error>,
340
341    /// Whether the error is considered recoverable or not.
342    ///
343    /// An error that's recoverable will disable the room's send queue, while an
344    /// unrecoverable error will be parked, until the user decides to do
345    /// something about it.
346    pub is_recoverable: bool,
347}
348
349impl Client {
350    /// Returns a [`SendQueue`] that handles sending, retrying and not
351    /// forgetting about requests that are to be sent.
352    pub fn send_queue(&self) -> SendQueue {
353        SendQueue::new(self.clone())
354    }
355}
356
357pub(super) struct SendQueueData {
358    /// Mapping of room to their unique send queue.
359    rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,
360
361    /// Is the whole mechanism enabled or disabled?
362    ///
363    /// This is only kept in memory to initialize new room queues with an
364    /// initial enablement state.
365    globally_enabled: AtomicBool,
366
367    /// Global sender to send [`SendQueueUpdate`].
368    ///
369    /// See [`SendQueue::subscribe`].
370    global_update_sender: broadcast::Sender<SendQueueUpdate>,
371
372    /// Global error updates for the send queue.
373    error_sender: broadcast::Sender<SendQueueRoomError>,
374
375    /// Are we currently dropping the Client?
376    is_dropping: Arc<AtomicBool>,
377
378    /// Will media upload progress be reported via send queue updates?
379    report_media_upload_progress: Arc<AtomicBool>,
380}
381
382impl SendQueueData {
383    /// Create the data for a send queue, in the given enabled state.
384    pub fn new(globally_enabled: bool) -> Self {
385        let (global_update_sender, _) = broadcast::channel(32);
386        let (error_sender, _) = broadcast::channel(32);
387
388        Self {
389            rooms: Default::default(),
390            globally_enabled: AtomicBool::new(globally_enabled),
391            global_update_sender,
392            error_sender,
393            is_dropping: Arc::new(false.into()),
394            report_media_upload_progress: Arc::new(false.into()),
395        }
396    }
397}
398
399impl Drop for SendQueueData {
400    fn drop(&mut self) {
401        // Mark the whole send queue as shutting down, then wake up all the room
402        // queues so they're stopped too.
403        debug!("globally dropping the send queue");
404        self.is_dropping.store(true, Ordering::SeqCst);
405
406        let rooms = self.rooms.read().unwrap();
407        for room in rooms.values() {
408            room.inner.notifier.notify_one();
409        }
410    }
411}
412
413impl Room {
414    /// Returns the [`RoomSendQueue`] for this specific room.
415    pub fn send_queue(&self) -> RoomSendQueue {
416        self.client.send_queue().for_room(self.clone())
417    }
418}
419
420/// A per-room send queue.
421///
422/// This is cheap to clone.
423#[derive(Clone)]
424pub struct RoomSendQueue {
425    inner: Arc<RoomSendQueueInner>,
426}
427
428#[cfg(not(tarpaulin_include))]
429impl std::fmt::Debug for RoomSendQueue {
430    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
431        f.debug_struct("RoomSendQueue").finish_non_exhaustive()
432    }
433}
434
435impl RoomSendQueue {
436    fn new(
437        globally_enabled: bool,
438        global_update_sender: broadcast::Sender<SendQueueUpdate>,
439        global_error_sender: broadcast::Sender<SendQueueRoomError>,
440        is_dropping: Arc<AtomicBool>,
441        client: &Client,
442        room_id: OwnedRoomId,
443        report_media_upload_progress: Arc<AtomicBool>,
444    ) -> Self {
445        let (update_sender, _) = broadcast::channel(32);
446
447        let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
448        let notifier = Arc::new(Notify::new());
449
450        let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
451        let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));
452
453        let task = spawn(Self::sending_task(
454            weak_room.clone(),
455            queue.clone(),
456            notifier.clone(),
457            global_update_sender.clone(),
458            update_sender.clone(),
459            locally_enabled.clone(),
460            global_error_sender,
461            is_dropping,
462            report_media_upload_progress,
463        ));
464
465        Self {
466            inner: Arc::new(RoomSendQueueInner {
467                room: weak_room,
468                global_update_sender,
469                update_sender,
470                _task: task,
471                queue,
472                notifier,
473                locally_enabled,
474            }),
475        }
476    }
477
478    /// Queues a raw event for sending it to this room.
479    ///
480    /// This immediately returns, and will push the event to be sent into a
481    /// queue, handled in the background.
482    ///
483    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
484    /// the [`Self::subscribe()`] method to get updates about the sending of
485    /// that event.
486    ///
487    /// By default, if sending failed on the first attempt, it will be retried a
488    /// few times. If sending failed after those retries, the entire
489    /// client's sending queue will be disabled, and it will need to be
490    /// manually re-enabled by the caller (e.g. after network is back, or when
491    /// something has been done about the faulty requests).
492    pub async fn send_raw(
493        &self,
494        content: Raw<AnyMessageLikeEventContent>,
495        event_type: String,
496    ) -> Result<SendHandle, RoomSendQueueError> {
497        let Some(room) = self.inner.room.get() else {
498            return Err(RoomSendQueueError::RoomDisappeared);
499        };
500        if room.state() != RoomState::Joined {
501            return Err(RoomSendQueueError::RoomNotJoined);
502        }
503
504        let content = SerializableEventContent::from_raw(content, event_type);
505
506        let created_at = MilliSecondsSinceUnixEpoch::now();
507        let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
508        trace!(%transaction_id, "manager sends a raw event to the background task");
509
510        self.inner.notifier.notify_one();
511
512        let send_handle = SendHandle {
513            room: self.clone(),
514            transaction_id: transaction_id.clone(),
515            media_handles: vec![],
516            created_at,
517        };
518
519        self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
520            transaction_id,
521            content: LocalEchoContent::Event {
522                serialized_event: content,
523                send_handle: send_handle.clone(),
524                send_error: None,
525            },
526        }));
527
528        Ok(send_handle)
529    }
530
531    /// Queues an event for sending it to this room.
532    ///
533    /// This immediately returns, and will push the event to be sent into a
534    /// queue, handled in the background.
535    ///
536    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
537    /// the [`Self::subscribe()`] method to get updates about the sending of
538    /// that event.
539    ///
540    /// By default, if sending failed on the first attempt, it will be retried a
541    /// few times. If sending failed after those retries, the entire
542    /// client's sending queue will be disabled, and it will need to be
543    /// manually re-enabled by the caller (e.g. after network is back, or when
544    /// something has been done about the faulty requests).
545    pub async fn send(
546        &self,
547        content: AnyMessageLikeEventContent,
548    ) -> Result<SendHandle, RoomSendQueueError> {
549        self.send_raw(
550            Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
551            content.event_type().to_string(),
552        )
553        .await
554    }
555
556    /// Returns the current local requests as well as a receiver to listen to
557    /// the send queue updates, as defined in [`RoomSendQueueUpdate`].
558    ///
559    /// Use [`SendQueue::subscribe`] to subscribe to update for _all rooms_ with
560    /// a single receiver.
561    pub async fn subscribe(
562        &self,
563    ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
564    {
565        let local_echoes = self.inner.queue.local_echoes(self).await?;
566
567        Ok((local_echoes, self.inner.update_sender.subscribe()))
568    }
569
570    /// A task that must be spawned in the async runtime, running in the
571    /// background for each room that has a send queue.
572    ///
573    /// It only progresses forward: nothing can be cancelled at any point, which
574    /// makes the implementation not overly complicated to follow.
575    #[allow(clippy::too_many_arguments)]
576    #[instrument(skip_all, fields(room_id = %room.room_id()))]
577    async fn sending_task(
578        room: WeakRoom,
579        queue: QueueStorage,
580        notifier: Arc<Notify>,
581        global_update_sender: broadcast::Sender<SendQueueUpdate>,
582        update_sender: broadcast::Sender<RoomSendQueueUpdate>,
583        locally_enabled: Arc<AtomicBool>,
584        global_error_sender: broadcast::Sender<SendQueueRoomError>,
585        is_dropping: Arc<AtomicBool>,
586        report_media_upload_progress: Arc<AtomicBool>,
587    ) {
588        trace!("spawned the sending task");
589
590        let room_id = room.room_id();
591
592        loop {
593            // A request to shut down should be preferred above everything else.
594            if is_dropping.load(Ordering::SeqCst) {
595                trace!("shutting down!");
596                break;
597            }
598
599            // Try to apply dependent requests now; those applying to previously failed
600            // attempts (local echoes) would succeed now.
601            let mut new_updates = Vec::new();
602            if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
603                warn!("errors when applying dependent requests: {err}");
604            }
605
606            for up in new_updates {
607                send_update(&global_update_sender, &update_sender, room_id, up);
608            }
609
610            if !locally_enabled.load(Ordering::SeqCst) {
611                trace!("not enabled, sleeping");
612                // Wait for an explicit wakeup.
613                notifier.notified().await;
614                continue;
615            }
616
617            let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
618                Ok(Some(request)) => request,
619
620                Ok(None) => {
621                    trace!("queue is empty, sleeping");
622                    // Wait for an explicit wakeup.
623                    notifier.notified().await;
624                    continue;
625                }
626
627                Err(err) => {
628                    warn!("error when loading next request to send: {err}");
629                    continue;
630                }
631            };
632
633            let txn_id = queued_request.transaction_id.clone();
634            trace!(txn_id = %txn_id, "received a request to send!");
635
636            let Some(room) = room.get() else {
637                if is_dropping.load(Ordering::SeqCst) {
638                    break;
639                }
640                error!("the weak room couldn't be upgraded but we're not shutting down?");
641                continue;
642            };
643
644            // If this is a media/gallery upload, prepare the following:
645            // - transaction id for the related media event request,
646            // - progress metadata to feed the final media upload progress
647            // - an observable to watch the media upload progress.
648            let (related_txn_id, media_upload_progress_info, http_progress) =
649                if let QueuedRequestKind::MediaUpload {
650                    cache_key,
651                    thumbnail_source,
652                    #[cfg(feature = "unstable-msc4274")]
653                    accumulated,
654                    related_to,
655                    ..
656                } = &queued_request.kind
657                {
658                    // Prepare to watch and communicate the request's progress for media uploads, if
659                    // it has been requested.
660                    let (media_upload_progress_info, http_progress) =
661                        if report_media_upload_progress.load(Ordering::SeqCst) {
662                            let media_upload_progress_info =
663                                RoomSendQueue::create_media_upload_progress_info(
664                                    &queued_request.transaction_id,
665                                    related_to,
666                                    cache_key,
667                                    thumbnail_source.as_ref(),
668                                    #[cfg(feature = "unstable-msc4274")]
669                                    accumulated,
670                                    &room,
671                                    &queue,
672                                )
673                                .await;
674
675                            let progress = RoomSendQueue::create_media_upload_progress_observable(
676                                &media_upload_progress_info,
677                                related_to,
678                                &update_sender,
679                            );
680
681                            (Some(media_upload_progress_info), Some(progress))
682                        } else {
683                            Default::default()
684                        };
685
686                    (Some(related_to.clone()), media_upload_progress_info, http_progress)
687                } else {
688                    Default::default()
689                };
690
691            match Self::handle_request(&room, queued_request, cancel_upload_rx, http_progress).await
692            {
693                Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
694                {
695                    Ok(()) => match parent_key {
696                        SentRequestKey::Event(event_id) => {
697                            send_update(
698                                &global_update_sender,
699                                &update_sender,
700                                room_id,
701                                RoomSendQueueUpdate::SentEvent { transaction_id: txn_id, event_id },
702                            );
703                        }
704
705                        SentRequestKey::Media(sent_media_info) => {
706                            // Generate some final progress information, even if incremental
707                            // progress wasn't requested.
708                            let index =
709                                media_upload_progress_info.as_ref().map_or(0, |info| info.index);
710                            let progress = media_upload_progress_info
711                                .as_ref()
712                                .map(|info| {
713                                    AbstractProgress { current: info.bytes, total: info.bytes }
714                                        + info.offsets
715                                })
716                                .unwrap_or(AbstractProgress { current: 1, total: 1 });
717
718                            // Purposefully don't use `send_update` here, because we don't want to
719                            // notify the global listeners about an upload progress update.
720                            let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
721                                related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
722                                file: Some(sent_media_info.file),
723                                index,
724                                progress,
725                            });
726                        }
727                    },
728
729                    Err(err) => {
730                        warn!("unable to mark queued request as sent: {err}");
731                    }
732                },
733
734                Ok(None) => {
735                    debug!("Request has been aborted while running, continuing.");
736                }
737
738                Err(err) => {
739                    let is_recoverable = match err {
740                        crate::Error::Http(ref http_err) => {
741                            // All transient errors are recoverable.
742                            matches!(
743                                http_err.retry_kind(),
744                                RetryKind::Transient { .. } | RetryKind::NetworkFailure
745                            )
746                        }
747
748                        // `ConcurrentRequestFailed` typically happens because of an HTTP failure;
749                        // since we don't get the underlying error, be lax and consider it
750                        // recoverable, and let observers decide to retry it or not. At some point
751                        // we'll get the actual underlying error.
752                        crate::Error::ConcurrentRequestFailed => true,
753
754                        // As of 2024-06-27, all other error types are considered unrecoverable.
755                        _ => false,
756                    };
757
758                    // Disable the queue for this room after any kind of error happened.
759                    locally_enabled.store(false, Ordering::SeqCst);
760
761                    if is_recoverable {
762                        warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");
763
764                        // In this case, we intentionally keep the request in the queue, but mark it
765                        // as not being sent anymore.
766                        queue.mark_as_not_being_sent(&txn_id).await;
767
768                        // Let observers know about a failure *after* we've
769                        // marked the item as not being sent anymore. Otherwise,
770                        // there's a possible race where a caller might try to
771                        // remove an item, while it's still marked as being
772                        // sent, resulting in a cancellation failure.
773                    } else {
774                        warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");
775
776                        // Mark the request as wedged, so it's not picked at any future point.
777                        if let Err(storage_error) =
778                            queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
779                        {
780                            warn!("unable to mark request as wedged: {storage_error}");
781                        }
782                    }
783
784                    let error = Arc::new(err);
785
786                    let _ = global_error_sender.send(SendQueueRoomError {
787                        room_id: room_id.to_owned(),
788                        error: error.clone(),
789                        is_recoverable,
790                    });
791
792                    send_update(
793                        &global_update_sender,
794                        &update_sender,
795                        room_id,
796                        RoomSendQueueUpdate::SendError {
797                            transaction_id: related_txn_id.unwrap_or(txn_id),
798                            error,
799                            is_recoverable,
800                        },
801                    );
802                }
803            }
804        }
805
806        info!("exited sending task");
807    }
808
809    /// Handles a single request and returns the [`SentRequestKey`] on success
810    /// (unless the request was cancelled, in which case it'll return
811    /// `None`).
812    async fn handle_request(
813        room: &Room,
814        request: QueuedRequest,
815        cancel_upload_rx: Option<oneshot::Receiver<()>>,
816        progress: Option<SharedObservable<TransmissionProgress>>,
817    ) -> Result<Option<SentRequestKey>, crate::Error> {
818        match request.kind {
819            QueuedRequestKind::Event { content } => {
820                let (event, event_type) = content.raw();
821
822                let res = room
823                    .send_raw(event_type, event)
824                    .with_transaction_id(&request.transaction_id)
825                    .with_request_config(RequestConfig::short_retry())
826                    .await?;
827
828                trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent");
829                Ok(Some(SentRequestKey::Event(res.event_id)))
830            }
831
832            QueuedRequestKind::MediaUpload {
833                content_type,
834                cache_key,
835                thumbnail_source,
836                related_to: relates_to,
837                #[cfg(feature = "unstable-msc4274")]
838                accumulated,
839            } => {
840                trace!(%relates_to, "uploading media related to event");
841
842                let fut = async move {
843                    let data = room
844                        .client()
845                        .event_cache_store()
846                        .lock()
847                        .await?
848                        .get_media_content(&cache_key)
849                        .await?
850                        .ok_or(crate::Error::SendQueueWedgeError(Box::new(
851                            QueueWedgeError::MissingMediaContent,
852                        )))?;
853
854                    let mime = Mime::from_str(&content_type).map_err(|_| {
855                        crate::Error::SendQueueWedgeError(Box::new(
856                            QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
857                        ))
858                    })?;
859
860                    #[cfg(feature = "e2e-encryption")]
861                    let media_source = if room.latest_encryption_state().await?.is_encrypted() {
862                        trace!("upload will be encrypted (encrypted room)");
863
864                        let mut cursor = std::io::Cursor::new(data);
865                        let mut req = room
866                            .client
867                            .upload_encrypted_file(&mut cursor)
868                            .with_request_config(RequestConfig::short_retry());
869                        if let Some(progress) = progress {
870                            req = req.with_send_progress_observable(progress);
871                        }
872                        let encrypted_file = req.await?;
873
874                        MediaSource::Encrypted(Box::new(encrypted_file))
875                    } else {
876                        trace!("upload will be in clear text (room without encryption)");
877
878                        let request_config = RequestConfig::short_retry()
879                            .timeout(Media::reasonable_upload_timeout(&data));
880                        let mut req =
881                            room.client().media().upload(&mime, data, Some(request_config));
882                        if let Some(progress) = progress {
883                            req = req.with_send_progress_observable(progress);
884                        }
885                        let res = req.await?;
886
887                        MediaSource::Plain(res.content_uri)
888                    };
889
890                    #[cfg(not(feature = "e2e-encryption"))]
891                    let media_source = {
892                        let request_config = RequestConfig::short_retry()
893                            .timeout(Media::reasonable_upload_timeout(&data));
894                        let mut req =
895                            room.client().media().upload(&mime, data, Some(request_config));
896                        if let Some(progress) = progress {
897                            req = req.with_send_progress_observable(progress);
898                        }
899                        let res = req.await?;
900                        MediaSource::Plain(res.content_uri)
901                    };
902
903                    let uri = match &media_source {
904                        MediaSource::Plain(uri) => uri,
905                        MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
906                    };
907                    trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");
908
909                    Ok(SentRequestKey::Media(SentMediaInfo {
910                        file: media_source,
911                        thumbnail: thumbnail_source,
912                        #[cfg(feature = "unstable-msc4274")]
913                        accumulated,
914                    }))
915                };
916
917                let wait_for_cancel = async move {
918                    if let Some(rx) = cancel_upload_rx {
919                        rx.await
920                    } else {
921                        std::future::pending().await
922                    }
923                };
924
925                tokio::select! {
926                    biased;
927
928                    _ = wait_for_cancel => {
929                        Ok(None)
930                    }
931
932                    res = fut => {
933                        res.map(Some)
934                    }
935                }
936            }
937        }
938    }
939
940    /// Returns whether the room is enabled, at the room level.
941    pub fn is_enabled(&self) -> bool {
942        self.inner.locally_enabled.load(Ordering::SeqCst)
943    }
944
945    /// Set the locally enabled flag for this room queue.
946    pub fn set_enabled(&self, enabled: bool) {
947        self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
948
949        // No need to wake a task to tell it it's been disabled, so only notify if we're
950        // re-enabling the queue.
951        if enabled {
952            self.inner.notifier.notify_one();
953        }
954    }
955
956    /// Send an update on the room send queue channel, and on the global send
957    /// queue channel, i.e. it sends a [`RoomSendQueueUpdate`] and a
958    /// [`SendQueueUpdate`].
959    fn send_update(&self, update: RoomSendQueueUpdate) {
960        let _ = self.inner.update_sender.send(update.clone());
961        let _ = self
962            .inner
963            .global_update_sender
964            .send(SendQueueUpdate { room_id: self.inner.room.room_id().to_owned(), update });
965    }
966}
967
968fn send_update(
969    global_update_sender: &broadcast::Sender<SendQueueUpdate>,
970    update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
971    room_id: &RoomId,
972    update: RoomSendQueueUpdate,
973) {
974    let _ = update_sender.send(update.clone());
975    let _ = global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update });
976}
977
978impl From<&crate::Error> for QueueWedgeError {
979    fn from(value: &crate::Error) -> Self {
980        match value {
981            #[cfg(feature = "e2e-encryption")]
982            crate::Error::OlmError(error) => match &**error {
983                OlmError::SessionRecipientCollectionError(error) => match error {
984                    SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
985                        QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
986                    }
987
988                    SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
989                        QueueWedgeError::IdentityViolations { users: users.clone() }
990                    }
991
992                    SessionRecipientCollectionError::CrossSigningNotSetup
993                    | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
994                        QueueWedgeError::CrossVerificationRequired
995                    }
996                },
997                _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
998            },
999
1000            // Flatten errors of `Self` type.
1001            crate::Error::SendQueueWedgeError(error) => *error.clone(),
1002
1003            _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1004        }
1005    }
1006}
1007
1008struct RoomSendQueueInner {
1009    /// The room which this send queue relates to.
1010    room: WeakRoom,
1011
1012    /// Global sender to send [`SendQueueUpdate`].
1013    ///
1014    /// See [`SendQueue::subscribe`].
1015    global_update_sender: broadcast::Sender<SendQueueUpdate>,
1016
1017    /// Broadcaster for notifications about the statuses of requests to be sent.
1018    ///
1019    /// Can be subscribed to from the outside.
1020    ///
1021    /// See [`RoomSendQueue::subscribe`].
1022    update_sender: broadcast::Sender<RoomSendQueueUpdate>,
1023
1024    /// Queue of requests that are either to be sent, or being sent.
1025    ///
1026    /// When a request has been sent to the server, it is removed from that
1027    /// queue *after* being sent. That way, we will retry sending upon
1028    /// failure, in the same order requests have been inserted in the first
1029    /// place.
1030    queue: QueueStorage,
1031
1032    /// A notifier that's updated any time common data is touched (stopped or
1033    /// enabled statuses), or the associated room [`QueueStorage`].
1034    notifier: Arc<Notify>,
1035
1036    /// Should the room process new requests or not (because e.g. it might be
1037    /// running off the network)?
1038    locally_enabled: Arc<AtomicBool>,
1039
1040    /// Handle to the actual sending task. Unused, but kept alive along this
1041    /// data structure.
1042    _task: JoinHandle<()>,
1043}
1044
1045/// Information about a request being sent right this moment.
1046struct BeingSentInfo {
1047    /// Transaction id of the thing being sent.
1048    transaction_id: OwnedTransactionId,
1049
1050    /// For an upload request, a trigger to cancel the upload before it
1051    /// completes.
1052    cancel_upload: Option<oneshot::Sender<()>>,
1053}
1054
1055impl BeingSentInfo {
1056    /// Aborts the upload, if a trigger is available.
1057    ///
1058    /// Consumes the object because the sender is a oneshot and will be consumed
1059    /// upon sending.
1060    fn cancel_upload(self) -> bool {
1061        if let Some(cancel_upload) = self.cancel_upload {
1062            let _ = cancel_upload.send(());
1063            true
1064        } else {
1065            false
1066        }
1067    }
1068}
1069
1070/// A specialized lock that guards both against the state store and the
1071/// [`Self::being_sent`] data.
1072#[derive(Clone)]
1073struct StoreLock {
1074    /// Reference to the client, to get access to the underlying store.
1075    client: WeakClient,
1076
1077    /// The one queued request that is being sent at the moment, along with
1078    /// associated data that can be useful to act upon it.
1079    ///
1080    /// Also used as the lock to access the state store.
1081    being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
1082}
1083
1084impl StoreLock {
1085    /// Gets a hold of the locked store and [`Self::being_sent`] pair.
1086    async fn lock(&self) -> StoreLockGuard {
1087        StoreLockGuard {
1088            client: self.client.clone(),
1089            being_sent: self.being_sent.clone().lock_owned().await,
1090        }
1091    }
1092}
1093
1094/// A lock guard obtained through locking with [`StoreLock`].
1095/// `being_sent` data.
1096struct StoreLockGuard {
1097    /// Reference to the client, to get access to the underlying store.
1098    client: WeakClient,
1099
1100    /// The one queued request that is being sent at the moment, along with
1101    /// associated data that can be useful to act upon it.
1102    being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
1103}
1104
1105impl StoreLockGuard {
1106    /// Get a client from the locked state, useful to get a handle on a store.
1107    fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
1108        self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
1109    }
1110}
1111
1112#[derive(Clone)]
1113struct QueueStorage {
1114    /// A lock to make sure the state store is only accessed once at a time, to
1115    /// make some store operations atomic.
1116    store: StoreLock,
1117
1118    /// To which room is this storage related.
1119    room_id: OwnedRoomId,
1120
1121    /// In-memory mapping of media transaction IDs to thumbnail sizes for the
1122    /// purpose of progress reporting.
1123    ///
1124    /// The keys are the transaction IDs for sending the media or gallery event
1125    /// after all uploads have finished. This allows us to easily clean up the
1126    /// cache after the event was sent.
1127    ///
1128    /// For media uploads, the value vector will always have a single element.
1129    ///
1130    /// For galleries, some gallery items might not have a thumbnail while
1131    /// others do. Since we access the thumbnails by their index within the
1132    /// gallery, the vector needs to hold optional usize's.
1133    thumbnail_file_sizes: Arc<SyncMutex<HashMap<OwnedTransactionId, Vec<Option<usize>>>>>,
1134}
1135
1136impl QueueStorage {
1137    /// Default priority for a queued request.
1138    const LOW_PRIORITY: usize = 0;
1139
1140    /// High priority for a queued request that must be handled before others.
1141    const HIGH_PRIORITY: usize = 10;
1142
1143    /// Create a new queue for queuing requests to be sent later.
1144    fn new(client: WeakClient, room: OwnedRoomId) -> Self {
1145        Self {
1146            room_id: room,
1147            store: StoreLock { client, being_sent: Default::default() },
1148            thumbnail_file_sizes: Default::default(),
1149        }
1150    }
1151
1152    /// Push a new event to be sent in the queue, with a default priority of 0.
1153    ///
1154    /// Returns the transaction id chosen to identify the request.
1155    async fn push(
1156        &self,
1157        request: QueuedRequestKind,
1158        created_at: MilliSecondsSinceUnixEpoch,
1159    ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
1160        let transaction_id = TransactionId::new();
1161
1162        self.store
1163            .lock()
1164            .await
1165            .client()?
1166            .state_store()
1167            .save_send_queue_request(
1168                &self.room_id,
1169                transaction_id.clone(),
1170                created_at,
1171                request,
1172                Self::LOW_PRIORITY,
1173            )
1174            .await?;
1175
1176        Ok(transaction_id)
1177    }
1178
1179    /// Peeks the next request to be sent, marking it as being sent.
1180    ///
1181    /// It is required to call [`Self::mark_as_sent`] after it's been
1182    /// effectively sent.
1183    async fn peek_next_to_send(
1184        &self,
1185    ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
1186    {
1187        let mut guard = self.store.lock().await;
1188        let queued_requests =
1189            guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;
1190
1191        if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
1192            let (cancel_upload_tx, cancel_upload_rx) =
1193                if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
1194                    let (tx, rx) = oneshot::channel();
1195                    (Some(tx), Some(rx))
1196                } else {
1197                    Default::default()
1198                };
1199
1200            let prev = guard.being_sent.replace(BeingSentInfo {
1201                transaction_id: request.transaction_id.clone(),
1202                cancel_upload: cancel_upload_tx,
1203            });
1204
1205            if let Some(prev) = prev {
1206                error!(
1207                    prev_txn = ?prev.transaction_id,
1208                    "a previous request was still active while picking a new one"
1209                );
1210            }
1211
1212            Ok(Some((request.clone(), cancel_upload_rx)))
1213        } else {
1214            Ok(None)
1215        }
1216    }
1217
1218    /// Marks a request popped with [`Self::peek_next_to_send`] and identified
1219    /// with the given transaction id as not being sent anymore, so it can
1220    /// be removed from the queue later.
1221    async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1222        let was_being_sent = self.store.lock().await.being_sent.take();
1223
1224        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1225        if prev_txn != Some(transaction_id) {
1226            error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after transient error)");
1227        }
1228    }
1229
1230    /// Marks a request popped with [`Self::peek_next_to_send`] and identified
1231    /// with the given transaction id as being wedged (and not being sent
1232    /// anymore), so it can be removed from the queue later.
1233    async fn mark_as_wedged(
1234        &self,
1235        transaction_id: &TransactionId,
1236        reason: QueueWedgeError,
1237    ) -> Result<(), RoomSendQueueStorageError> {
1238        // Keep the lock until we're done touching the storage.
1239        let mut guard = self.store.lock().await;
1240        let was_being_sent = guard.being_sent.take();
1241
1242        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1243        if prev_txn != Some(transaction_id) {
1244            error!(
1245                ?prev_txn,
1246                "previous active request didn't match that we expect (after permanent error)",
1247            );
1248        }
1249
1250        Ok(guard
1251            .client()?
1252            .state_store()
1253            .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1254            .await?)
1255    }
1256
1257    /// Marks a request identified with the given transaction id as being now
1258    /// unwedged and adds it back to the queue.
1259    async fn mark_as_unwedged(
1260        &self,
1261        transaction_id: &TransactionId,
1262    ) -> Result<(), RoomSendQueueStorageError> {
1263        Ok(self
1264            .store
1265            .lock()
1266            .await
1267            .client()?
1268            .state_store()
1269            .update_send_queue_request_status(&self.room_id, transaction_id, None)
1270            .await?)
1271    }
1272
1273    /// Marks a request pushed with [`Self::push`] and identified with the given
1274    /// transaction id as sent, by removing it from the local queue.
1275    async fn mark_as_sent(
1276        &self,
1277        transaction_id: &TransactionId,
1278        parent_key: SentRequestKey,
1279    ) -> Result<(), RoomSendQueueStorageError> {
1280        // Keep the lock until we're done touching the storage.
1281        let mut guard = self.store.lock().await;
1282        let was_being_sent = guard.being_sent.take();
1283
1284        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1285        if prev_txn != Some(transaction_id) {
1286            error!(
1287                ?prev_txn,
1288                "previous active request didn't match that we expect (after successful send)",
1289            );
1290        }
1291
1292        let client = guard.client()?;
1293        let store = client.state_store();
1294
1295        // Update all dependent requests.
1296        store
1297            .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1298            .await?;
1299
1300        let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1301
1302        if !removed {
1303            warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1304        }
1305
1306        self.thumbnail_file_sizes.lock().remove(transaction_id);
1307
1308        Ok(())
1309    }
1310
1311    /// Cancel a sending command for an event that has been sent with
1312    /// [`Self::push`] with the given transaction id.
1313    ///
1314    /// Returns whether the given transaction has been effectively removed. If
1315    /// false, this either means that the transaction id was unrelated to
1316    /// this queue, or that the request was sent before we cancelled it.
1317    async fn cancel_event(
1318        &self,
1319        transaction_id: &TransactionId,
1320    ) -> Result<bool, RoomSendQueueStorageError> {
1321        let guard = self.store.lock().await;
1322
1323        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1324            == Some(transaction_id)
1325        {
1326            // Save the intent to redact the event.
1327            guard
1328                .client()?
1329                .state_store()
1330                .save_dependent_queued_request(
1331                    &self.room_id,
1332                    transaction_id,
1333                    ChildTransactionId::new(),
1334                    MilliSecondsSinceUnixEpoch::now(),
1335                    DependentQueuedRequestKind::RedactEvent,
1336                )
1337                .await?;
1338
1339            return Ok(true);
1340        }
1341
1342        let removed = guard
1343            .client()?
1344            .state_store()
1345            .remove_send_queue_request(&self.room_id, transaction_id)
1346            .await?;
1347
1348        self.thumbnail_file_sizes.lock().remove(transaction_id);
1349
1350        Ok(removed)
1351    }
1352
1353    /// Replace an event that has been sent with [`Self::push`] with the given
1354    /// transaction id, before it's been actually sent.
1355    ///
1356    /// Returns whether the given transaction has been effectively edited. If
1357    /// false, this either means that the transaction id was unrelated to
1358    /// this queue, or that the request was sent before we edited it.
1359    async fn replace_event(
1360        &self,
1361        transaction_id: &TransactionId,
1362        serializable: SerializableEventContent,
1363    ) -> Result<bool, RoomSendQueueStorageError> {
1364        let guard = self.store.lock().await;
1365
1366        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1367            == Some(transaction_id)
1368        {
1369            // Save the intent to edit the associated event.
1370            guard
1371                .client()?
1372                .state_store()
1373                .save_dependent_queued_request(
1374                    &self.room_id,
1375                    transaction_id,
1376                    ChildTransactionId::new(),
1377                    MilliSecondsSinceUnixEpoch::now(),
1378                    DependentQueuedRequestKind::EditEvent { new_content: serializable },
1379                )
1380                .await?;
1381
1382            return Ok(true);
1383        }
1384
1385        let edited = guard
1386            .client()?
1387            .state_store()
1388            .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1389            .await?;
1390
1391        Ok(edited)
1392    }
1393
1394    /// Push requests (and dependents) to upload a media.
1395    ///
1396    /// See the module-level description for details of the whole processus.
1397    #[allow(clippy::too_many_arguments)]
1398    async fn push_media(
1399        &self,
1400        event: RoomMessageEventContent,
1401        content_type: Mime,
1402        send_event_txn: OwnedTransactionId,
1403        created_at: MilliSecondsSinceUnixEpoch,
1404        upload_file_txn: OwnedTransactionId,
1405        file_media_request: MediaRequestParameters,
1406        thumbnail: Option<QueueThumbnailInfo>,
1407    ) -> Result<(), RoomSendQueueStorageError> {
1408        let guard = self.store.lock().await;
1409        let client = guard.client()?;
1410        let store = client.state_store();
1411
1412        // There's only a single media to be sent, so it has at most one thumbnail.
1413        let thumbnail_file_sizes = vec![thumbnail.as_ref().map(|t| t.file_size)];
1414
1415        let thumbnail_info = self
1416            .push_thumbnail_and_media_uploads(
1417                store,
1418                &content_type,
1419                send_event_txn.clone(),
1420                created_at,
1421                upload_file_txn.clone(),
1422                file_media_request,
1423                thumbnail,
1424            )
1425            .await?;
1426
1427        // Push the dependent request for the event itself.
1428        store
1429            .save_dependent_queued_request(
1430                &self.room_id,
1431                &upload_file_txn,
1432                send_event_txn.clone().into(),
1433                created_at,
1434                DependentQueuedRequestKind::FinishUpload {
1435                    local_echo: Box::new(event),
1436                    file_upload: upload_file_txn.clone(),
1437                    thumbnail_info,
1438                },
1439            )
1440            .await?;
1441
1442        self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1443
1444        Ok(())
1445    }
1446
1447    /// Push requests (and dependents) to upload a gallery.
1448    ///
1449    /// See the module-level description for details of the whole processus.
1450    #[cfg(feature = "unstable-msc4274")]
1451    #[allow(clippy::too_many_arguments)]
1452    async fn push_gallery(
1453        &self,
1454        event: RoomMessageEventContent,
1455        send_event_txn: OwnedTransactionId,
1456        created_at: MilliSecondsSinceUnixEpoch,
1457        item_queue_infos: Vec<GalleryItemQueueInfo>,
1458    ) -> Result<(), RoomSendQueueStorageError> {
1459        let guard = self.store.lock().await;
1460        let client = guard.client()?;
1461        let store = client.state_store();
1462
1463        let mut finish_item_infos = Vec::with_capacity(item_queue_infos.len());
1464        let mut thumbnail_file_sizes = Vec::with_capacity(item_queue_infos.len());
1465
1466        let Some((first, rest)) = item_queue_infos.split_first() else {
1467            return Ok(());
1468        };
1469
1470        let GalleryItemQueueInfo { content_type, upload_file_txn, file_media_request, thumbnail } =
1471            first;
1472
1473        let thumbnail_info = self
1474            .push_thumbnail_and_media_uploads(
1475                store,
1476                content_type,
1477                send_event_txn.clone(),
1478                created_at,
1479                upload_file_txn.clone(),
1480                file_media_request.clone(),
1481                thumbnail.clone(),
1482            )
1483            .await?;
1484
1485        finish_item_infos
1486            .push(FinishGalleryItemInfo { file_upload: upload_file_txn.clone(), thumbnail_info });
1487        thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));
1488
1489        let mut last_upload_file_txn = upload_file_txn.clone();
1490
1491        for item_queue_info in rest {
1492            let GalleryItemQueueInfo {
1493                content_type,
1494                upload_file_txn,
1495                file_media_request,
1496                thumbnail,
1497            } = item_queue_info;
1498
1499            let thumbnail_info = if let Some(QueueThumbnailInfo {
1500                finish_upload_thumbnail_info: thumbnail_info,
1501                media_request_parameters: thumbnail_media_request,
1502                content_type: thumbnail_content_type,
1503                ..
1504            }) = thumbnail
1505            {
1506                let upload_thumbnail_txn = thumbnail_info.txn.clone();
1507
1508                // Save the thumbnail upload request as a dependent request of the last file
1509                // upload.
1510                store
1511                    .save_dependent_queued_request(
1512                        &self.room_id,
1513                        &last_upload_file_txn,
1514                        upload_thumbnail_txn.clone().into(),
1515                        created_at,
1516                        DependentQueuedRequestKind::UploadFileOrThumbnail {
1517                            content_type: thumbnail_content_type.to_string(),
1518                            cache_key: thumbnail_media_request.clone(),
1519                            related_to: send_event_txn.clone(),
1520                            parent_is_thumbnail_upload: false,
1521                        },
1522                    )
1523                    .await?;
1524
1525                last_upload_file_txn = upload_thumbnail_txn;
1526
1527                Some(thumbnail_info)
1528            } else {
1529                None
1530            };
1531
1532            // Save the file upload as a dependent request of the previous upload.
1533            store
1534                .save_dependent_queued_request(
1535                    &self.room_id,
1536                    &last_upload_file_txn,
1537                    upload_file_txn.clone().into(),
1538                    created_at,
1539                    DependentQueuedRequestKind::UploadFileOrThumbnail {
1540                        content_type: content_type.to_string(),
1541                        cache_key: file_media_request.clone(),
1542                        related_to: send_event_txn.clone(),
1543                        parent_is_thumbnail_upload: thumbnail.is_some(),
1544                    },
1545                )
1546                .await?;
1547
1548            finish_item_infos.push(FinishGalleryItemInfo {
1549                file_upload: upload_file_txn.clone(),
1550                thumbnail_info: thumbnail_info.cloned(),
1551            });
1552            thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));
1553
1554            last_upload_file_txn = upload_file_txn.clone();
1555        }
1556
1557        // Push the request for the event itself as a dependent request of the last file
1558        // upload.
1559        store
1560            .save_dependent_queued_request(
1561                &self.room_id,
1562                &last_upload_file_txn,
1563                send_event_txn.clone().into(),
1564                created_at,
1565                DependentQueuedRequestKind::FinishGallery {
1566                    local_echo: Box::new(event),
1567                    item_infos: finish_item_infos,
1568                },
1569            )
1570            .await?;
1571
1572        self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1573
1574        Ok(())
1575    }
1576
1577    /// If a thumbnail exists, pushes a [`QueuedRequestKind::MediaUpload`] to
1578    /// upload it
1579    /// and a [`DependentQueuedRequestKind::UploadFileOrThumbnail`] to upload
1580    /// the media itself. Otherwise, pushes a
1581    /// [`QueuedRequestKind::MediaUpload`] to upload the media directly.
1582    #[allow(clippy::too_many_arguments)]
1583    async fn push_thumbnail_and_media_uploads(
1584        &self,
1585        store: &DynStateStore,
1586        content_type: &Mime,
1587        send_event_txn: OwnedTransactionId,
1588        created_at: MilliSecondsSinceUnixEpoch,
1589        upload_file_txn: OwnedTransactionId,
1590        file_media_request: MediaRequestParameters,
1591        thumbnail: Option<QueueThumbnailInfo>,
1592    ) -> Result<Option<FinishUploadThumbnailInfo>, RoomSendQueueStorageError> {
1593        if let Some(QueueThumbnailInfo {
1594            finish_upload_thumbnail_info: thumbnail_info,
1595            media_request_parameters: thumbnail_media_request,
1596            content_type: thumbnail_content_type,
1597            ..
1598        }) = thumbnail
1599        {
1600            let upload_thumbnail_txn = thumbnail_info.txn.clone();
1601
1602            // Save the thumbnail upload request.
1603            store
1604                .save_send_queue_request(
1605                    &self.room_id,
1606                    upload_thumbnail_txn.clone(),
1607                    created_at,
1608                    QueuedRequestKind::MediaUpload {
1609                        content_type: thumbnail_content_type.to_string(),
1610                        cache_key: thumbnail_media_request,
1611                        thumbnail_source: None, // the thumbnail has no thumbnails :)
1612                        related_to: send_event_txn.clone(),
1613                        #[cfg(feature = "unstable-msc4274")]
1614                        accumulated: vec![],
1615                    },
1616                    Self::LOW_PRIORITY,
1617                )
1618                .await?;
1619
1620            // Save the file upload request as a dependent request of the thumbnail upload.
1621            store
1622                .save_dependent_queued_request(
1623                    &self.room_id,
1624                    &upload_thumbnail_txn,
1625                    upload_file_txn.into(),
1626                    created_at,
1627                    DependentQueuedRequestKind::UploadFileOrThumbnail {
1628                        content_type: content_type.to_string(),
1629                        cache_key: file_media_request,
1630                        related_to: send_event_txn,
1631                        parent_is_thumbnail_upload: true,
1632                    },
1633                )
1634                .await?;
1635
1636            Ok(Some(thumbnail_info))
1637        } else {
1638            // Save the file upload as its own request, not a dependent one.
1639            store
1640                .save_send_queue_request(
1641                    &self.room_id,
1642                    upload_file_txn,
1643                    created_at,
1644                    QueuedRequestKind::MediaUpload {
1645                        content_type: content_type.to_string(),
1646                        cache_key: file_media_request,
1647                        thumbnail_source: None,
1648                        related_to: send_event_txn,
1649                        #[cfg(feature = "unstable-msc4274")]
1650                        accumulated: vec![],
1651                    },
1652                    Self::LOW_PRIORITY,
1653                )
1654                .await?;
1655
1656            Ok(None)
1657        }
1658    }
1659
1660    /// Reacts to the given local echo of an event.
1661    #[instrument(skip(self))]
1662    async fn react(
1663        &self,
1664        transaction_id: &TransactionId,
1665        key: String,
1666        created_at: MilliSecondsSinceUnixEpoch,
1667    ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1668        let guard = self.store.lock().await;
1669        let client = guard.client()?;
1670        let store = client.state_store();
1671
1672        let requests = store.load_send_queue_requests(&self.room_id).await?;
1673
1674        // If the target event has been already sent, abort immediately.
1675        if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1676            // We didn't find it as a queued request; try to find it as a dependent queued
1677            // request.
1678            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1679            if !dependent_requests
1680                .into_iter()
1681                .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1682                .any(|child_txn| *child_txn == *transaction_id)
1683            {
1684                // We didn't find it as either a request or a dependent request, abort.
1685                return Ok(None);
1686            }
1687        }
1688
1689        // Record the dependent request.
1690        let reaction_txn_id = ChildTransactionId::new();
1691        store
1692            .save_dependent_queued_request(
1693                &self.room_id,
1694                transaction_id,
1695                reaction_txn_id.clone(),
1696                created_at,
1697                DependentQueuedRequestKind::ReactEvent { key },
1698            )
1699            .await?;
1700
1701        Ok(Some(reaction_txn_id))
1702    }
1703
1704    /// Returns a list of the local echoes, that is, all the requests that we're
1705    /// about to send but that haven't been sent yet (or are being sent).
1706    async fn local_echoes(
1707        &self,
1708        room: &RoomSendQueue,
1709    ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1710        let guard = self.store.lock().await;
1711        let client = guard.client()?;
1712        let store = client.state_store();
1713
1714        let local_requests =
1715            store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1716                Some(LocalEcho {
1717                    transaction_id: queued.transaction_id.clone(),
1718                    content: match queued.kind {
1719                        QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1720                            serialized_event: content,
1721                            send_handle: SendHandle {
1722                                room: room.clone(),
1723                                transaction_id: queued.transaction_id,
1724                                media_handles: vec![],
1725                                created_at: queued.created_at,
1726                            },
1727                            send_error: queued.error,
1728                        },
1729
1730                        QueuedRequestKind::MediaUpload { .. } => {
1731                            // Don't return uploaded medias as their own things; the accompanying
1732                            // event represented as a dependent request should be sufficient.
1733                            return None;
1734                        }
1735                    },
1736                })
1737            });
1738
1739        let reactions_and_medias = store
1740            .load_dependent_queued_requests(&self.room_id)
1741            .await?
1742            .into_iter()
1743            .filter_map(|dep| match dep.kind {
1744                DependentQueuedRequestKind::EditEvent { .. }
1745                | DependentQueuedRequestKind::RedactEvent => {
1746                    // TODO: reflect local edits/redacts too?
1747                    None
1748                }
1749
1750                DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
1751                    transaction_id: dep.own_transaction_id.clone().into(),
1752                    content: LocalEchoContent::React {
1753                        key,
1754                        send_handle: SendReactionHandle {
1755                            room: room.clone(),
1756                            transaction_id: dep.own_transaction_id,
1757                        },
1758                        applies_to: dep.parent_transaction_id,
1759                    },
1760                }),
1761
1762                DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
1763                    // Don't reflect these: only the associated event is interesting to observers.
1764                    None
1765                }
1766
1767                DependentQueuedRequestKind::FinishUpload {
1768                    local_echo,
1769                    file_upload,
1770                    thumbnail_info,
1771                } => {
1772                    // Materialize as an event local echo.
1773                    Some(LocalEcho {
1774                        transaction_id: dep.own_transaction_id.clone().into(),
1775                        content: LocalEchoContent::Event {
1776                            serialized_event: SerializableEventContent::new(&(*local_echo).into())
1777                                .ok()?,
1778                            send_handle: SendHandle {
1779                                room: room.clone(),
1780                                transaction_id: dep.own_transaction_id.into(),
1781                                media_handles: vec![MediaHandles {
1782                                    upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
1783                                    upload_file_txn: file_upload,
1784                                }],
1785                                created_at: dep.created_at,
1786                            },
1787                            send_error: None,
1788                        },
1789                    })
1790                }
1791
1792                #[cfg(feature = "unstable-msc4274")]
1793                DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
1794                    // Materialize as an event local echo.
1795                    self.create_gallery_local_echo(
1796                        dep.own_transaction_id,
1797                        room,
1798                        dep.created_at,
1799                        local_echo,
1800                        item_infos,
1801                    )
1802                }
1803            });
1804
1805        Ok(local_requests.chain(reactions_and_medias).collect())
1806    }
1807
1808    /// Create a local echo for a gallery event.
1809    #[cfg(feature = "unstable-msc4274")]
1810    fn create_gallery_local_echo(
1811        &self,
1812        transaction_id: ChildTransactionId,
1813        room: &RoomSendQueue,
1814        created_at: MilliSecondsSinceUnixEpoch,
1815        local_echo: Box<RoomMessageEventContent>,
1816        item_infos: Vec<FinishGalleryItemInfo>,
1817    ) -> Option<LocalEcho> {
1818        Some(LocalEcho {
1819            transaction_id: transaction_id.clone().into(),
1820            content: LocalEchoContent::Event {
1821                serialized_event: SerializableEventContent::new(&(*local_echo).into()).ok()?,
1822                send_handle: SendHandle {
1823                    room: room.clone(),
1824                    transaction_id: transaction_id.into(),
1825                    media_handles: item_infos
1826                        .into_iter()
1827                        .map(|i| MediaHandles {
1828                            upload_thumbnail_txn: i.thumbnail_info.map(|info| info.txn),
1829                            upload_file_txn: i.file_upload,
1830                        })
1831                        .collect(),
1832                    created_at,
1833                },
1834                send_error: None,
1835            },
1836        })
1837    }
1838
1839    /// Try to apply a single dependent request, whether it's local or remote.
1840    ///
1841    /// This swallows errors that would retrigger every time if we retried
1842    /// applying the dependent request: invalid edit content, etc.
1843    ///
1844    /// Returns true if the dependent request has been sent (or should not be
1845    /// retried later).
1846    #[instrument(skip_all)]
1847    async fn try_apply_single_dependent_request(
1848        &self,
1849        client: &Client,
1850        dependent_request: DependentQueuedRequest,
1851        new_updates: &mut Vec<RoomSendQueueUpdate>,
1852    ) -> Result<bool, RoomSendQueueError> {
1853        let store = client.state_store();
1854
1855        let parent_key = dependent_request.parent_key;
1856
1857        match dependent_request.kind {
1858            DependentQueuedRequestKind::EditEvent { new_content } => {
1859                if let Some(parent_key) = parent_key {
1860                    let Some(event_id) = parent_key.into_event_id() else {
1861                        return Err(RoomSendQueueError::StorageError(
1862                            RoomSendQueueStorageError::InvalidParentKey,
1863                        ));
1864                    };
1865
1866                    // The parent event has been sent, so send an edit event.
1867                    let room = client
1868                        .get_room(&self.room_id)
1869                        .ok_or(RoomSendQueueError::RoomDisappeared)?;
1870
1871                    // Check the event is one we know how to edit with an edit event.
1872
1873                    // It must be deserializable…
1874                    let edited_content = match new_content.deserialize() {
1875                        Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
1876                            // Assume no relationships.
1877                            EditedContent::RoomMessage(c.into())
1878                        }
1879
1880                        Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
1881                            let poll_start = c.poll_start().clone();
1882                            EditedContent::PollStart {
1883                                fallback_text: poll_start.question.text.clone(),
1884                                new_content: poll_start,
1885                            }
1886                        }
1887
1888                        Ok(c) => {
1889                            warn!("Unsupported edit content type: {:?}", c.event_type());
1890                            return Ok(true);
1891                        }
1892
1893                        Err(err) => {
1894                            warn!("Unable to deserialize: {err}");
1895                            return Ok(true);
1896                        }
1897                    };
1898
1899                    let edit_event = match room.make_edit_event(&event_id, edited_content).await {
1900                        Ok(e) => e,
1901                        Err(err) => {
1902                            warn!("couldn't create edited event: {err}");
1903                            return Ok(true);
1904                        }
1905                    };
1906
1907                    // Queue the edit event in the send queue 🧠.
1908                    let serializable = SerializableEventContent::from_raw(
1909                        Raw::new(&edit_event)
1910                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
1911                        edit_event.event_type().to_string(),
1912                    );
1913
1914                    store
1915                        .save_send_queue_request(
1916                            &self.room_id,
1917                            dependent_request.own_transaction_id.into(),
1918                            dependent_request.created_at,
1919                            serializable.into(),
1920                            Self::HIGH_PRIORITY,
1921                        )
1922                        .await
1923                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
1924                } else {
1925                    // The parent event is still local; update the local echo.
1926                    let edited = store
1927                        .update_send_queue_request(
1928                            &self.room_id,
1929                            &dependent_request.parent_transaction_id,
1930                            new_content.into(),
1931                        )
1932                        .await
1933                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
1934
1935                    if !edited {
1936                        warn!("missing local echo upon dependent edit");
1937                    }
1938                }
1939            }
1940
1941            DependentQueuedRequestKind::RedactEvent => {
1942                if let Some(parent_key) = parent_key {
1943                    let Some(event_id) = parent_key.into_event_id() else {
1944                        return Err(RoomSendQueueError::StorageError(
1945                            RoomSendQueueStorageError::InvalidParentKey,
1946                        ));
1947                    };
1948
1949                    // The parent event has been sent; send a redaction.
1950                    let room = client
1951                        .get_room(&self.room_id)
1952                        .ok_or(RoomSendQueueError::RoomDisappeared)?;
1953
1954                    // Ideally we'd use the send queue to send the redaction, but the protocol has
1955                    // changed the shape of a room.redaction after v11, so keep it simple and try
1956                    // once here.
1957
1958                    // Note: no reason is provided because we materialize the intent of "cancel
1959                    // sending the parent event".
1960
1961                    if let Err(err) = room
1962                        .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
1963                        .await
1964                    {
1965                        warn!("error when sending a redact for {event_id}: {err}");
1966                        return Ok(false);
1967                    }
1968                } else {
1969                    // The parent event is still local (sending must have failed); redact the local
1970                    // echo.
1971                    let removed = store
1972                        .remove_send_queue_request(
1973                            &self.room_id,
1974                            &dependent_request.parent_transaction_id,
1975                        )
1976                        .await
1977                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
1978
1979                    if !removed {
1980                        warn!("missing local echo upon dependent redact");
1981                    }
1982                }
1983            }
1984
1985            DependentQueuedRequestKind::ReactEvent { key } => {
1986                if let Some(parent_key) = parent_key {
1987                    let Some(parent_event_id) = parent_key.into_event_id() else {
1988                        return Err(RoomSendQueueError::StorageError(
1989                            RoomSendQueueStorageError::InvalidParentKey,
1990                        ));
1991                    };
1992
1993                    // Queue the reaction event in the send queue 🧠.
1994                    let react_event =
1995                        ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
1996                    let serializable = SerializableEventContent::from_raw(
1997                        Raw::new(&react_event)
1998                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
1999                        react_event.event_type().to_string(),
2000                    );
2001
2002                    store
2003                        .save_send_queue_request(
2004                            &self.room_id,
2005                            dependent_request.own_transaction_id.into(),
2006                            dependent_request.created_at,
2007                            serializable.into(),
2008                            Self::HIGH_PRIORITY,
2009                        )
2010                        .await
2011                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
2012                } else {
2013                    // Not applied yet, we should retry later => false.
2014                    return Ok(false);
2015                }
2016            }
2017
2018            DependentQueuedRequestKind::UploadFileOrThumbnail {
2019                content_type,
2020                cache_key,
2021                related_to,
2022                parent_is_thumbnail_upload,
2023            } => {
2024                let Some(parent_key) = parent_key else {
2025                    // Not finished yet, we should retry later => false.
2026                    return Ok(false);
2027                };
2028                self.handle_dependent_file_or_thumbnail_upload(
2029                    client,
2030                    dependent_request.own_transaction_id.into(),
2031                    parent_key,
2032                    content_type,
2033                    cache_key,
2034                    related_to,
2035                    parent_is_thumbnail_upload,
2036                )
2037                .await?;
2038            }
2039
2040            DependentQueuedRequestKind::FinishUpload {
2041                local_echo,
2042                file_upload,
2043                thumbnail_info,
2044            } => {
2045                let Some(parent_key) = parent_key else {
2046                    // Not finished yet, we should retry later => false.
2047                    return Ok(false);
2048                };
2049                self.handle_dependent_finish_upload(
2050                    client,
2051                    dependent_request.own_transaction_id.into(),
2052                    parent_key,
2053                    *local_echo,
2054                    file_upload,
2055                    thumbnail_info,
2056                    new_updates,
2057                )
2058                .await?;
2059            }
2060
2061            #[cfg(feature = "unstable-msc4274")]
2062            DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
2063                let Some(parent_key) = parent_key else {
2064                    // Not finished yet, we should retry later => false.
2065                    return Ok(false);
2066                };
2067                self.handle_dependent_finish_gallery_upload(
2068                    client,
2069                    dependent_request.own_transaction_id.into(),
2070                    parent_key,
2071                    *local_echo,
2072                    item_infos,
2073                    new_updates,
2074                )
2075                .await?;
2076            }
2077        }
2078
2079        Ok(true)
2080    }
2081
2082    #[instrument(skip(self))]
2083    async fn apply_dependent_requests(
2084        &self,
2085        new_updates: &mut Vec<RoomSendQueueUpdate>,
2086    ) -> Result<(), RoomSendQueueError> {
2087        let guard = self.store.lock().await;
2088
2089        let client = guard.client()?;
2090        let store = client.state_store();
2091
2092        let dependent_requests = store
2093            .load_dependent_queued_requests(&self.room_id)
2094            .await
2095            .map_err(RoomSendQueueStorageError::StateStoreError)?;
2096
2097        let num_initial_dependent_requests = dependent_requests.len();
2098        if num_initial_dependent_requests == 0 {
2099            // Returning early here avoids a bit of useless logging.
2100            return Ok(());
2101        }
2102
2103        let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
2104
2105        // Get rid of the all non-canonical dependent events.
2106        for original in &dependent_requests {
2107            if !canonicalized_dependent_requests
2108                .iter()
2109                .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
2110            {
2111                store
2112                    .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
2113                    .await
2114                    .map_err(RoomSendQueueStorageError::StateStoreError)?;
2115            }
2116        }
2117
2118        let mut num_dependent_requests = canonicalized_dependent_requests.len();
2119
2120        debug!(
2121            num_dependent_requests,
2122            num_initial_dependent_requests, "starting handling of dependent requests"
2123        );
2124
2125        for dependent in canonicalized_dependent_requests {
2126            let dependent_id = dependent.own_transaction_id.clone();
2127
2128            match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
2129                Ok(should_remove) => {
2130                    if should_remove {
2131                        // The dependent request has been successfully applied, forget about it.
2132                        store
2133                            .remove_dependent_queued_request(&self.room_id, &dependent_id)
2134                            .await
2135                            .map_err(RoomSendQueueStorageError::StateStoreError)?;
2136
2137                        num_dependent_requests -= 1;
2138                    }
2139                }
2140
2141                Err(err) => {
2142                    warn!("error when applying single dependent request: {err}");
2143                }
2144            }
2145        }
2146
2147        debug!(
2148            leftover_dependent_requests = num_dependent_requests,
2149            "stopped handling dependent request"
2150        );
2151
2152        Ok(())
2153    }
2154
2155    /// Remove a single dependent request from storage.
2156    async fn remove_dependent_send_queue_request(
2157        &self,
2158        dependent_event_id: &ChildTransactionId,
2159    ) -> Result<bool, RoomSendQueueStorageError> {
2160        Ok(self
2161            .store
2162            .lock()
2163            .await
2164            .client()?
2165            .state_store()
2166            .remove_dependent_queued_request(&self.room_id, dependent_event_id)
2167            .await?)
2168    }
2169}
2170
2171#[cfg(feature = "unstable-msc4274")]
2172/// Metadata needed for pushing gallery item uploads onto the send queue.
2173struct GalleryItemQueueInfo {
2174    content_type: Mime,
2175    upload_file_txn: OwnedTransactionId,
2176    file_media_request: MediaRequestParameters,
2177    thumbnail: Option<QueueThumbnailInfo>,
2178}
2179
2180/// The content of a local echo.
2181#[derive(Clone, Debug)]
2182pub enum LocalEchoContent {
2183    /// The local echo contains an actual event ready to display.
2184    Event {
2185        /// Content of the event itself (along with its type) that we are about
2186        /// to send.
2187        serialized_event: SerializableEventContent,
2188        /// A handle to manipulate the sending of the associated event.
2189        send_handle: SendHandle,
2190        /// Whether trying to send this local echo failed in the past with an
2191        /// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]).
2192        send_error: Option<QueueWedgeError>,
2193    },
2194
2195    /// A local echo has been reacted to.
2196    React {
2197        /// The key with which the local echo has been reacted to.
2198        key: String,
2199        /// A handle to manipulate the sending of the reaction.
2200        send_handle: SendReactionHandle,
2201        /// The local echo which has been reacted to.
2202        applies_to: OwnedTransactionId,
2203    },
2204}
2205
2206/// A local representation for a request that hasn't been sent yet to the user's
2207/// homeserver.
2208#[derive(Clone, Debug)]
2209pub struct LocalEcho {
2210    /// Transaction id used to identify the associated request.
2211    pub transaction_id: OwnedTransactionId,
2212    /// The content for the local echo.
2213    pub content: LocalEchoContent,
2214}
2215
2216/// An update to a room send queue, observable with
2217/// [`RoomSendQueue::subscribe`].
2218#[derive(Clone, Debug)]
2219pub enum RoomSendQueueUpdate {
2220    /// A new local event is being sent.
2221    ///
2222    /// There's been a user query to create this event. It is being sent to the
2223    /// server.
2224    NewLocalEvent(LocalEcho),
2225
2226    /// A local event that hadn't been sent to the server yet has been cancelled
2227    /// before sending.
2228    CancelledLocalEvent {
2229        /// Transaction id used to identify this event.
2230        transaction_id: OwnedTransactionId,
2231    },
2232
2233    /// A local event's content has been replaced with something else.
2234    ReplacedLocalEvent {
2235        /// Transaction id used to identify this event.
2236        transaction_id: OwnedTransactionId,
2237
2238        /// The new content replacing the previous one.
2239        new_content: SerializableEventContent,
2240    },
2241
2242    /// An error happened when an event was being sent.
2243    ///
2244    /// The event has not been removed from the queue. All the send queues
2245    /// will be disabled after this happens, and must be manually re-enabled.
2246    SendError {
2247        /// Transaction id used to identify this event.
2248        transaction_id: OwnedTransactionId,
2249        /// Error received while sending the event.
2250        error: Arc<crate::Error>,
2251        /// Whether the error is considered recoverable or not.
2252        ///
2253        /// An error that's recoverable will disable the room's send queue,
2254        /// while an unrecoverable error will be parked, until the user
2255        /// decides to cancel sending it.
2256        is_recoverable: bool,
2257    },
2258
2259    /// The event has been unwedged and sending is now being retried.
2260    RetryEvent {
2261        /// Transaction id used to identify this event.
2262        transaction_id: OwnedTransactionId,
2263    },
2264
2265    /// The event has been sent to the server, and the query returned
2266    /// successfully.
2267    SentEvent {
2268        /// Transaction id used to identify this event.
2269        transaction_id: OwnedTransactionId,
2270        /// Received event id from the send response.
2271        event_id: OwnedEventId,
2272    },
2273
2274    /// A media upload (consisting of a file and possibly a thumbnail) has made
2275    /// progress.
2276    MediaUpload {
2277        /// The media event this uploaded media relates to.
2278        related_to: OwnedTransactionId,
2279
2280        /// The final media source for the file if it has finished uploading.
2281        file: Option<MediaSource>,
2282
2283        /// The index of the media within the transaction. A file and its
2284        /// thumbnail share the same index. Will always be 0 for non-gallery
2285        /// media uploads.
2286        index: u64,
2287
2288        /// The combined upload progress across the file and, if existing, its
2289        /// thumbnail. For gallery uploads, the progress is reported per indexed
2290        /// gallery item.
2291        progress: AbstractProgress,
2292    },
2293}
2294
2295/// A [`RoomSendQueueUpdate`] with an associated [`OwnedRoomId`].
2296///
2297/// This is used by [`SendQueue::subscribe`] to get a single channel to receive
2298/// updates for all [`RoomSendQueue`]s.
2299#[derive(Clone, Debug)]
2300pub struct SendQueueUpdate {
2301    /// The room where the update happened.
2302    pub room_id: OwnedRoomId,
2303
2304    /// The update for this room.
2305    pub update: RoomSendQueueUpdate,
2306}
2307
2308/// An error triggered by the send queue module.
2309#[derive(Debug, thiserror::Error)]
2310pub enum RoomSendQueueError {
2311    /// The room isn't in the joined state.
2312    #[error("the room isn't in the joined state")]
2313    RoomNotJoined,
2314
2315    /// The room is missing from the client.
2316    ///
2317    /// This happens only whenever the client is shutting down.
2318    #[error("the room is now missing from the client")]
2319    RoomDisappeared,
2320
2321    /// Error coming from storage.
2322    #[error(transparent)]
2323    StorageError(#[from] RoomSendQueueStorageError),
2324
2325    /// The attachment event failed to be created.
2326    #[error("the attachment event could not be created")]
2327    FailedToCreateAttachment,
2328
2329    /// The gallery contains no items.
2330    #[cfg(feature = "unstable-msc4274")]
2331    #[error("the gallery contains no items")]
2332    EmptyGallery,
2333
2334    /// The gallery event failed to be created.
2335    #[cfg(feature = "unstable-msc4274")]
2336    #[error("the gallery event could not be created")]
2337    FailedToCreateGallery,
2338}
2339
2340/// An error triggered by the send queue storage.
2341#[derive(Debug, thiserror::Error)]
2342pub enum RoomSendQueueStorageError {
2343    /// Error caused by the state store.
2344    #[error(transparent)]
2345    StateStoreError(#[from] StoreError),
2346
2347    /// Error caused by the event cache store.
2348    #[error(transparent)]
2349    EventCacheStoreError(#[from] EventCacheStoreError),
2350
2351    /// Error caused when attempting to get a handle on the event cache store.
2352    #[error(transparent)]
2353    LockError(#[from] LockStoreError),
2354
2355    /// Error caused when (de)serializing into/from json.
2356    #[error(transparent)]
2357    JsonSerialization(#[from] serde_json::Error),
2358
2359    /// A parent key was expected to be of a certain type, and it was another
2360    /// type instead.
2361    #[error("a dependent event had an invalid parent key type")]
2362    InvalidParentKey,
2363
2364    /// The client is shutting down.
2365    #[error("The client is shutting down.")]
2366    ClientShuttingDown,
2367
2368    /// An operation not implemented on a send handle.
2369    #[error("This operation is not implemented for media uploads")]
2370    OperationNotImplementedYet,
2371
2372    /// Trying to edit a media caption for something that's not a media.
2373    #[error("Can't edit a media caption when the underlying event isn't a media")]
2374    InvalidMediaCaptionEdit,
2375}
2376
2377/// Extra transaction IDs useful during an upload.
2378#[derive(Clone, Debug)]
2379struct MediaHandles {
2380    /// Transaction id used when uploading the thumbnail.
2381    ///
2382    /// Optional because a media can be uploaded without a thumbnail.
2383    upload_thumbnail_txn: Option<OwnedTransactionId>,
2384
2385    /// Transaction id used when uploading the media itself.
2386    upload_file_txn: OwnedTransactionId,
2387}
2388
2389/// A handle to manipulate an event that was scheduled to be sent to a room.
2390#[derive(Clone, Debug)]
2391pub struct SendHandle {
2392    /// Link to the send queue used to send this request.
2393    room: RoomSendQueue,
2394
2395    /// Transaction id used for the sent request.
2396    ///
2397    /// If this is a media upload, this is the "main" transaction id, i.e. the
2398    /// one used to send the event, and that will be seen by observers.
2399    transaction_id: OwnedTransactionId,
2400
2401    /// Additional handles for a media upload.
2402    media_handles: Vec<MediaHandles>,
2403
2404    /// The time at which the event to be sent has been created.
2405    pub created_at: MilliSecondsSinceUnixEpoch,
2406}
2407
2408impl SendHandle {
2409    /// Creates a new [`SendHandle`].
2410    #[cfg(test)]
2411    pub(crate) fn new(
2412        room: RoomSendQueue,
2413        transaction_id: OwnedTransactionId,
2414        created_at: MilliSecondsSinceUnixEpoch,
2415    ) -> Self {
2416        Self { room, transaction_id, media_handles: vec![], created_at }
2417    }
2418
2419    fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
2420        if !self.media_handles.is_empty() {
2421            Err(RoomSendQueueStorageError::OperationNotImplementedYet)
2422        } else {
2423            Ok(())
2424        }
2425    }
2426
2427    /// Aborts the sending of the event, if it wasn't sent yet.
2428    ///
2429    /// Returns true if the sending could be aborted, false if not (i.e. the
2430    /// event had already been sent).
2431    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2432    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2433        trace!("received an abort request");
2434
2435        let queue = &self.room.inner.queue;
2436
2437        for handles in &self.media_handles {
2438            if queue.abort_upload(&self.transaction_id, handles).await? {
2439                // Propagate a cancelled update.
2440                self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2441                    transaction_id: self.transaction_id.clone(),
2442                });
2443
2444                return Ok(true);
2445            }
2446
2447            // If it failed, it means the sending of the event is not a
2448            // dependent request anymore. Fall back to the regular
2449            // code path below, that handles aborting sending of an event.
2450        }
2451
2452        if queue.cancel_event(&self.transaction_id).await? {
2453            trace!("successful abort");
2454
2455            // Propagate a cancelled update too.
2456            self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2457                transaction_id: self.transaction_id.clone(),
2458            });
2459
2460            Ok(true)
2461        } else {
2462            debug!("local echo didn't exist anymore, can't abort");
2463            Ok(false)
2464        }
2465    }
2466
2467    /// Edits the content of a local echo with a raw event content.
2468    ///
2469    /// Returns true if the event to be sent was replaced, false if not (i.e.
2470    /// the event had already been sent).
2471    #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2472    pub async fn edit_raw(
2473        &self,
2474        new_content: Raw<AnyMessageLikeEventContent>,
2475        event_type: String,
2476    ) -> Result<bool, RoomSendQueueStorageError> {
2477        trace!("received an edit request");
2478        self.nyi_for_uploads()?;
2479
2480        let serializable = SerializableEventContent::from_raw(new_content, event_type);
2481
2482        if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
2483            trace!("successful edit");
2484
2485            // Wake up the queue, in case the room was asleep before the edit.
2486            self.room.inner.notifier.notify_one();
2487
2488            // Propagate a replaced update too.
2489            self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2490                transaction_id: self.transaction_id.clone(),
2491                new_content: serializable,
2492            });
2493
2494            Ok(true)
2495        } else {
2496            debug!("local echo doesn't exist anymore, can't edit");
2497            Ok(false)
2498        }
2499    }
2500
2501    /// Edits the content of a local echo with an event content.
2502    ///
2503    /// Returns true if the event to be sent was replaced, false if not (i.e.
2504    /// the event had already been sent).
2505    pub async fn edit(
2506        &self,
2507        new_content: AnyMessageLikeEventContent,
2508    ) -> Result<bool, RoomSendQueueStorageError> {
2509        self.edit_raw(
2510            Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2511            new_content.event_type().to_string(),
2512        )
2513        .await
2514    }
2515
2516    /// Edits the content of a local echo with a media caption.
2517    ///
2518    /// Will fail if the event to be sent, represented by this send handle,
2519    /// wasn't a media.
2520    pub async fn edit_media_caption(
2521        &self,
2522        caption: Option<String>,
2523        formatted_caption: Option<FormattedBody>,
2524        mentions: Option<Mentions>,
2525    ) -> Result<bool, RoomSendQueueStorageError> {
2526        if let Some(new_content) = self
2527            .room
2528            .inner
2529            .queue
2530            .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2531            .await?
2532        {
2533            trace!("successful edit of media caption");
2534
2535            // Wake up the queue, in case the room was asleep before the edit.
2536            self.room.inner.notifier.notify_one();
2537
2538            let new_content = SerializableEventContent::new(&new_content)
2539                .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2540
2541            // Propagate a replaced update too.
2542            self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2543                transaction_id: self.transaction_id.clone(),
2544                new_content,
2545            });
2546
2547            Ok(true)
2548        } else {
2549            debug!("local echo doesn't exist anymore, can't edit media caption");
2550            Ok(false)
2551        }
2552    }
2553
2554    /// Unwedge a local echo identified by its transaction identifier and try to
2555    /// resend it.
2556    pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2557        let room = &self.room.inner;
2558        room.queue
2559            .mark_as_unwedged(&self.transaction_id)
2560            .await
2561            .map_err(RoomSendQueueError::StorageError)?;
2562
2563        // If we have media handles, also try to unwedge them.
2564        //
2565        // It's fine to always do it to *all* the transaction IDs at once, because only
2566        // one of the three requests will be active at the same time, i.e. only
2567        // one entry will be updated in the store. The other two are either
2568        // done, or dependent requests.
2569
2570        for handles in &self.media_handles {
2571            room.queue
2572                .mark_as_unwedged(&handles.upload_file_txn)
2573                .await
2574                .map_err(RoomSendQueueError::StorageError)?;
2575
2576            if let Some(txn) = &handles.upload_thumbnail_txn {
2577                room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2578            }
2579        }
2580
2581        // Wake up the queue, in case the room was asleep before unwedging the request.
2582        room.notifier.notify_one();
2583
2584        self.room.send_update(RoomSendQueueUpdate::RetryEvent {
2585            transaction_id: self.transaction_id.clone(),
2586        });
2587
2588        Ok(())
2589    }
2590
2591    /// Send a reaction to the event as soon as it's sent.
2592    ///
2593    /// If returning `Ok(None)`; this means the reaction couldn't be sent
2594    /// because the event is already a remote one.
2595    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2596    pub async fn react(
2597        &self,
2598        key: String,
2599    ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2600        trace!("received an intent to react");
2601
2602        let created_at = MilliSecondsSinceUnixEpoch::now();
2603        if let Some(reaction_txn_id) =
2604            self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2605        {
2606            trace!("successfully queued react");
2607
2608            // Wake up the queue, in case the room was asleep before the sending.
2609            self.room.inner.notifier.notify_one();
2610
2611            // Propagate a new local event.
2612            let send_handle = SendReactionHandle {
2613                room: self.room.clone(),
2614                transaction_id: reaction_txn_id.clone(),
2615            };
2616
2617            self.room.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2618                // Note: we do want to use the `txn_id` we're going to use for the reaction, not
2619                // the one for the event we're reacting to.
2620                transaction_id: reaction_txn_id.into(),
2621                content: LocalEchoContent::React {
2622                    key,
2623                    send_handle: send_handle.clone(),
2624                    applies_to: self.transaction_id.clone(),
2625                },
2626            }));
2627
2628            Ok(Some(send_handle))
2629        } else {
2630            debug!("local echo doesn't exist anymore, can't react");
2631            Ok(None)
2632        }
2633    }
2634}
2635
2636/// A handle to execute actions on the sending of a reaction.
2637#[derive(Clone, Debug)]
2638pub struct SendReactionHandle {
2639    /// Reference to the send queue for the room where this reaction was sent.
2640    room: RoomSendQueue,
2641    /// The own transaction id for the reaction.
2642    transaction_id: ChildTransactionId,
2643}
2644
2645impl SendReactionHandle {
2646    /// Abort the sending of the reaction.
2647    ///
2648    /// Will return true if the reaction could be aborted, false if it's been
2649    /// sent (and there's no matching local echo anymore).
2650    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2651        if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2652            // Simple case: the reaction was found in the dependent event list.
2653
2654            // Propagate a cancelled update too.
2655            self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2656                transaction_id: self.transaction_id.clone().into(),
2657            });
2658
2659            return Ok(true);
2660        }
2661
2662        // The reaction has already been queued for sending, try to abort it using a
2663        // regular abort.
2664        let handle = SendHandle {
2665            room: self.room.clone(),
2666            transaction_id: self.transaction_id.clone().into(),
2667            media_handles: vec![],
2668            created_at: MilliSecondsSinceUnixEpoch::now(),
2669        };
2670
2671        handle.abort().await
2672    }
2673
2674    /// The transaction id that will be used to send this reaction later.
2675    pub fn transaction_id(&self) -> &TransactionId {
2676        &self.transaction_id
2677    }
2678}
2679
2680/// From a given source of [`DependentQueuedRequest`], return only the most
2681/// meaningful, i.e. the ones that wouldn't be overridden after applying the
2682/// others.
2683fn canonicalize_dependent_requests(
2684    dependent: &[DependentQueuedRequest],
2685) -> Vec<DependentQueuedRequest> {
2686    let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
2687
2688    for d in dependent {
2689        let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
2690
2691        if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
2692            // The parent event has already been flagged for redaction, don't consider the
2693            // other dependent events.
2694            continue;
2695        }
2696
2697        match &d.kind {
2698            DependentQueuedRequestKind::EditEvent { .. } => {
2699                // Replace any previous edit with this one.
2700                if let Some(prev_edit) = prevs
2701                    .iter_mut()
2702                    .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
2703                {
2704                    *prev_edit = d;
2705                } else {
2706                    prevs.insert(0, d);
2707                }
2708            }
2709
2710            DependentQueuedRequestKind::UploadFileOrThumbnail { .. }
2711            | DependentQueuedRequestKind::FinishUpload { .. }
2712            | DependentQueuedRequestKind::ReactEvent { .. } => {
2713                // These requests can't be canonicalized, push them as is.
2714                prevs.push(d);
2715            }
2716
2717            #[cfg(feature = "unstable-msc4274")]
2718            DependentQueuedRequestKind::FinishGallery { .. } => {
2719                // This request can't be canonicalized, push it as is.
2720                prevs.push(d);
2721            }
2722
2723            DependentQueuedRequestKind::RedactEvent => {
2724                // Remove every other dependent action.
2725                prevs.clear();
2726                prevs.push(d);
2727            }
2728        }
2729    }
2730
2731    by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
2732}
2733
2734#[cfg(all(test, not(target_family = "wasm")))]
2735mod tests {
2736    use std::{sync::Arc, time::Duration};
2737
2738    use assert_matches2::{assert_let, assert_matches};
2739    use matrix_sdk_base::store::{
2740        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
2741        SerializableEventContent,
2742    };
2743    use matrix_sdk_test::{async_test, JoinedRoomBuilder, SyncResponseBuilder};
2744    use ruma::{
2745        events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent},
2746        room_id, MilliSecondsSinceUnixEpoch, TransactionId,
2747    };
2748
2749    use super::canonicalize_dependent_requests;
2750    use crate::{client::WeakClient, test_utils::logged_in_client};
2751
2752    #[test]
2753    fn test_canonicalize_dependent_events_created_at() {
2754        // Test to ensure the created_at field is being serialized and retrieved
2755        // correctly.
2756        let txn = TransactionId::new();
2757        let created_at = MilliSecondsSinceUnixEpoch::now();
2758
2759        let edit = DependentQueuedRequest {
2760            own_transaction_id: ChildTransactionId::new(),
2761            parent_transaction_id: txn.clone(),
2762            kind: DependentQueuedRequestKind::EditEvent {
2763                new_content: SerializableEventContent::new(
2764                    &RoomMessageEventContent::text_plain("edit").into(),
2765                )
2766                .unwrap(),
2767            },
2768            parent_key: None,
2769            created_at,
2770        };
2771
2772        let res = canonicalize_dependent_requests(&[edit]);
2773
2774        assert_eq!(res.len(), 1);
2775        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2776        assert_let!(
2777            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2778        );
2779        assert_eq!(msg.body(), "edit");
2780        assert_eq!(res[0].parent_transaction_id, txn);
2781        assert_eq!(res[0].created_at, created_at);
2782    }
2783
2784    #[async_test]
2785    async fn test_client_no_cycle_with_send_queue() {
2786        for enabled in [true, false] {
2787            let client = logged_in_client(None).await;
2788            let weak_client = WeakClient::from_client(&client);
2789
2790            {
2791                let mut sync_response_builder = SyncResponseBuilder::new();
2792
2793                let room_id = room_id!("!a:b.c");
2794
2795                // Make sure the client knows about the room.
2796                client
2797                    .base_client()
2798                    .receive_sync_response(
2799                        sync_response_builder
2800                            .add_joined_room(JoinedRoomBuilder::new(room_id))
2801                            .build_sync_response(),
2802                    )
2803                    .await
2804                    .unwrap();
2805
2806                let room = client.get_room(room_id).unwrap();
2807                let q = room.send_queue();
2808
2809                let _watcher = q.subscribe().await;
2810
2811                client.send_queue().set_enabled(enabled).await;
2812            }
2813
2814            drop(client);
2815
2816            // Give a bit of time for background tasks to die.
2817            tokio::time::sleep(Duration::from_millis(500)).await;
2818
2819            // The weak client must be the last reference to the client now.
2820            let client = weak_client.get();
2821            assert!(
2822                client.is_none(),
2823                "too many strong references to the client: {}",
2824                Arc::strong_count(&client.unwrap().inner)
2825            );
2826        }
2827    }
2828
2829    #[test]
2830    fn test_canonicalize_dependent_events_smoke_test() {
2831        // Smoke test: canonicalizing a single dependent event returns it.
2832        let txn = TransactionId::new();
2833
2834        let edit = DependentQueuedRequest {
2835            own_transaction_id: ChildTransactionId::new(),
2836            parent_transaction_id: txn.clone(),
2837            kind: DependentQueuedRequestKind::EditEvent {
2838                new_content: SerializableEventContent::new(
2839                    &RoomMessageEventContent::text_plain("edit").into(),
2840                )
2841                .unwrap(),
2842            },
2843            parent_key: None,
2844            created_at: MilliSecondsSinceUnixEpoch::now(),
2845        };
2846        let res = canonicalize_dependent_requests(&[edit]);
2847
2848        assert_eq!(res.len(), 1);
2849        assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
2850        assert_eq!(res[0].parent_transaction_id, txn);
2851        assert!(res[0].parent_key.is_none());
2852    }
2853
2854    #[test]
2855    fn test_canonicalize_dependent_events_redaction_preferred() {
2856        // A redaction is preferred over any other kind of dependent event.
2857        let txn = TransactionId::new();
2858
2859        let mut inputs = Vec::with_capacity(100);
2860        let redact = DependentQueuedRequest {
2861            own_transaction_id: ChildTransactionId::new(),
2862            parent_transaction_id: txn.clone(),
2863            kind: DependentQueuedRequestKind::RedactEvent,
2864            parent_key: None,
2865            created_at: MilliSecondsSinceUnixEpoch::now(),
2866        };
2867
2868        let edit = DependentQueuedRequest {
2869            own_transaction_id: ChildTransactionId::new(),
2870            parent_transaction_id: txn.clone(),
2871            kind: DependentQueuedRequestKind::EditEvent {
2872                new_content: SerializableEventContent::new(
2873                    &RoomMessageEventContent::text_plain("edit").into(),
2874                )
2875                .unwrap(),
2876            },
2877            parent_key: None,
2878            created_at: MilliSecondsSinceUnixEpoch::now(),
2879        };
2880
2881        inputs.push({
2882            let mut edit = edit.clone();
2883            edit.own_transaction_id = ChildTransactionId::new();
2884            edit
2885        });
2886
2887        inputs.push(redact);
2888
2889        for _ in 0..98 {
2890            let mut edit = edit.clone();
2891            edit.own_transaction_id = ChildTransactionId::new();
2892            inputs.push(edit);
2893        }
2894
2895        let res = canonicalize_dependent_requests(&inputs);
2896
2897        assert_eq!(res.len(), 1);
2898        assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
2899        assert_eq!(res[0].parent_transaction_id, txn);
2900    }
2901
2902    #[test]
2903    fn test_canonicalize_dependent_events_last_edit_preferred() {
2904        let parent_txn = TransactionId::new();
2905
2906        // The latest edit of a list is always preferred.
2907        let inputs = (0..10)
2908            .map(|i| DependentQueuedRequest {
2909                own_transaction_id: ChildTransactionId::new(),
2910                parent_transaction_id: parent_txn.clone(),
2911                kind: DependentQueuedRequestKind::EditEvent {
2912                    new_content: SerializableEventContent::new(
2913                        &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
2914                    )
2915                    .unwrap(),
2916                },
2917                parent_key: None,
2918                created_at: MilliSecondsSinceUnixEpoch::now(),
2919            })
2920            .collect::<Vec<_>>();
2921
2922        let txn = inputs[9].parent_transaction_id.clone();
2923
2924        let res = canonicalize_dependent_requests(&inputs);
2925
2926        assert_eq!(res.len(), 1);
2927        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2928        assert_let!(
2929            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2930        );
2931        assert_eq!(msg.body(), "edit9");
2932        assert_eq!(res[0].parent_transaction_id, txn);
2933    }
2934
2935    #[test]
2936    fn test_canonicalize_multiple_local_echoes() {
2937        let txn1 = TransactionId::new();
2938        let txn2 = TransactionId::new();
2939
2940        let child1 = ChildTransactionId::new();
2941        let child2 = ChildTransactionId::new();
2942
2943        let inputs = vec![
2944            // This one pertains to txn1.
2945            DependentQueuedRequest {
2946                own_transaction_id: child1.clone(),
2947                kind: DependentQueuedRequestKind::RedactEvent,
2948                parent_transaction_id: txn1.clone(),
2949                parent_key: None,
2950                created_at: MilliSecondsSinceUnixEpoch::now(),
2951            },
2952            // This one pertains to txn2.
2953            DependentQueuedRequest {
2954                own_transaction_id: child2,
2955                kind: DependentQueuedRequestKind::EditEvent {
2956                    new_content: SerializableEventContent::new(
2957                        &RoomMessageEventContent::text_plain("edit").into(),
2958                    )
2959                    .unwrap(),
2960                },
2961                parent_transaction_id: txn2.clone(),
2962                parent_key: None,
2963                created_at: MilliSecondsSinceUnixEpoch::now(),
2964            },
2965        ];
2966
2967        let res = canonicalize_dependent_requests(&inputs);
2968
2969        // The canonicalization shouldn't depend per event id.
2970        assert_eq!(res.len(), 2);
2971
2972        for dependent in res {
2973            if dependent.own_transaction_id == child1 {
2974                assert_eq!(dependent.parent_transaction_id, txn1);
2975                assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
2976            } else {
2977                assert_eq!(dependent.parent_transaction_id, txn2);
2978                assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
2979            }
2980        }
2981    }
2982
2983    #[test]
2984    fn test_canonicalize_reactions_after_edits() {
2985        // Sending reactions should happen after edits to a given event.
2986        let txn = TransactionId::new();
2987
2988        let react_id = ChildTransactionId::new();
2989        let react = DependentQueuedRequest {
2990            own_transaction_id: react_id.clone(),
2991            kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
2992            parent_transaction_id: txn.clone(),
2993            parent_key: None,
2994            created_at: MilliSecondsSinceUnixEpoch::now(),
2995        };
2996
2997        let edit_id = ChildTransactionId::new();
2998        let edit = DependentQueuedRequest {
2999            own_transaction_id: edit_id.clone(),
3000            kind: DependentQueuedRequestKind::EditEvent {
3001                new_content: SerializableEventContent::new(
3002                    &RoomMessageEventContent::text_plain("edit").into(),
3003                )
3004                .unwrap(),
3005            },
3006            parent_transaction_id: txn,
3007            parent_key: None,
3008            created_at: MilliSecondsSinceUnixEpoch::now(),
3009        };
3010
3011        let res = canonicalize_dependent_requests(&[react, edit]);
3012
3013        assert_eq!(res.len(), 2);
3014        assert_eq!(res[0].own_transaction_id, edit_id);
3015        assert_eq!(res[1].own_transaction_id, react_id);
3016    }
3017}