// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A send queue facility to serialize the queuing and sending of messages.
//!
//! # [`Room`] send queue
//!
//! Each room gets its own [`RoomSendQueue`], that's available by calling
//! [`Room::send_queue()`]. The first time this method is called, it will spawn
//! a background task that's used to actually send events, in the order they
//! were passed from calls to [`RoomSendQueue::send()`].
//!
//! This queue tries to simplify error management around sending events, using
//! [`RoomSendQueue::send`] or [`RoomSendQueue::send_raw`]: by default, it will
//! retry sending the same event a few times, before automatically disabling
//! itself, and emitting a notification that can be listened to with the global
//! send queue (see paragraph below) or using [`RoomSendQueue::subscribe()`].
//!
//! It is possible to control whether a single room is enabled using
//! [`RoomSendQueue::set_enabled()`].
//!
//! # Global [`SendQueue`] object
//!
//! The [`Client::send_queue()`] method returns an API object that allows
//! controlling all the room send queues:
//!
//! - enable/disable them all at once with [`SendQueue::set_enabled()`].
//! - get notifications about send errors with [`SendQueue::subscribe_errors`].
//! - reload all unsent events that had been persisted in storage using
//!   [`SendQueue::respawn_tasks_for_rooms_with_unsent_requests()`]. It is
//!   recommended to call this method during initialization of a client,
//!   otherwise persisted unsent events will only be re-sent after the send
//!   queue for the given room has been reopened for the first time.
//!
//! # Send handle
//!
//! Just after queuing a request to send something, a [`SendHandle`] is
//! returned, which allows manipulating the in-flight request.
//!
//! For a send handle for an event, it's possible to edit the event / abort
//! sending it. If it was still in the queue (i.e. not sent yet, or not being
//! sent), then such an action would happen locally (i.e. in the database).
//! Otherwise, it is "too late": the background task may be sending
//! the event already, or has sent it; in that case, the edit/aborting must
//! happen as an actual event materializing this, on the server. To accomplish
//! this, the send queue may send such an event, using the dependency system
//! described below.
//!
//! # Dependency system
//!
//! The send queue includes a simple dependency system, where a
//! [`QueuedRequest`] can have zero or more dependents in the form of
//! [`DependentQueuedRequest`]. A dependent queued request can have at most one
//! depended-upon (parent) queued request.
//!
//! This allows implementing deferred edits/redacts, as hinted at in the
//! previous section.
//!
//! ## Media upload
//!
//! This dependency system also allows uploading media, since the media's
//! *content* must be uploaded before we send the media *event* that describes
//! it.
//!
//! In the simplest case, that is, a media file and its event must be sent (i.e.
//! no thumbnails):
//!
//! - The file's content is immediately cached in the
//!   [`matrix_sdk_base::event_cache::store::EventCacheStore`], using an MXC ID
//!   that is temporary and designates a local URI without any possible doubt.
//! - An initial media event is created and uses this temporary MXC ID, and
//!   propagated as a local echo for an event.
//! - A [`QueuedRequest`] is pushed to upload the file's media
//!   ([`QueuedRequestKind::MediaUpload`]).
//! - A [`DependentQueuedRequest`] is pushed to finish the upload
//!   ([`DependentQueuedRequestKind::FinishUpload`]).
//!
//! What is expected to happen, if all goes well, is the following:
//!
//! - the media is uploaded to the media homeserver, which returns the final MXC
//!   ID.
//! - when marking the upload request as sent, the MXC ID is injected (as a
//!   [`matrix_sdk_base::store::SentRequestKey`]) into the dependent request
//!   [`DependentQueuedRequestKind::FinishUpload`] created in the last step
//!   above.
//! - next time the send queue handles dependent queries, it'll see this one is
//!   ready to be sent, and it will transform it into an event queued request
//!   ([`QueuedRequestKind::Event`]), with the event created in the local echo
//!   before, updated with the MXC ID returned from the server.
//! - this updated local echo is also propagated as an edit of the local echo to
//!   observers, who get the final version with the final MXC IDs at this point
//!   too.
//! - then the event is sent normally, as any event sent with the send queue.
//!
//! When there is a thumbnail, things behave similarly, with some tweaks:
//!
//! - the thumbnail's content is also stored into the cache store immediately,
//! - the thumbnail is sent first as a [`QueuedRequestKind::MediaUpload`]
//!   request,
//! - the file upload is pushed as a dependent request of kind
//!   [`DependentQueuedRequestKind::UploadFileOrThumbnail`] (this variant keeps
//!   the file's key used to look it up in the cache store).
//! - the media event is then sent as a dependent request as described in the
//!   previous section.
//!
//! What's expected to happen is thus the following:
//!
//! - After the thumbnail has been uploaded, the dependent query will retrieve
//!   the final MXC ID returned by the homeserver for the thumbnail, and store
//!   it into the [`QueuedRequestKind::MediaUpload`]'s `thumbnail_source` field,
//!   so that the thumbnail MXC ID is remembered when it's time to finish the
//!   upload later.
//! - The dependent request is morphed into another
//!   [`QueuedRequestKind::MediaUpload`], for the file itself.
//!
//! The rest of the process is then similar to that of uploading a file without
//! a thumbnail. The only difference is that there's a thumbnail source (MXC ID)
//! remembered and fixed up into the media event, just before sending it.

use std::{
    collections::{BTreeMap, HashMap},
    str::FromStr as _,
    sync::{
        Arc, RwLock,
        atomic::{AtomicBool, Ordering},
    },
};

use eyeball::SharedObservable;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_base::crypto::{OlmError, SessionRecipientCollectionError};
#[cfg(feature = "unstable-msc4274")]
use matrix_sdk_base::store::FinishGalleryItemInfo;
use matrix_sdk_base::{
    RoomState, StoreError,
    cross_process_lock::CrossProcessLockError,
    event_cache::store::EventCacheStoreError,
    media::{MediaRequestParameters, store::MediaStoreError},
    store::{
        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore,
        FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
        SentMediaInfo, SentRequestKey, SerializableEventContent,
    },
};
use matrix_sdk_common::{
    executor::{JoinHandle, spawn},
    locks::Mutex as SyncMutex,
};
use mime::Mime;
use ruma::{
    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
    TransactionId,
    events::{
        AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _,
        reaction::ReactionEventContent,
        relation::Annotation,
        room::{
            MediaSource,
            message::{FormattedBody, RoomMessageEventContent},
        },
    },
    serde::Raw,
};
use tokio::sync::{Mutex, Notify, OwnedMutexGuard, broadcast, oneshot};
use tracing::{debug, error, info, instrument, trace, warn};

use crate::{
    Client, Media, Room, TransmissionProgress,
    client::WeakClient,
    config::RequestConfig,
    error::RetryKind,
    room::{WeakRoom, edit::EditedContent},
};

mod progress;
mod upload;

pub use progress::AbstractProgress;

/// A client-wide send queue, for all the rooms known by a client.
pub struct SendQueue {
    client: Client,
}

#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for SendQueue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SendQueue").finish_non_exhaustive()
    }
}

impl SendQueue {
    pub(super) fn new(client: Client) -> Self {
        Self { client }
    }

    /// Reload all the rooms which had unsent requests, and respawn tasks for
    /// those rooms.
    pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
        if !self.is_enabled() {
            return;
        }

        let room_ids =
            self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
                |err| {
                    warn!("error when loading rooms with unsent requests: {err}");
                    Vec::new()
                },
            );

        // Getting the [`RoomSendQueue`] is sufficient to spawn the task if need be.
        for room_id in room_ids {
            if let Some(room) = self.client.get_room(&room_id) {
                let _ = self.for_room(room);
            }
        }
    }

    /// Tiny helper to get the send queue's global context from the [`Client`].
    #[inline(always)]
    fn data(&self) -> &SendQueueData {
        &self.client.inner.send_queue_data
    }

    /// Get or create a new send queue for a given room, and insert it into our
    /// memoized rooms mapping.
    pub(crate) fn for_room(&self, room: Room) -> RoomSendQueue {
        let data = self.data();

        let mut map = data.rooms.write().unwrap();

        let room_id = room.room_id();
        if let Some(room_q) = map.get(room_id).cloned() {
            return room_q;
        }

        let owned_room_id = room_id.to_owned();
        let room_q = RoomSendQueue::new(
            self.is_enabled(),
            data.global_update_sender.clone(),
            data.error_sender.clone(),
            data.is_dropping.clone(),
            &self.client,
            owned_room_id.clone(),
            data.report_media_upload_progress.clone(),
        );

        map.insert(owned_room_id, room_q.clone());

        room_q
    }

    /// Enable or disable the send queue for the entire client, i.e. all rooms.
    ///
    /// If we're disabling the queue, and requests were being sent, they're not
    /// aborted, and will continue until a status resolves (error responses
    /// will keep the events in the buffer of events to send later). The
    /// disablement will happen before the next request is sent.
    ///
    /// This may wake up background tasks and resume sending of requests in the
    /// background.
    pub async fn set_enabled(&self, enabled: bool) {
        debug!(?enabled, "setting global send queue enablement");

        self.data().globally_enabled.store(enabled, Ordering::SeqCst);

        // Wake up individual rooms we already know about.
        for room in self.data().rooms.read().unwrap().values() {
            room.set_enabled(enabled);
        }

        // Reload some extra rooms that might not have been awakened yet, but could
        // have requests from previous sessions.
        self.respawn_tasks_for_rooms_with_unsent_requests().await;
    }

    /// Returns whether the send queue is enabled, at a client-wide
    /// granularity.
    pub fn is_enabled(&self) -> bool {
        self.data().globally_enabled.load(Ordering::SeqCst)
    }

    /// Enable or disable progress reporting for media uploads.
    pub fn enable_upload_progress(&self, enabled: bool) {
        self.data().report_media_upload_progress.store(enabled, Ordering::SeqCst);
    }

    /// Subscribe to all updates for all rooms.
    ///
    /// Use [`RoomSendQueue::subscribe`] to subscribe to updates for a _specific
    /// room_.
    pub fn subscribe(&self) -> broadcast::Receiver<SendQueueUpdate> {
        self.data().global_update_sender.subscribe()
    }

    /// Get local echoes from all room send queues.
    pub async fn local_echoes(
        &self,
    ) -> Result<BTreeMap<OwnedRoomId, Vec<LocalEcho>>, RoomSendQueueError> {
        let room_ids =
            self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
                |err| {
                    warn!("error when loading rooms with unsent requests: {err}");
                    Vec::new()
                },
            );

        let mut local_echoes: BTreeMap<OwnedRoomId, Vec<LocalEcho>> = BTreeMap::new();

        for room_id in room_ids {
            if let Some(room) = self.client.get_room(&room_id) {
                let queue = self.for_room(room);
                local_echoes
                    .insert(room_id.to_owned(), queue.inner.queue.local_echoes(&queue).await?);
            }
        }

        Ok(local_echoes)
    }

    /// A subscriber to the enablement status (enabled or disabled) of the
    /// send queue, along with useful errors.
    pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
        self.data().error_sender.subscribe()
    }
}

/// Metadata about a thumbnail needed when pushing media uploads to the send
/// queue.
#[derive(Clone, Debug)]
struct QueueThumbnailInfo {
    /// Metadata about the thumbnail needed when finishing a media upload.
    finish_upload_thumbnail_info: FinishUploadThumbnailInfo,

    /// The parameters for the request to retrieve the thumbnail data.
    media_request_parameters: MediaRequestParameters,

    /// The thumbnail's mime type.
    content_type: Mime,

    /// The thumbnail's file size in bytes.
    file_size: usize,
}

/// A specific room's send queue ran into an error, and it has disabled itself.
#[derive(Clone, Debug)]
pub struct SendQueueRoomError {
    /// For which room is the send queue failing?
    pub room_id: OwnedRoomId,

    /// The error the room has run into, when trying to send a request.
    pub error: Arc<crate::Error>,

    /// Whether the error is considered recoverable or not.
    ///
    /// An error that's recoverable will disable the room's send queue, while an
    /// unrecoverable error will be parked, until the user decides to do
    /// something about it.
    pub is_recoverable: bool,
}

impl Client {
    /// Returns a [`SendQueue`] that handles sending, retrying and not
    /// forgetting about requests that are to be sent.
    pub fn send_queue(&self) -> SendQueue {
        SendQueue::new(self.clone())
    }
}

pub(super) struct SendQueueData {
    /// Mapping of room to their unique send queue.
    rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,

    /// Is the whole mechanism enabled or disabled?
    ///
    /// This is only kept in memory to initialize new room queues with an
    /// initial enablement state.
    globally_enabled: AtomicBool,

    /// Global sender to send [`SendQueueUpdate`].
    ///
    /// See [`SendQueue::subscribe`].
    global_update_sender: broadcast::Sender<SendQueueUpdate>,

    /// Global error updates for the send queue.
    error_sender: broadcast::Sender<SendQueueRoomError>,

    /// Are we currently dropping the Client?
    is_dropping: Arc<AtomicBool>,

    /// Will media upload progress be reported via send queue updates?
    report_media_upload_progress: Arc<AtomicBool>,
}

impl SendQueueData {
    /// Create the data for a send queue, in the given enabled state.
    pub fn new(globally_enabled: bool) -> Self {
        let (global_update_sender, _) = broadcast::channel(32);
        let (error_sender, _) = broadcast::channel(32);

        Self {
            rooms: Default::default(),
            globally_enabled: AtomicBool::new(globally_enabled),
            global_update_sender,
            error_sender,
            is_dropping: Arc::new(false.into()),
            report_media_upload_progress: Arc::new(false.into()),
        }
    }
}

impl Drop for SendQueueData {
    fn drop(&mut self) {
        // Mark the whole send queue as shutting down, then wake up all the room
        // queues so they're stopped too.
        debug!("globally dropping the send queue");
        self.is_dropping.store(true, Ordering::SeqCst);

        let rooms = self.rooms.read().unwrap();
        for room in rooms.values() {
            room.inner.notifier.notify_one();
        }
    }
}

impl Room {
    /// Returns the [`RoomSendQueue`] for this specific room.
    pub fn send_queue(&self) -> RoomSendQueue {
        self.client.send_queue().for_room(self.clone())
    }
}

/// A per-room send queue.
///
/// This is cheap to clone.
#[derive(Clone)]
pub struct RoomSendQueue {
    inner: Arc<RoomSendQueueInner>,
}

#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for RoomSendQueue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RoomSendQueue").finish_non_exhaustive()
    }
}

impl RoomSendQueue {
    fn new(
        globally_enabled: bool,
        global_update_sender: broadcast::Sender<SendQueueUpdate>,
        global_error_sender: broadcast::Sender<SendQueueRoomError>,
        is_dropping: Arc<AtomicBool>,
        client: &Client,
        room_id: OwnedRoomId,
        report_media_upload_progress: Arc<AtomicBool>,
    ) -> Self {
        let (update_sender, _) = broadcast::channel(32);

        let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
        let notifier = Arc::new(Notify::new());

        let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
        let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));

        let task = spawn(Self::sending_task(
            weak_room.clone(),
            queue.clone(),
            notifier.clone(),
            global_update_sender.clone(),
            update_sender.clone(),
            locally_enabled.clone(),
            global_error_sender,
            is_dropping,
            report_media_upload_progress,
        ));

        Self {
            inner: Arc::new(RoomSendQueueInner {
                room: weak_room,
                global_update_sender,
                update_sender,
                _task: task,
                queue,
                notifier,
                locally_enabled,
            }),
        }
    }

    /// Queues a raw event for sending it to this room.
    ///
    /// This immediately returns, and will push the event to be sent into a
    /// queue, handled in the background.
    ///
    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
    /// the [`Self::subscribe()`] method to get updates about the sending of
    /// that event.
    ///
    /// By default, if sending failed on the first attempt, it will be retried a
    /// few times. If sending failed after those retries, this room's send
    /// queue will be disabled, and it will need to be manually re-enabled by
    /// the caller (e.g. after network is back, or when something has been done
    /// about the faulty requests).
    pub async fn send_raw(
        &self,
        content: Raw<AnyMessageLikeEventContent>,
        event_type: String,
    ) -> Result<SendHandle, RoomSendQueueError> {
        let Some(room) = self.inner.room.get() else {
            return Err(RoomSendQueueError::RoomDisappeared);
        };
        if room.state() != RoomState::Joined {
            return Err(RoomSendQueueError::RoomNotJoined);
        }

        let content = SerializableEventContent::from_raw(content, event_type);

        let created_at = MilliSecondsSinceUnixEpoch::now();
        let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
        trace!(%transaction_id, "manager sends a raw event to the background task");

        self.inner.notifier.notify_one();

        let send_handle = SendHandle {
            room: self.clone(),
            transaction_id: transaction_id.clone(),
            media_handles: vec![],
            created_at,
        };

        self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
            transaction_id,
            content: LocalEchoContent::Event {
                serialized_event: content,
                send_handle: send_handle.clone(),
                send_error: None,
            },
        }));

        Ok(send_handle)
    }

    /// Queues an event for sending it to this room.
    ///
    /// This immediately returns, and will push the event to be sent into a
    /// queue, handled in the background.
    ///
    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
    /// the [`Self::subscribe()`] method to get updates about the sending of
    /// that event.
    ///
    /// By default, if sending failed on the first attempt, it will be retried a
    /// few times. If sending failed after those retries, this room's send
    /// queue will be disabled, and it will need to be manually re-enabled by
    /// the caller (e.g. after network is back, or when something has been done
    /// about the faulty requests).
    pub async fn send(
        &self,
        content: AnyMessageLikeEventContent,
    ) -> Result<SendHandle, RoomSendQueueError> {
        self.send_raw(
            Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
            content.event_type().to_string(),
        )
        .await
    }

    /// Returns the current local requests as well as a receiver to listen to
    /// the send queue updates, as defined in [`RoomSendQueueUpdate`].
    ///
    /// Use [`SendQueue::subscribe`] to subscribe to updates for _all rooms_
    /// with a single receiver.
    pub async fn subscribe(
        &self,
    ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
    {
        let local_echoes = self.inner.queue.local_echoes(self).await?;

        Ok((local_echoes, self.inner.update_sender.subscribe()))
    }

    /// A task that must be spawned in the async runtime, running in the
    /// background for each room that has a send queue.
    ///
    /// It only progresses forward: nothing can be cancelled at any point, which
    /// makes the implementation not overly complicated to follow.
    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all, fields(room_id = %room.room_id()))]
    async fn sending_task(
        room: WeakRoom,
        queue: QueueStorage,
        notifier: Arc<Notify>,
        global_update_sender: broadcast::Sender<SendQueueUpdate>,
        update_sender: broadcast::Sender<RoomSendQueueUpdate>,
        locally_enabled: Arc<AtomicBool>,
        global_error_sender: broadcast::Sender<SendQueueRoomError>,
        is_dropping: Arc<AtomicBool>,
        report_media_upload_progress: Arc<AtomicBool>,
    ) {
        trace!("spawned the sending task");

        let room_id = room.room_id();

        loop {
            // A request to shut down should be preferred above everything else.
            if is_dropping.load(Ordering::SeqCst) {
                trace!("shutting down!");
                break;
            }

            // Try to apply dependent requests now; those applying to previously failed
            // attempts (local echoes) would succeed now.
            let mut new_updates = Vec::new();
            if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
                warn!("errors when applying dependent requests: {err}");
            }

            for up in new_updates {
                send_update(&global_update_sender, &update_sender, room_id, up);
            }

            if !locally_enabled.load(Ordering::SeqCst) {
                trace!("not enabled, sleeping");
                // Wait for an explicit wakeup.
                notifier.notified().await;
                continue;
            }

            let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
                Ok(Some(request)) => request,

                Ok(None) => {
                    trace!("queue is empty, sleeping");
                    // Wait for an explicit wakeup.
                    notifier.notified().await;
                    continue;
                }

                Err(err) => {
                    warn!("error when loading next request to send: {err}");
                    continue;
                }
            };

            let txn_id = queued_request.transaction_id.clone();
            trace!(txn_id = %txn_id, "received a request to send!");

            let Some(room) = room.get() else {
                if is_dropping.load(Ordering::SeqCst) {
                    break;
                }
                error!("the weak room couldn't be upgraded but we're not shutting down?");
                continue;
            };

            // If this is a media/gallery upload, prepare the following:
            // - transaction id for the related media event request,
            // - progress metadata to feed the final media upload progress
            // - an observable to watch the media upload progress.
            let (related_txn_id, media_upload_progress_info, http_progress) =
                if let QueuedRequestKind::MediaUpload {
                    cache_key,
                    thumbnail_source,
                    #[cfg(feature = "unstable-msc4274")]
                    accumulated,
                    related_to,
                    ..
                } = &queued_request.kind
                {
                    // Prepare to watch and communicate the request's progress for media uploads, if
                    // it has been requested.
                    let (media_upload_progress_info, http_progress) =
                        if report_media_upload_progress.load(Ordering::SeqCst) {
                            let media_upload_progress_info =
                                RoomSendQueue::create_media_upload_progress_info(
                                    &queued_request.transaction_id,
                                    related_to,
                                    cache_key,
                                    thumbnail_source.as_ref(),
                                    #[cfg(feature = "unstable-msc4274")]
                                    accumulated,
                                    &room,
                                    &queue,
                                )
                                .await;

                            let progress = RoomSendQueue::create_media_upload_progress_observable(
                                &media_upload_progress_info,
                                related_to,
                                &update_sender,
                            );

                            (Some(media_upload_progress_info), Some(progress))
                        } else {
                            Default::default()
                        };

                    (Some(related_to.clone()), media_upload_progress_info, http_progress)
                } else {
                    Default::default()
                };

            match Self::handle_request(&room, queued_request, cancel_upload_rx, http_progress).await
            {
718                Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
719                {
720                    Ok(()) => match parent_key {
721                        SentRequestKey::Event(event_id) => {
722                            send_update(
723                                &global_update_sender,
724                                &update_sender,
725                                room_id,
726                                RoomSendQueueUpdate::SentEvent { transaction_id: txn_id, event_id },
727                            );
728                        }
729
730                        SentRequestKey::Media(sent_media_info) => {
731                            // Generate some final progress information, even if incremental
732                            // progress wasn't requested.
733                            let index =
734                                media_upload_progress_info.as_ref().map_or(0, |info| info.index);
735                            let progress = media_upload_progress_info
736                                .as_ref()
737                                .map(|info| {
738                                    AbstractProgress { current: info.bytes, total: info.bytes }
739                                        + info.offsets
740                                })
741                                .unwrap_or(AbstractProgress { current: 1, total: 1 });
742
743                            // Purposefully don't use `send_update` here, because we don't want to
744                            // notify the global listeners about an upload progress update.
745                            let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
746                                related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
747                                file: Some(sent_media_info.file),
748                                index,
749                                progress,
750                            });
751                        }
752                    },
753
754                    Err(err) => {
755                        warn!("unable to mark queued request as sent: {err}");
756                    }
757                },
758
759                Ok(None) => {
760                    debug!("Request has been aborted while running, continuing.");
761                }
762
763                Err(err) => {
764                    let is_recoverable = match err {
765                        crate::Error::Http(ref http_err) => {
766                            // All transient errors are recoverable.
767                            matches!(
768                                http_err.retry_kind(),
769                                RetryKind::Transient { .. } | RetryKind::NetworkFailure
770                            )
771                        }
772
773                        // `ConcurrentRequestFailed` typically happens because of an HTTP failure;
774                        // since we don't get the underlying error, be lax and consider it
775                        // recoverable, and let observers decide to retry it or not. At some point
776                        // we'll get the actual underlying error.
777                        crate::Error::ConcurrentRequestFailed => true,
778
779                        // As of 2024-06-27, all other error types are considered unrecoverable.
780                        _ => false,
781                    };
782
783                    // Disable the queue for this room after any kind of error.
784                    locally_enabled.store(false, Ordering::SeqCst);
785
786                    if is_recoverable {
787                        warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");
788
789                        // In this case, we intentionally keep the request in the queue, but mark it
790                        // as not being sent anymore.
791                        queue.mark_as_not_being_sent(&txn_id).await;
792
793                        // Let observers know about a failure *after* we've
794                        // marked the item as not being sent anymore. Otherwise,
795                        // there's a possible race where a caller might try to
796                        // remove an item, while it's still marked as being
797                        // sent, resulting in a cancellation failure.
798                    } else {
799                        warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");
800
801                        // Mark the request as wedged, so it's not picked at any future point.
802                        if let Err(storage_error) =
803                            queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
804                        {
805                            warn!("unable to mark request as wedged: {storage_error}");
806                        }
807                    }
808
809                    let error = Arc::new(err);
810
811                    let _ = global_error_sender.send(SendQueueRoomError {
812                        room_id: room_id.to_owned(),
813                        error: error.clone(),
814                        is_recoverable,
815                    });
816
817                    send_update(
818                        &global_update_sender,
819                        &update_sender,
820                        room_id,
821                        RoomSendQueueUpdate::SendError {
822                            transaction_id: related_txn_id.unwrap_or(txn_id),
823                            error,
824                            is_recoverable,
825                        },
826                    );
827                }
828            }
829        }
830
831        info!("exited sending task");
832    }
833
834    /// Handles a single request and returns the [`SentRequestKey`] on success
835    /// (unless the request was cancelled, in which case it'll return
836    /// `None`).
837    async fn handle_request(
838        room: &Room,
839        request: QueuedRequest,
840        cancel_upload_rx: Option<oneshot::Receiver<()>>,
841        progress: Option<SharedObservable<TransmissionProgress>>,
842    ) -> Result<Option<SentRequestKey>, crate::Error> {
843        match request.kind {
844            QueuedRequestKind::Event { content } => {
845                let (event, event_type) = content.raw();
846
847                let res = room
848                    .send_raw(event_type, event)
849                    .with_transaction_id(&request.transaction_id)
850                    .with_request_config(RequestConfig::short_retry())
851                    .await?;
852
853                trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent");
854                Ok(Some(SentRequestKey::Event(res.event_id)))
855            }
856
857            QueuedRequestKind::MediaUpload {
858                content_type,
859                cache_key,
860                thumbnail_source,
861                related_to: relates_to,
862                #[cfg(feature = "unstable-msc4274")]
863                accumulated,
864            } => {
865                trace!(%relates_to, "uploading media related to event");
866
867                let fut = async move {
868                    let data = room
869                        .client()
870                        .media_store()
871                        .lock()
872                        .await?
873                        .get_media_content(&cache_key)
874                        .await?
875                        .ok_or(crate::Error::SendQueueWedgeError(Box::new(
876                            QueueWedgeError::MissingMediaContent,
877                        )))?;
878
879                    let mime = Mime::from_str(&content_type).map_err(|_| {
880                        crate::Error::SendQueueWedgeError(Box::new(
881                            QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
882                        ))
883                    })?;
884
885                    #[cfg(feature = "e2e-encryption")]
886                    let media_source = if room.latest_encryption_state().await?.is_encrypted() {
887                        trace!("upload will be encrypted (encrypted room)");
888
889                        let mut cursor = std::io::Cursor::new(data);
890                        let mut req = room
891                            .client
892                            .upload_encrypted_file(&mut cursor)
893                            .with_request_config(RequestConfig::short_retry());
894                        if let Some(progress) = progress {
895                            req = req.with_send_progress_observable(progress);
896                        }
897                        let encrypted_file = req.await?;
898
899                        MediaSource::Encrypted(Box::new(encrypted_file))
900                    } else {
901                        trace!("upload will be in clear text (room without encryption)");
902
903                        let request_config = RequestConfig::short_retry()
904                            .timeout(Media::reasonable_upload_timeout(&data));
905                        let mut req =
906                            room.client().media().upload(&mime, data, Some(request_config));
907                        if let Some(progress) = progress {
908                            req = req.with_send_progress_observable(progress);
909                        }
910                        let res = req.await?;
911
912                        MediaSource::Plain(res.content_uri)
913                    };
914
915                    #[cfg(not(feature = "e2e-encryption"))]
916                    let media_source = {
917                        let request_config = RequestConfig::short_retry()
918                            .timeout(Media::reasonable_upload_timeout(&data));
919                        let mut req =
920                            room.client().media().upload(&mime, data, Some(request_config));
921                        if let Some(progress) = progress {
922                            req = req.with_send_progress_observable(progress);
923                        }
924                        let res = req.await?;
925                        MediaSource::Plain(res.content_uri)
926                    };
927
928                    let uri = match &media_source {
929                        MediaSource::Plain(uri) => uri,
930                        MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
931                    };
932                    trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");
933
934                    Ok(SentRequestKey::Media(SentMediaInfo {
935                        file: media_source,
936                        thumbnail: thumbnail_source,
937                        #[cfg(feature = "unstable-msc4274")]
938                        accumulated,
939                    }))
940                };
941
942                let wait_for_cancel = async move {
943                    if let Some(rx) = cancel_upload_rx {
944                        rx.await
945                    } else {
946                        std::future::pending().await
947                    }
948                };
949
950                tokio::select! {
951                    biased;
952
953                    _ = wait_for_cancel => {
954                        Ok(None)
955                    }
956
957                    res = fut => {
958                        res.map(Some)
959                    }
960                }
961            }
962        }
963    }
964
965    /// Returns whether the send queue is enabled for this room (room-level flag).
966    pub fn is_enabled(&self) -> bool {
967        self.inner.locally_enabled.load(Ordering::SeqCst)
968    }
969
970    /// Set the locally enabled flag for this room queue.
971    pub fn set_enabled(&self, enabled: bool) {
972        self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
973
974        // No need to wake a task to tell it it's been disabled, so only notify if we're
975        // re-enabling the queue.
976        if enabled {
977            self.inner.notifier.notify_one();
978        }
979    }
980
981    /// Send an update on the room send queue channel, and on the global send
982    /// queue channel, i.e. it sends a [`RoomSendQueueUpdate`] and a
983    /// [`SendQueueUpdate`].
984    fn send_update(&self, update: RoomSendQueueUpdate) {
985        let _ = self.inner.update_sender.send(update.clone());
986        let _ = self
987            .inner
988            .global_update_sender
989            .send(SendQueueUpdate { room_id: self.inner.room.room_id().to_owned(), update });
990    }
991}
992
993fn send_update(
994    global_update_sender: &broadcast::Sender<SendQueueUpdate>,
995    update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
996    room_id: &RoomId,
997    update: RoomSendQueueUpdate,
998) {
999    let _ = update_sender.send(update.clone());
1000    let _ = global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update });
1001}
1002
1003impl From<&crate::Error> for QueueWedgeError {
1004    fn from(value: &crate::Error) -> Self {
1005        match value {
1006            #[cfg(feature = "e2e-encryption")]
1007            crate::Error::OlmError(error) => match &**error {
1008                OlmError::SessionRecipientCollectionError(error) => match error {
1009                    SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
1010                        QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
1011                    }
1012
1013                    SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
1014                        QueueWedgeError::IdentityViolations { users: users.clone() }
1015                    }
1016
1017                    SessionRecipientCollectionError::CrossSigningNotSetup
1018                    | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
1019                        QueueWedgeError::CrossVerificationRequired
1020                    }
1021                },
1022                _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1023            },
1024
1025            // Flatten errors of `Self` type.
1026            crate::Error::SendQueueWedgeError(error) => *error.clone(),
1027
1028            _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1029        }
1030    }
1031}
1032
1033struct RoomSendQueueInner {
1034    /// The room which this send queue relates to.
1035    room: WeakRoom,
1036
1037    /// Global sender to send [`SendQueueUpdate`].
1038    ///
1039    /// See [`SendQueue::subscribe`].
1040    global_update_sender: broadcast::Sender<SendQueueUpdate>,
1041
1042    /// Broadcaster for notifications about the statuses of requests to be sent.
1043    ///
1044    /// Can be subscribed to from the outside.
1045    ///
1046    /// See [`RoomSendQueue::subscribe`].
1047    update_sender: broadcast::Sender<RoomSendQueueUpdate>,
1048
1049    /// Queue of requests that are either to be sent, or being sent.
1050    ///
1051    /// When a request has been sent to the server, it is removed from that
1052    /// queue *after* being sent. That way, we will retry sending upon
1053    /// failure, in the same order requests have been inserted in the first
1054    /// place.
1055    queue: QueueStorage,
1056
1057    /// A notifier that's updated any time common data is touched (stopped or
1058    /// enabled statuses), or the associated room [`QueueStorage`].
1059    notifier: Arc<Notify>,
1060
1061    /// Should the room process new requests or not (because e.g. it might be
1062    /// running off the network)?
1063    locally_enabled: Arc<AtomicBool>,
1064
1065    /// Handle to the actual sending task. Unused, but kept alive along with
1066    /// this data structure.
1067    _task: JoinHandle<()>,
1068}
1069
1070/// Information about a request being sent right this moment.
1071struct BeingSentInfo {
1072    /// Transaction id of the thing being sent.
1073    transaction_id: OwnedTransactionId,
1074
1075    /// For an upload request, a trigger to cancel the upload before it
1076    /// completes.
1077    cancel_upload: Option<oneshot::Sender<()>>,
1078}
1079
1080impl BeingSentInfo {
1081    /// Aborts the upload, if a trigger is available.
1082    ///
1083    /// Consumes the object because the sender is a oneshot and will be consumed
1084    /// upon sending.
1085    fn cancel_upload(self) -> bool {
1086        if let Some(cancel_upload) = self.cancel_upload {
1087            let _ = cancel_upload.send(());
1088            true
1089        } else {
1090            false
1091        }
1092    }
1093}
1094
1095/// A specialized lock that guards both the state store and the
1096/// [`Self::being_sent`] data.
1097#[derive(Clone)]
1098struct StoreLock {
1099    /// Reference to the client, to get access to the underlying store.
1100    client: WeakClient,
1101
1102    /// The one queued request that is being sent at the moment, along with
1103    /// associated data that can be useful to act upon it.
1104    ///
1105    /// Also used as the lock to access the state store.
1106    being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
1107}
1108
1109impl StoreLock {
1110    /// Gets a hold of the locked store and [`Self::being_sent`] pair.
1111    async fn lock(&self) -> StoreLockGuard {
1112        StoreLockGuard {
1113            client: self.client.clone(),
1114            being_sent: self.being_sent.clone().lock_owned().await,
1115        }
1116    }
1117}
1118
1119/// A lock guard obtained through locking with [`StoreLock`], giving access to
1120/// the client and the `being_sent` data.
1121struct StoreLockGuard {
1122    /// Reference to the client, to get access to the underlying store.
1123    client: WeakClient,
1124
1125    /// The one queued request that is being sent at the moment, along with
1126    /// associated data that can be useful to act upon it.
1127    being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
1128}
1129
1130impl StoreLockGuard {
1131    /// Get a client from the locked state, useful to get a handle on a store.
1132    fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
1133        self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
1134    }
1135}
1136
1137#[derive(Clone)]
1138struct QueueStorage {
1139    /// A lock to make sure the state store is only accessed by one task at a
1140    /// time, to make some store operations atomic.
1141    store: StoreLock,
1142
1143    /// To which room is this storage related.
1144    room_id: OwnedRoomId,
1145
1146    /// In-memory mapping of media transaction IDs to thumbnail sizes for the
1147    /// purpose of progress reporting.
1148    ///
1149    /// The keys are the transaction IDs for sending the media or gallery event
1150    /// after all uploads have finished. This allows us to easily clean up the
1151    /// cache after the event was sent.
1152    ///
1153    /// For media uploads, the value vector will always have a single element.
1154    ///
1155    /// For galleries, some gallery items might not have a thumbnail while
1156    /// others do. Since we access the thumbnails by their index within the
1157    /// gallery, the vector needs to hold optional `usize`s.
1158    thumbnail_file_sizes: Arc<SyncMutex<HashMap<OwnedTransactionId, Vec<Option<usize>>>>>,
1159}
1160
1161impl QueueStorage {
1162    /// Default priority for a queued request.
1163    const LOW_PRIORITY: usize = 0;
1164
1165    /// High priority for a queued request that must be handled before others.
1166    const HIGH_PRIORITY: usize = 10;
1167
1168    /// Create a new queue for queuing requests to be sent later.
1169    fn new(client: WeakClient, room: OwnedRoomId) -> Self {
1170        Self {
1171            room_id: room,
1172            store: StoreLock { client, being_sent: Default::default() },
1173            thumbnail_file_sizes: Default::default(),
1174        }
1175    }
1176
1177    /// Push a new event to be sent in the queue, with a default priority of 0.
1178    ///
1179    /// Returns the transaction id chosen to identify the request.
1180    async fn push(
1181        &self,
1182        request: QueuedRequestKind,
1183        created_at: MilliSecondsSinceUnixEpoch,
1184    ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
1185        let transaction_id = TransactionId::new();
1186
1187        self.store
1188            .lock()
1189            .await
1190            .client()?
1191            .state_store()
1192            .save_send_queue_request(
1193                &self.room_id,
1194                transaction_id.clone(),
1195                created_at,
1196                request,
1197                Self::LOW_PRIORITY,
1198            )
1199            .await?;
1200
1201        Ok(transaction_id)
1202    }
1203
1204    /// Peeks the next request to be sent, marking it as being sent.
1205    ///
1206    /// [`Self::mark_as_sent`] must be called once the request has actually
1207    /// been sent.
1208    async fn peek_next_to_send(
1209        &self,
1210    ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
1211    {
1212        let mut guard = self.store.lock().await;
1213        let queued_requests =
1214            guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;
1215
1216        if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
1217            let (cancel_upload_tx, cancel_upload_rx) =
1218                if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
1219                    let (tx, rx) = oneshot::channel();
1220                    (Some(tx), Some(rx))
1221                } else {
1222                    Default::default()
1223                };
1224
1225            let prev = guard.being_sent.replace(BeingSentInfo {
1226                transaction_id: request.transaction_id.clone(),
1227                cancel_upload: cancel_upload_tx,
1228            });
1229
1230            if let Some(prev) = prev {
1231                error!(
1232                    prev_txn = ?prev.transaction_id,
1233                    "a previous request was still active while picking a new one"
1234                );
1235            }
1236
1237            Ok(Some((request.clone(), cancel_upload_rx)))
1238        } else {
1239            Ok(None)
1240        }
1241    }
1242
1243    /// Marks a request popped with [`Self::peek_next_to_send`] and identified
1244    /// with the given transaction id as not being sent anymore, so it can
1245    /// be removed from the queue later.
1246    async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1247        let was_being_sent = self.store.lock().await.being_sent.take();
1248
1249        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1250        if prev_txn != Some(transaction_id) {
1251            error!(prev_txn = ?prev_txn, "previous active request didn't match what we expected (after transient error)");
1252        }
1253    }
1254
1255    /// Marks a request popped with [`Self::peek_next_to_send`] and identified
1256    /// with the given transaction id as being wedged (and not being sent
1257    /// anymore), so it can be removed from the queue later.
1258    async fn mark_as_wedged(
1259        &self,
1260        transaction_id: &TransactionId,
1261        reason: QueueWedgeError,
1262    ) -> Result<(), RoomSendQueueStorageError> {
1263        // Keep the lock until we're done touching the storage.
1264        let mut guard = self.store.lock().await;
1265        let was_being_sent = guard.being_sent.take();
1266
1267        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1268        if prev_txn != Some(transaction_id) {
1269            error!(
1270                ?prev_txn,
1271                "previous active request didn't match what we expected (after permanent error)",
1272            );
1273        }
1274
1275        Ok(guard
1276            .client()?
1277            .state_store()
1278            .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1279            .await?)
1280    }
1281
1282    /// Marks a request identified with the given transaction id as being now
1283    /// unwedged and adds it back to the queue.
1284    async fn mark_as_unwedged(
1285        &self,
1286        transaction_id: &TransactionId,
1287    ) -> Result<(), RoomSendQueueStorageError> {
1288        Ok(self
1289            .store
1290            .lock()
1291            .await
1292            .client()?
1293            .state_store()
1294            .update_send_queue_request_status(&self.room_id, transaction_id, None)
1295            .await?)
1296    }
1297
1298    /// Marks a request pushed with [`Self::push`] and identified with the given
1299    /// transaction id as sent, by removing it from the local queue.
1300    async fn mark_as_sent(
1301        &self,
1302        transaction_id: &TransactionId,
1303        parent_key: SentRequestKey,
1304    ) -> Result<(), RoomSendQueueStorageError> {
1305        // Keep the lock until we're done touching the storage.
1306        let mut guard = self.store.lock().await;
1307        let was_being_sent = guard.being_sent.take();
1308
1309        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1310        if prev_txn != Some(transaction_id) {
1311            error!(
1312                ?prev_txn,
1313                "previous active request didn't match what we expected (after successful send)",
1314            );
1315        }
1316
1317        let client = guard.client()?;
1318        let store = client.state_store();
1319
1320        // Update all dependent requests.
1321        store
1322            .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1323            .await?;
1324
1325        let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1326
1327        if !removed {
1328            warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1329        }
1330
1331        self.thumbnail_file_sizes.lock().remove(transaction_id);
1332
1333        Ok(())
1334    }
1335
1336    /// Cancel the sending of an event that has been queued with
1337    /// [`Self::push`] under the given transaction id.
1338    ///
1339    /// Returns whether the given transaction has actually been removed. If
1340    /// false, this either means that the transaction id was unrelated to
1341    /// this queue, or that the request was sent before we cancelled it.
1342    async fn cancel_event(
1343        &self,
1344        transaction_id: &TransactionId,
1345    ) -> Result<bool, RoomSendQueueStorageError> {
1346        let guard = self.store.lock().await;
1347
1348        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1349            == Some(transaction_id)
1350        {
1351            // Save the intent to redact the event.
1352            guard
1353                .client()?
1354                .state_store()
1355                .save_dependent_queued_request(
1356                    &self.room_id,
1357                    transaction_id,
1358                    ChildTransactionId::new(),
1359                    MilliSecondsSinceUnixEpoch::now(),
1360                    DependentQueuedRequestKind::RedactEvent,
1361                )
1362                .await?;
1363
1364            return Ok(true);
1365        }
1366
1367        let removed = guard
1368            .client()?
1369            .state_store()
1370            .remove_send_queue_request(&self.room_id, transaction_id)
1371            .await?;
1372
1373        self.thumbnail_file_sizes.lock().remove(transaction_id);
1374
1375        Ok(removed)
1376    }
1377
1378    /// Replace an event that has been queued with [`Self::push`] under the
1379    /// given transaction id, before it's actually been sent.
1380    ///
1381    /// Returns whether the given transaction has actually been edited. If
1382    /// false, this either means that the transaction id was unrelated to
1383    /// this queue, or that the request was sent before we edited it.
1384    async fn replace_event(
1385        &self,
1386        transaction_id: &TransactionId,
1387        serializable: SerializableEventContent,
1388    ) -> Result<bool, RoomSendQueueStorageError> {
1389        let guard = self.store.lock().await;
1390
1391        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1392            == Some(transaction_id)
1393        {
1394            // Save the intent to edit the associated event.
1395            guard
1396                .client()?
1397                .state_store()
1398                .save_dependent_queued_request(
1399                    &self.room_id,
1400                    transaction_id,
1401                    ChildTransactionId::new(),
1402                    MilliSecondsSinceUnixEpoch::now(),
1403                    DependentQueuedRequestKind::EditEvent { new_content: serializable },
1404                )
1405                .await?;
1406
1407            return Ok(true);
1408        }
1409
1410        let edited = guard
1411            .client()?
1412            .state_store()
1413            .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1414            .await?;
1415
1416        Ok(edited)
1417    }
1418
1419    /// Push requests (and dependents) to upload a media.
1420    ///
1421    /// See the module-level description for details of the whole process.
1422    #[allow(clippy::too_many_arguments)]
1423    async fn push_media(
1424        &self,
1425        event: RoomMessageEventContent,
1426        content_type: Mime,
1427        send_event_txn: OwnedTransactionId,
1428        created_at: MilliSecondsSinceUnixEpoch,
1429        upload_file_txn: OwnedTransactionId,
1430        file_media_request: MediaRequestParameters,
1431        thumbnail: Option<QueueThumbnailInfo>,
1432    ) -> Result<(), RoomSendQueueStorageError> {
1433        let guard = self.store.lock().await;
1434        let client = guard.client()?;
1435        let store = client.state_store();
1436
1437        // There's only a single media to be sent, so it has at most one thumbnail.
1438        let thumbnail_file_sizes = vec![thumbnail.as_ref().map(|t| t.file_size)];
1439
1440        let thumbnail_info = self
1441            .push_thumbnail_and_media_uploads(
1442                store,
1443                &content_type,
1444                send_event_txn.clone(),
1445                created_at,
1446                upload_file_txn.clone(),
1447                file_media_request,
1448                thumbnail,
1449            )
1450            .await?;
1451
1452        // Push the dependent request for the event itself.
1453        store
1454            .save_dependent_queued_request(
1455                &self.room_id,
1456                &upload_file_txn,
1457                send_event_txn.clone().into(),
1458                created_at,
1459                DependentQueuedRequestKind::FinishUpload {
1460                    local_echo: Box::new(event),
1461                    file_upload: upload_file_txn.clone(),
1462                    thumbnail_info,
1463                },
1464            )
1465            .await?;
1466
1467        self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1468
1469        Ok(())
1470    }
1471
1472    /// Push requests (and dependents) to upload a gallery.
1473    ///
1474    /// See the module-level description for details of the whole process.
1475    #[cfg(feature = "unstable-msc4274")]
1476    #[allow(clippy::too_many_arguments)]
1477    async fn push_gallery(
1478        &self,
1479        event: RoomMessageEventContent,
1480        send_event_txn: OwnedTransactionId,
1481        created_at: MilliSecondsSinceUnixEpoch,
1482        item_queue_infos: Vec<GalleryItemQueueInfo>,
1483    ) -> Result<(), RoomSendQueueStorageError> {
1484        let guard = self.store.lock().await;
1485        let client = guard.client()?;
1486        let store = client.state_store();
1487
        let mut finish_item_infos = Vec::with_capacity(item_queue_infos.len());
        let mut thumbnail_file_sizes = Vec::with_capacity(item_queue_infos.len());

        let Some((first, rest)) = item_queue_infos.split_first() else {
            return Ok(());
        };

        let GalleryItemQueueInfo { content_type, upload_file_txn, file_media_request, thumbnail } =
            first;

        // The first item starts the chain: its (thumbnail and) file uploads are pushed
        // as regular queued requests.
        let thumbnail_info = self
            .push_thumbnail_and_media_uploads(
                store,
                content_type,
                send_event_txn.clone(),
                created_at,
                upload_file_txn.clone(),
                file_media_request.clone(),
                thumbnail.clone(),
            )
            .await?;

        finish_item_infos
            .push(FinishGalleryItemInfo { file_upload: upload_file_txn.clone(), thumbnail_info });
        thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));

        let mut last_upload_file_txn = upload_file_txn.clone();

        // Every remaining item is chained as a dependent request of the previous
        // upload, so that uploads happen one after the other.
        for item_queue_info in rest {
            let GalleryItemQueueInfo {
                content_type,
                upload_file_txn,
                file_media_request,
                thumbnail,
            } = item_queue_info;

            let thumbnail_info = if let Some(QueueThumbnailInfo {
                finish_upload_thumbnail_info: thumbnail_info,
                media_request_parameters: thumbnail_media_request,
                content_type: thumbnail_content_type,
                ..
            }) = thumbnail
            {
                let upload_thumbnail_txn = thumbnail_info.txn.clone();

                // Save the thumbnail upload request as a dependent request of the last file
                // upload.
                store
                    .save_dependent_queued_request(
                        &self.room_id,
                        &last_upload_file_txn,
                        upload_thumbnail_txn.clone().into(),
                        created_at,
                        DependentQueuedRequestKind::UploadFileOrThumbnail {
                            content_type: thumbnail_content_type.to_string(),
                            cache_key: thumbnail_media_request.clone(),
                            related_to: send_event_txn.clone(),
                            parent_is_thumbnail_upload: false,
                        },
                    )
                    .await?;

                last_upload_file_txn = upload_thumbnail_txn;

                Some(thumbnail_info)
            } else {
                None
            };

            // Save the file upload as a dependent request of the previous upload.
            store
                .save_dependent_queued_request(
                    &self.room_id,
                    &last_upload_file_txn,
                    upload_file_txn.clone().into(),
                    created_at,
                    DependentQueuedRequestKind::UploadFileOrThumbnail {
                        content_type: content_type.to_string(),
                        cache_key: file_media_request.clone(),
                        related_to: send_event_txn.clone(),
                        parent_is_thumbnail_upload: thumbnail.is_some(),
                    },
                )
                .await?;

            finish_item_infos.push(FinishGalleryItemInfo {
                file_upload: upload_file_txn.clone(),
                thumbnail_info: thumbnail_info.cloned(),
            });
            thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));

            last_upload_file_txn = upload_file_txn.clone();
        }

        // Push the request for the event itself as a dependent request of the last file
        // upload.
        store
            .save_dependent_queued_request(
                &self.room_id,
                &last_upload_file_txn,
                send_event_txn.clone().into(),
                created_at,
                DependentQueuedRequestKind::FinishGallery {
                    local_echo: Box::new(event),
                    item_infos: finish_item_infos,
                },
            )
            .await?;

        self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);

        Ok(())
    }

    /// If a thumbnail exists, pushes a [`QueuedRequestKind::MediaUpload`] to
    /// upload it and a [`DependentQueuedRequestKind::UploadFileOrThumbnail`] to
    /// upload the media itself. Otherwise, pushes a
    /// [`QueuedRequestKind::MediaUpload`] to upload the media directly.
    #[allow(clippy::too_many_arguments)]
    async fn push_thumbnail_and_media_uploads(
        &self,
        store: &DynStateStore,
        content_type: &Mime,
        send_event_txn: OwnedTransactionId,
        created_at: MilliSecondsSinceUnixEpoch,
        upload_file_txn: OwnedTransactionId,
        file_media_request: MediaRequestParameters,
        thumbnail: Option<QueueThumbnailInfo>,
    ) -> Result<Option<FinishUploadThumbnailInfo>, RoomSendQueueStorageError> {
        if let Some(QueueThumbnailInfo {
            finish_upload_thumbnail_info: thumbnail_info,
            media_request_parameters: thumbnail_media_request,
            content_type: thumbnail_content_type,
            ..
        }) = thumbnail
        {
            let upload_thumbnail_txn = thumbnail_info.txn.clone();

            // Save the thumbnail upload request.
            store
                .save_send_queue_request(
                    &self.room_id,
                    upload_thumbnail_txn.clone(),
                    created_at,
                    QueuedRequestKind::MediaUpload {
                        content_type: thumbnail_content_type.to_string(),
                        cache_key: thumbnail_media_request,
                        thumbnail_source: None, // the thumbnail has no thumbnails :)
                        related_to: send_event_txn.clone(),
                        #[cfg(feature = "unstable-msc4274")]
                        accumulated: vec![],
                    },
                    Self::LOW_PRIORITY,
                )
                .await?;

            // Save the file upload request as a dependent request of the thumbnail upload.
            store
                .save_dependent_queued_request(
                    &self.room_id,
                    &upload_thumbnail_txn,
                    upload_file_txn.into(),
                    created_at,
                    DependentQueuedRequestKind::UploadFileOrThumbnail {
                        content_type: content_type.to_string(),
                        cache_key: file_media_request,
                        related_to: send_event_txn,
                        parent_is_thumbnail_upload: true,
                    },
                )
                .await?;

            Ok(Some(thumbnail_info))
        } else {
            // Save the file upload as its own request, not a dependent one.
            store
                .save_send_queue_request(
                    &self.room_id,
                    upload_file_txn,
                    created_at,
                    QueuedRequestKind::MediaUpload {
                        content_type: content_type.to_string(),
                        cache_key: file_media_request,
                        thumbnail_source: None,
                        related_to: send_event_txn,
                        #[cfg(feature = "unstable-msc4274")]
                        accumulated: vec![],
                    },
                    Self::LOW_PRIORITY,
                )
                .await?;

            Ok(None)
        }
    }

    /// Reacts to the given local echo of an event.
    ///
    /// Returns `Ok(None)` if the target event can't be found among the queued
    /// requests or the dependent requests (for instance, because it has
    /// already been sent).
    #[instrument(skip(self))]
    async fn react(
        &self,
        transaction_id: &TransactionId,
        key: String,
        created_at: MilliSecondsSinceUnixEpoch,
    ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
        let guard = self.store.lock().await;
        let client = guard.client()?;
        let store = client.state_store();

        let requests = store.load_send_queue_requests(&self.room_id).await?;

        // If the target event has already been sent, abort immediately.
        if !requests.iter().any(|item| item.transaction_id == transaction_id) {
            // We didn't find it as a queued request; try to find it as a dependent queued
            // request.
            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
            if !dependent_requests
                .into_iter()
                .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
                .any(|child_txn| *child_txn == *transaction_id)
            {
                // We didn't find it as either a request or a dependent request, abort.
                return Ok(None);
            }
        }

        // Record the dependent request.
        let reaction_txn_id = ChildTransactionId::new();
        store
            .save_dependent_queued_request(
                &self.room_id,
                transaction_id,
                reaction_txn_id.clone(),
                created_at,
                DependentQueuedRequestKind::ReactEvent { key },
            )
            .await?;

        Ok(Some(reaction_txn_id))
    }

    /// Returns a list of the local echoes, that is, all the requests that we're
    /// about to send but that haven't been sent yet (or are being sent).
    async fn local_echoes(
        &self,
        room: &RoomSendQueue,
    ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
        let guard = self.store.lock().await;
        let client = guard.client()?;
        let store = client.state_store();

        let local_requests =
            store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
                Some(LocalEcho {
                    transaction_id: queued.transaction_id.clone(),
                    content: match queued.kind {
                        QueuedRequestKind::Event { content } => LocalEchoContent::Event {
                            serialized_event: content,
                            send_handle: SendHandle {
                                room: room.clone(),
                                transaction_id: queued.transaction_id,
                                media_handles: vec![],
                                created_at: queued.created_at,
                            },
                            send_error: queued.error,
                        },

                        QueuedRequestKind::MediaUpload { .. } => {
                            // Don't return uploaded medias as their own things; the accompanying
                            // event represented as a dependent request should be sufficient.
                            return None;
                        }
                    },
                })
            });

        let reactions_and_medias = store
            .load_dependent_queued_requests(&self.room_id)
            .await?
            .into_iter()
            .filter_map(|dep| match dep.kind {
                DependentQueuedRequestKind::EditEvent { .. }
                | DependentQueuedRequestKind::RedactEvent => {
                    // TODO: reflect local edits/redacts too?
                    None
                }

                DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
                    transaction_id: dep.own_transaction_id.clone().into(),
                    content: LocalEchoContent::React {
                        key,
                        send_handle: SendReactionHandle {
                            room: room.clone(),
                            transaction_id: dep.own_transaction_id,
                        },
                        applies_to: dep.parent_transaction_id,
                    },
                }),

                DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
                    // Don't reflect these: only the associated event is interesting to observers.
                    None
                }

                DependentQueuedRequestKind::FinishUpload {
                    local_echo,
                    file_upload,
                    thumbnail_info,
                } => {
                    // Materialize as an event local echo.
                    Some(LocalEcho {
                        transaction_id: dep.own_transaction_id.clone().into(),
                        content: LocalEchoContent::Event {
                            serialized_event: SerializableEventContent::new(&(*local_echo).into())
                                .ok()?,
                            send_handle: SendHandle {
                                room: room.clone(),
                                transaction_id: dep.own_transaction_id.into(),
                                media_handles: vec![MediaHandles {
                                    upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
                                    upload_file_txn: file_upload,
                                }],
                                created_at: dep.created_at,
                            },
                            send_error: None,
                        },
                    })
                }

                #[cfg(feature = "unstable-msc4274")]
                DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
                    // Materialize as an event local echo.
                    self.create_gallery_local_echo(
                        dep.own_transaction_id,
                        room,
                        dep.created_at,
                        local_echo,
                        item_infos,
                    )
                }
            });

        Ok(local_requests.chain(reactions_and_medias).collect())
    }

    /// Create a local echo for a gallery event.
    #[cfg(feature = "unstable-msc4274")]
    fn create_gallery_local_echo(
        &self,
        transaction_id: ChildTransactionId,
        room: &RoomSendQueue,
        created_at: MilliSecondsSinceUnixEpoch,
        local_echo: Box<RoomMessageEventContent>,
        item_infos: Vec<FinishGalleryItemInfo>,
    ) -> Option<LocalEcho> {
        Some(LocalEcho {
            transaction_id: transaction_id.clone().into(),
            content: LocalEchoContent::Event {
                serialized_event: SerializableEventContent::new(&(*local_echo).into()).ok()?,
                send_handle: SendHandle {
                    room: room.clone(),
                    transaction_id: transaction_id.into(),
                    media_handles: item_infos
                        .into_iter()
                        .map(|i| MediaHandles {
                            upload_thumbnail_txn: i.thumbnail_info.map(|info| info.txn),
                            upload_file_txn: i.file_upload,
                        })
                        .collect(),
                    created_at,
                },
                send_error: None,
            },
        })
    }

    /// Try to apply a single dependent request, whether it's local or remote.
    ///
    /// This swallows errors that would retrigger every time if we retried
    /// applying the dependent request: invalid edit content, etc.
    ///
    /// Returns true if the dependent request has been sent (or should not be
    /// retried later).
    #[instrument(skip_all)]
    async fn try_apply_single_dependent_request(
        &self,
        client: &Client,
        dependent_request: DependentQueuedRequest,
        new_updates: &mut Vec<RoomSendQueueUpdate>,
    ) -> Result<bool, RoomSendQueueError> {
        let store = client.state_store();

        let parent_key = dependent_request.parent_key;

        match dependent_request.kind {
            DependentQueuedRequestKind::EditEvent { new_content } => {
                if let Some(parent_key) = parent_key {
                    let Some(event_id) = parent_key.into_event_id() else {
                        return Err(RoomSendQueueError::StorageError(
                            RoomSendQueueStorageError::InvalidParentKey,
                        ));
                    };

                    // The parent event has been sent, so send an edit event.
                    let room = client
                        .get_room(&self.room_id)
                        .ok_or(RoomSendQueueError::RoomDisappeared)?;

                    // Check the event is one we know how to edit with an edit event.

                    // It must be deserializable…
                    let edited_content = match new_content.deserialize() {
                        Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
                            // Assume no relationships.
                            EditedContent::RoomMessage(c.into())
                        }

                        Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
                            let poll_start = c.poll_start().clone();
                            EditedContent::PollStart {
                                fallback_text: poll_start.question.text.clone(),
                                new_content: poll_start,
                            }
                        }

                        Ok(c) => {
                            warn!("Unsupported edit content type: {:?}", c.event_type());
                            return Ok(true);
                        }

                        Err(err) => {
                            warn!("Unable to deserialize: {err}");
                            return Ok(true);
                        }
                    };

                    let edit_event = match room.make_edit_event(&event_id, edited_content).await {
                        Ok(e) => e,
                        Err(err) => {
                            warn!("couldn't create edited event: {err}");
                            return Ok(true);
                        }
                    };

                    // Queue the edit event in the send queue 🧠.
                    let serializable = SerializableEventContent::from_raw(
                        Raw::new(&edit_event)
                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
                        edit_event.event_type().to_string(),
                    );

                    store
                        .save_send_queue_request(
                            &self.room_id,
                            dependent_request.own_transaction_id.into(),
                            dependent_request.created_at,
                            serializable.into(),
                            Self::HIGH_PRIORITY,
                        )
                        .await
                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
                } else {
                    // The parent event is still local; update the local echo.
                    let edited = store
                        .update_send_queue_request(
                            &self.room_id,
                            &dependent_request.parent_transaction_id,
                            new_content.into(),
                        )
                        .await
                        .map_err(RoomSendQueueStorageError::StateStoreError)?;

                    if !edited {
                        warn!("missing local echo upon dependent edit");
                    }
                }
            }

            DependentQueuedRequestKind::RedactEvent => {
                if let Some(parent_key) = parent_key {
                    let Some(event_id) = parent_key.into_event_id() else {
                        return Err(RoomSendQueueError::StorageError(
                            RoomSendQueueStorageError::InvalidParentKey,
                        ));
                    };

                    // The parent event has been sent; send a redaction.
                    let room = client
                        .get_room(&self.room_id)
                        .ok_or(RoomSendQueueError::RoomDisappeared)?;

                    // Ideally we'd use the send queue to send the redaction, but the protocol has
                    // changed the shape of a room.redaction after v11, so keep it simple and try
                    // once here.

                    // Note: no reason is provided because we materialize the intent of "cancel
                    // sending the parent event".

                    if let Err(err) = room
                        .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
                        .await
                    {
                        warn!("error when sending a redact for {event_id}: {err}");
                        return Ok(false);
                    }
                } else {
                    // The parent event is still local (sending must have failed); redact the local
                    // echo.
                    let removed = store
                        .remove_send_queue_request(
                            &self.room_id,
                            &dependent_request.parent_transaction_id,
                        )
                        .await
                        .map_err(RoomSendQueueStorageError::StateStoreError)?;

                    if !removed {
                        warn!("missing local echo upon dependent redact");
                    }
                }
            }

            DependentQueuedRequestKind::ReactEvent { key } => {
                if let Some(parent_key) = parent_key {
                    let Some(parent_event_id) = parent_key.into_event_id() else {
                        return Err(RoomSendQueueError::StorageError(
                            RoomSendQueueStorageError::InvalidParentKey,
                        ));
                    };

                    // Queue the reaction event in the send queue 🧠.
                    let react_event =
                        ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
                    let serializable = SerializableEventContent::from_raw(
                        Raw::new(&react_event)
                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
                        react_event.event_type().to_string(),
                    );

                    store
                        .save_send_queue_request(
                            &self.room_id,
                            dependent_request.own_transaction_id.into(),
                            dependent_request.created_at,
                            serializable.into(),
                            Self::HIGH_PRIORITY,
                        )
                        .await
                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
                } else {
                    // Not applied yet, we should retry later => false.
                    return Ok(false);
                }
            }

            DependentQueuedRequestKind::UploadFileOrThumbnail {
                content_type,
                cache_key,
                related_to,
                parent_is_thumbnail_upload,
            } => {
                let Some(parent_key) = parent_key else {
                    // Not finished yet, we should retry later => false.
                    return Ok(false);
                };
                self.handle_dependent_file_or_thumbnail_upload(
                    client,
                    dependent_request.own_transaction_id.into(),
                    parent_key,
                    content_type,
                    cache_key,
                    related_to,
                    parent_is_thumbnail_upload,
                )
                .await?;
            }

            DependentQueuedRequestKind::FinishUpload {
                local_echo,
                file_upload,
                thumbnail_info,
            } => {
                let Some(parent_key) = parent_key else {
                    // Not finished yet, we should retry later => false.
                    return Ok(false);
                };
                self.handle_dependent_finish_upload(
                    client,
                    dependent_request.own_transaction_id.into(),
                    parent_key,
                    *local_echo,
                    file_upload,
                    thumbnail_info,
                    new_updates,
                )
                .await?;
            }

            #[cfg(feature = "unstable-msc4274")]
            DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
                let Some(parent_key) = parent_key else {
                    // Not finished yet, we should retry later => false.
                    return Ok(false);
                };
                self.handle_dependent_finish_gallery_upload(
                    client,
                    dependent_request.own_transaction_id.into(),
                    parent_key,
                    *local_echo,
                    item_infos,
                    new_updates,
                )
                .await?;
            }
        }

        Ok(true)
    }

    /// Try to apply all the dependent requests of this room, removing from
    /// storage those that have been applied (or should not be retried later).
    #[instrument(skip(self))]
    async fn apply_dependent_requests(
        &self,
        new_updates: &mut Vec<RoomSendQueueUpdate>,
    ) -> Result<(), RoomSendQueueError> {
        let guard = self.store.lock().await;

        let client = guard.client()?;
        let store = client.state_store();

        let dependent_requests = store
            .load_dependent_queued_requests(&self.room_id)
            .await
            .map_err(RoomSendQueueStorageError::StateStoreError)?;

        let num_initial_dependent_requests = dependent_requests.len();
        if num_initial_dependent_requests == 0 {
            // Returning early here avoids a bit of useless logging.
            return Ok(());
        }

        let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);

        // Get rid of all the non-canonical dependent requests.
        for original in &dependent_requests {
            if !canonicalized_dependent_requests
                .iter()
                .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
            {
                store
                    .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
                    .await
                    .map_err(RoomSendQueueStorageError::StateStoreError)?;
            }
        }

        let mut num_dependent_requests = canonicalized_dependent_requests.len();

        debug!(
            num_dependent_requests,
            num_initial_dependent_requests, "starting handling of dependent requests"
        );

        for dependent in canonicalized_dependent_requests {
            let dependent_id = dependent.own_transaction_id.clone();

            match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
                Ok(should_remove) => {
                    if should_remove {
                        // The dependent request has been successfully applied, forget about it.
                        store
                            .remove_dependent_queued_request(&self.room_id, &dependent_id)
                            .await
                            .map_err(RoomSendQueueStorageError::StateStoreError)?;

                        num_dependent_requests -= 1;
                    }
                }

                Err(err) => {
                    warn!("error when applying single dependent request: {err}");
                }
            }
        }

        debug!(
            leftover_dependent_requests = num_dependent_requests,
            "stopped handling dependent request"
        );

        Ok(())
    }

    /// Remove a single dependent request from storage.
    async fn remove_dependent_send_queue_request(
        &self,
        dependent_event_id: &ChildTransactionId,
    ) -> Result<bool, RoomSendQueueStorageError> {
        Ok(self
            .store
            .lock()
            .await
            .client()?
            .state_store()
            .remove_dependent_queued_request(&self.room_id, dependent_event_id)
            .await?)
    }
}

/// Metadata needed for pushing gallery item uploads onto the send queue.
#[cfg(feature = "unstable-msc4274")]
struct GalleryItemQueueInfo {
    content_type: Mime,
    upload_file_txn: OwnedTransactionId,
    file_media_request: MediaRequestParameters,
    thumbnail: Option<QueueThumbnailInfo>,
}

/// The content of a local echo.
#[derive(Clone, Debug)]
pub enum LocalEchoContent {
    /// The local echo contains an actual event ready to display.
    Event {
        /// Content of the event itself (along with its type) that we are about
        /// to send.
        serialized_event: SerializableEventContent,
        /// A handle to manipulate the sending of the associated event.
        send_handle: SendHandle,
        /// Whether trying to send this local echo failed in the past with an
        /// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]).
        send_error: Option<QueueWedgeError>,
    },

    /// A local echo has been reacted to.
    React {
        /// The key with which the local echo has been reacted to.
        key: String,
        /// A handle to manipulate the sending of the reaction.
        send_handle: SendReactionHandle,
        /// The local echo which has been reacted to.
        applies_to: OwnedTransactionId,
    },
}

/// A local representation for a request that hasn't been sent yet to the user's
/// homeserver.
#[derive(Clone, Debug)]
pub struct LocalEcho {
    /// Transaction id used to identify the associated request.
    pub transaction_id: OwnedTransactionId,
    /// The content for the local echo.
    pub content: LocalEchoContent,
}

2241/// An update to a room send queue, observable with
2242/// [`RoomSendQueue::subscribe`].
2243#[derive(Clone, Debug)]
2244pub enum RoomSendQueueUpdate {
2245    /// A new local event is being sent.
2246    ///
2247    /// There's been a user query to create this event. It is being sent to the
2248    /// server.
2249    NewLocalEvent(LocalEcho),
2250
2251    /// A local event that hadn't been sent to the server yet has been cancelled
2252    /// before sending.
2253    CancelledLocalEvent {
2254        /// Transaction id used to identify this event.
2255        transaction_id: OwnedTransactionId,
2256    },
2257
2258    /// A local event's content has been replaced with something else.
2259    ReplacedLocalEvent {
2260        /// Transaction id used to identify this event.
2261        transaction_id: OwnedTransactionId,
2262
2263        /// The new content replacing the previous one.
2264        new_content: SerializableEventContent,
2265    },
2266
2267    /// An error happened when an event was being sent.
2268    ///
2269        /// The event has not been removed from the queue. If the error is
2270        /// recoverable, the room's send queue is disabled and must be re-enabled.
2271    SendError {
2272        /// Transaction id used to identify this event.
2273        transaction_id: OwnedTransactionId,
2274        /// Error received while sending the event.
2275        error: Arc<crate::Error>,
2276        /// Whether the error is considered recoverable or not.
2277        ///
2278        /// A recoverable error will disable the room's send queue, while an
2279        /// unrecoverable error will park (wedge) the event until the user
2280        /// decides to retry or cancel sending it.
2281        is_recoverable: bool,
2282    },
2283
2284    /// The event has been unwedged and sending is now being retried.
2285    RetryEvent {
2286        /// Transaction id used to identify this event.
2287        transaction_id: OwnedTransactionId,
2288    },
2289
2290    /// The event has been sent to the server, and the query returned
2291    /// successfully.
2292    SentEvent {
2293        /// Transaction id used to identify this event.
2294        transaction_id: OwnedTransactionId,
2295        /// Received event id from the send response.
2296        event_id: OwnedEventId,
2297    },
2298
2299    /// A media upload (consisting of a file and possibly a thumbnail) has made
2300    /// progress.
2301    MediaUpload {
2302        /// The media event this uploaded media relates to.
2303        related_to: OwnedTransactionId,
2304
2305        /// The final media source for the file if it has finished uploading.
2306        file: Option<MediaSource>,
2307
2308        /// The index of the media within the transaction. A file and its
2309        /// thumbnail share the same index. Will always be 0 for non-gallery
2310        /// media uploads.
2311        index: u64,
2312
2313        /// The combined upload progress across the file and, if existing, its
2314        /// thumbnail. For gallery uploads, the progress is reported per indexed
2315        /// gallery item.
2316        progress: AbstractProgress,
2317    },
2318}
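
// Illustration only, not part of the SDK: a minimal sketch of how an observer
// might fold the updates above into a view of the pending local echoes. The
// `Update`, `EchoState` and `PendingEchoes` types are hypothetical,
// simplified stand-ins (string transaction ids, no `MediaUpload` variant);
// the real payloads are the ones declared in `RoomSendQueueUpdate`.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for `RoomSendQueueUpdate`.
#[derive(Clone, Debug, PartialEq)]
pub enum Update {
    NewLocalEvent { txn: String, body: String },
    CancelledLocalEvent { txn: String },
    ReplacedLocalEvent { txn: String, new_body: String },
    SendError { txn: String, is_recoverable: bool },
    RetryEvent { txn: String },
    SentEvent { txn: String, event_id: String },
}

/// What an observer might display for a pending local echo.
#[derive(Clone, Debug, PartialEq)]
pub enum EchoState {
    Sending,
    /// Sending failed; `wedged` is true for unrecoverable errors.
    Failed { wedged: bool },
}

/// Local echoes still waiting to be sent, keyed by transaction id.
#[derive(Default)]
pub struct PendingEchoes {
    pub echoes: HashMap<String, (String, EchoState)>,
}

impl PendingEchoes {
    /// Applies one update; returns the remote event id for a `SentEvent`.
    pub fn apply(&mut self, update: Update) -> Option<String> {
        match update {
            Update::NewLocalEvent { txn, body } => {
                self.echoes.insert(txn, (body, EchoState::Sending));
            }
            Update::CancelledLocalEvent { txn } => {
                self.echoes.remove(&txn);
            }
            Update::ReplacedLocalEvent { txn, new_body } => {
                if let Some(entry) = self.echoes.get_mut(&txn) {
                    entry.0 = new_body;
                }
            }
            Update::SendError { txn, is_recoverable } => {
                // The event stays in the queue; only its displayed state changes.
                if let Some(entry) = self.echoes.get_mut(&txn) {
                    entry.1 = EchoState::Failed { wedged: !is_recoverable };
                }
            }
            Update::RetryEvent { txn } => {
                if let Some(entry) = self.echoes.get_mut(&txn) {
                    entry.1 = EchoState::Sending;
                }
            }
            Update::SentEvent { txn, event_id } => {
                // The local echo is replaced by the remote event.
                self.echoes.remove(&txn);
                return Some(event_id);
            }
        }
        None
    }
}
```

// Keeping failed echoes in the map mirrors the fact that a `SendError` does
// not remove the event from the queue.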
2319
2320/// A [`RoomSendQueueUpdate`] with an associated [`OwnedRoomId`].
2321///
2322/// This is used by [`SendQueue::subscribe`] to get a single channel to receive
2323/// updates for all [`RoomSendQueue`]s.
2324#[derive(Clone, Debug)]
2325pub struct SendQueueUpdate {
2326    /// The room where the update happened.
2327    pub room_id: OwnedRoomId,
2328
2329    /// The update for this room.
2330    pub update: RoomSendQueueUpdate,
2331}
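
// Illustration only, not part of the SDK: a minimal sketch of the "single
// channel for all rooms" idea, where each per-room update is tagged with its
// room id before being forwarded. It assumes a plain `std::sync::mpsc`
// channel and hypothetical `TaggedUpdate` / `RoomReporter` types; the channel
// type actually used by the send queue is not shown in this section.

```rust
use std::sync::mpsc;

/// Hypothetical stand-in for `SendQueueUpdate`: an update plus its room id.
#[derive(Debug, PartialEq)]
pub struct TaggedUpdate {
    pub room_id: String,
    pub update: String,
}

/// Per-room handle that tags every update with the room id before
/// forwarding it to the shared channel.
pub struct RoomReporter {
    room_id: String,
    global: mpsc::Sender<TaggedUpdate>,
}

impl RoomReporter {
    pub fn new(room_id: &str, global: mpsc::Sender<TaggedUpdate>) -> Self {
        Self { room_id: room_id.to_owned(), global }
    }

    /// Forwards an update; a closed receiver is ignored, since it only
    /// means that nobody is listening anymore.
    pub fn report(&self, update: &str) {
        let _ = self.global.send(TaggedUpdate {
            room_id: self.room_id.clone(),
            update: update.to_owned(),
        });
    }
}
```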
2332
2333/// An error triggered by the send queue module.
2334#[derive(Debug, thiserror::Error)]
2335pub enum RoomSendQueueError {
2336    /// The room isn't in the joined state.
2337    #[error("the room isn't in the joined state")]
2338    RoomNotJoined,
2339
2340    /// The room is missing from the client.
2341    ///
2342    /// This only happens when the client is shutting down.
2343    #[error("the room is now missing from the client")]
2344    RoomDisappeared,
2345
2346    /// Error coming from storage.
2347    #[error(transparent)]
2348    StorageError(#[from] RoomSendQueueStorageError),
2349
2350    /// The attachment event failed to be created.
2351    #[error("the attachment event could not be created")]
2352    FailedToCreateAttachment,
2353
2354    /// The gallery contains no items.
2355    #[cfg(feature = "unstable-msc4274")]
2356    #[error("the gallery contains no items")]
2357    EmptyGallery,
2358
2359    /// The gallery event failed to be created.
2360    #[cfg(feature = "unstable-msc4274")]
2361    #[error("the gallery event could not be created")]
2362    FailedToCreateGallery,
2363}
2364
2365/// An error triggered by the send queue storage.
2366#[derive(Debug, thiserror::Error)]
2367pub enum RoomSendQueueStorageError {
2368    /// Error caused by the state store.
2369    #[error(transparent)]
2370    StateStoreError(#[from] StoreError),
2371
2372    /// Error caused by the event cache store.
2373    #[error(transparent)]
2374    EventCacheStoreError(#[from] EventCacheStoreError),
2375
2376    /// Error caused by the media store.
2377    #[error(transparent)]
2378    MediaStoreError(#[from] MediaStoreError),
2379
2380    /// Error caused when attempting to get a handle on the event cache store.
2381    #[error(transparent)]
2382    LockError(#[from] CrossProcessLockError),
2383
2384    /// Error caused when (de)serializing into/from json.
2385    #[error(transparent)]
2386    JsonSerialization(#[from] serde_json::Error),
2387
2388    /// A parent key was expected to be of a certain type, and it was another
2389    /// type instead.
2390    #[error("a dependent event had an invalid parent key type")]
2391    InvalidParentKey,
2392
2393    /// The client is shutting down.
2394    #[error("The client is shutting down.")]
2395    ClientShuttingDown,
2396
2397    /// An operation not implemented on a send handle.
2398    #[error("This operation is not implemented for media uploads")]
2399    OperationNotImplementedYet,
2400
2401    /// Trying to edit a media caption for something that's not a media.
2402    #[error("Can't edit a media caption when the underlying event isn't a media")]
2403    InvalidMediaCaptionEdit,
2404}
2405
2406/// Extra transaction IDs useful during an upload.
2407#[derive(Clone, Debug)]
2408struct MediaHandles {
2409    /// Transaction id used when uploading the thumbnail.
2410    ///
2411    /// Optional because a media can be uploaded without a thumbnail.
2412    upload_thumbnail_txn: Option<OwnedTransactionId>,
2413
2414    /// Transaction id used when uploading the media itself.
2415    upload_file_txn: OwnedTransactionId,
2416}
2417
2418/// A handle to manipulate an event that was scheduled to be sent to a room.
2419#[derive(Clone, Debug)]
2420pub struct SendHandle {
2421    /// Link to the send queue used to send this request.
2422    room: RoomSendQueue,
2423
2424    /// Transaction id used for the sent request.
2425    ///
2426    /// If this is a media upload, this is the "main" transaction id, i.e. the
2427    /// one used to send the event, and that will be seen by observers.
2428    transaction_id: OwnedTransactionId,
2429
2430    /// Additional handles for a media upload.
2431    media_handles: Vec<MediaHandles>,
2432
2433    /// The time at which the event to be sent has been created.
2434    pub created_at: MilliSecondsSinceUnixEpoch,
2435}
2436
2437impl SendHandle {
2438    /// Creates a new [`SendHandle`].
2439    #[cfg(test)]
2440    pub(crate) fn new(
2441        room: RoomSendQueue,
2442        transaction_id: OwnedTransactionId,
2443        created_at: MilliSecondsSinceUnixEpoch,
2444    ) -> Self {
2445        Self { room, transaction_id, media_handles: vec![], created_at }
2446    }
2447
2448    fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
2449        if !self.media_handles.is_empty() {
2450            Err(RoomSendQueueStorageError::OperationNotImplementedYet)
2451        } else {
2452            Ok(())
2453        }
2454    }
2455
2456    /// Aborts the sending of the event, if it wasn't sent yet.
2457    ///
2458    /// Returns true if the sending could be aborted, false if not (i.e. the
2459    /// event had already been sent).
2460    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2461    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2462        trace!("received an abort request");
2463
2464        let queue = &self.room.inner.queue;
2465
2466        for handles in &self.media_handles {
2467            if queue.abort_upload(&self.transaction_id, handles).await? {
2468                // Propagate a cancelled update.
2469                self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2470                    transaction_id: self.transaction_id.clone(),
2471                });
2472
2473                return Ok(true);
2474            }
2475
2476            // If it failed, it means the sending of the event is not a
2477            // dependent request anymore. Fall back to the regular
2478            // code path below, that handles aborting sending of an event.
2479        }
2480
2481        if queue.cancel_event(&self.transaction_id).await? {
2482            trace!("successful abort");
2483
2484            // Propagate a cancelled update too.
2485            self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2486                transaction_id: self.transaction_id.clone(),
2487            });
2488
2489            Ok(true)
2490        } else {
2491            debug!("local echo didn't exist anymore, can't abort");
2492            Ok(false)
2493        }
2494    }
2495
2496    /// Edits the content of a local echo with a raw event content.
2497    ///
2498    /// Returns true if the event to be sent was replaced, false if not (i.e.
2499    /// the event had already been sent).
2500    #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2501    pub async fn edit_raw(
2502        &self,
2503        new_content: Raw<AnyMessageLikeEventContent>,
2504        event_type: String,
2505    ) -> Result<bool, RoomSendQueueStorageError> {
2506        trace!("received an edit request");
2507        self.nyi_for_uploads()?;
2508
2509        let serializable = SerializableEventContent::from_raw(new_content, event_type);
2510
2511        if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
2512            trace!("successful edit");
2513
2514            // Wake up the queue, in case the room was asleep before the edit.
2515            self.room.inner.notifier.notify_one();
2516
2517            // Propagate a replaced update too.
2518            self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2519                transaction_id: self.transaction_id.clone(),
2520                new_content: serializable,
2521            });
2522
2523            Ok(true)
2524        } else {
2525            debug!("local echo doesn't exist anymore, can't edit");
2526            Ok(false)
2527        }
2528    }
2529
2530    /// Edits the content of a local echo with an event content.
2531    ///
2532    /// Returns true if the event to be sent was replaced, false if not (i.e.
2533    /// the event had already been sent).
2534    pub async fn edit(
2535        &self,
2536        new_content: AnyMessageLikeEventContent,
2537    ) -> Result<bool, RoomSendQueueStorageError> {
2538        self.edit_raw(
2539            Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2540            new_content.event_type().to_string(),
2541        )
2542        .await
2543    }
2544
2545    /// Edits the media caption of a local echo.
2546    ///
2547    /// Returns true if the caption was replaced, false if not (i.e. the event
2548    /// had already been sent). Fails if the event to be sent isn't a media event.
2549    pub async fn edit_media_caption(
2550        &self,
2551        caption: Option<String>,
2552        formatted_caption: Option<FormattedBody>,
2553        mentions: Option<Mentions>,
2554    ) -> Result<bool, RoomSendQueueStorageError> {
2555        if let Some(new_content) = self
2556            .room
2557            .inner
2558            .queue
2559            .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2560            .await?
2561        {
2562            trace!("successful edit of media caption");
2563
2564            // Wake up the queue, in case the room was asleep before the edit.
2565            self.room.inner.notifier.notify_one();
2566
2567            let new_content = SerializableEventContent::new(&new_content)
2568                .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2569
2570            // Propagate a replaced update too.
2571            self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2572                transaction_id: self.transaction_id.clone(),
2573                new_content,
2574            });
2575
2576            Ok(true)
2577        } else {
2578            debug!("local echo doesn't exist anymore, can't edit media caption");
2579            Ok(false)
2580        }
2581    }
2582
2583    /// Unwedge a local echo identified by its transaction identifier and try to
2584    /// resend it.
2585    pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2586        let room = &self.room.inner;
2587        room.queue
2588            .mark_as_unwedged(&self.transaction_id)
2589            .await
2590            .map_err(RoomSendQueueError::StorageError)?;
2591
2592        // If we have media handles, also try to unwedge them.
2593        //
2594        // It's fine to always do it to *all* the transaction IDs at once, because only
2595        // one of the three requests will be active at the same time, i.e. only
2596        // one entry will be updated in the store. The other two are either
2597        // done, or dependent requests.
2598
2599        for handles in &self.media_handles {
2600            room.queue
2601                .mark_as_unwedged(&handles.upload_file_txn)
2602                .await
2603                .map_err(RoomSendQueueError::StorageError)?;
2604
2605            if let Some(txn) = &handles.upload_thumbnail_txn {
2606                room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2607            }
2608        }
2609
2610        // Wake up the queue, in case the room was asleep before unwedging the request.
2611        room.notifier.notify_one();
2612
2613        self.room.send_update(RoomSendQueueUpdate::RetryEvent {
2614            transaction_id: self.transaction_id.clone(),
2615        });
2616
2617        Ok(())
2618    }
2619
2620    /// Send a reaction to the event as soon as it's sent.
2621    ///
2622    /// Returns `Ok(None)` if the reaction couldn't be queued because the
2623    /// event has already been sent, i.e. it is now a remote event.
2624    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2625    pub async fn react(
2626        &self,
2627        key: String,
2628    ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2629        trace!("received an intent to react");
2630
2631        let created_at = MilliSecondsSinceUnixEpoch::now();
2632        if let Some(reaction_txn_id) =
2633            self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2634        {
2635            trace!("successfully queued react");
2636
2637            // Wake up the queue, in case the room was asleep before the sending.
2638            self.room.inner.notifier.notify_one();
2639
2640            // Propagate a new local event.
2641            let send_handle = SendReactionHandle {
2642                room: self.room.clone(),
2643                transaction_id: reaction_txn_id.clone(),
2644            };
2645
2646            self.room.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2647                // Note: we do want to use the `txn_id` we're going to use for the reaction, not
2648                // the one for the event we're reacting to.
2649                transaction_id: reaction_txn_id.into(),
2650                content: LocalEchoContent::React {
2651                    key,
2652                    send_handle: send_handle.clone(),
2653                    applies_to: self.transaction_id.clone(),
2654                },
2655            }));
2656
2657            Ok(Some(send_handle))
2658        } else {
2659            debug!("local echo doesn't exist anymore, can't react");
2660            Ok(None)
2661        }
2662    }
2663}
2664
2665/// A handle to execute actions on the sending of a reaction.
2666#[derive(Clone, Debug)]
2667pub struct SendReactionHandle {
2668    /// Reference to the send queue for the room where this reaction was sent.
2669    room: RoomSendQueue,
2670    /// The reaction's own transaction id.
2671    transaction_id: ChildTransactionId,
2672}
2673
2674impl SendReactionHandle {
2675    /// Abort the sending of the reaction.
2676    ///
2677    /// Will return true if the reaction could be aborted, false if it's been
2678    /// sent (and there's no matching local echo anymore).
2679    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2680        if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2681            // Simple case: the reaction was found in the dependent event list.
2682
2683            // Propagate a cancelled update too.
2684            self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2685                transaction_id: self.transaction_id.clone().into(),
2686            });
2687
2688            return Ok(true);
2689        }
2690
2691        // The reaction has already been queued for sending; try to abort it
2692        // with a regular abort.
2693        let handle = SendHandle {
2694            room: self.room.clone(),
2695            transaction_id: self.transaction_id.clone().into(),
2696            media_handles: vec![],
2697            created_at: MilliSecondsSinceUnixEpoch::now(),
2698        };
2699
2700        handle.abort().await
2701    }
2702
2703    /// The transaction id that will be used to send this reaction later.
2704    pub fn transaction_id(&self) -> &TransactionId {
2705        &self.transaction_id
2706    }
2707}
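
// Illustration only, not part of the SDK: a minimal sketch of the two-step
// abort performed by `SendReactionHandle::abort`. The reaction is first
// looked up among the dependent requests, then in the regular queue. The
// `ToyQueue` type and its fields are hypothetical in-memory stand-ins for
// the persisted queue.

```rust
/// Hypothetical, in-memory stand-in for the queue storage.
#[derive(Default)]
pub struct ToyQueue {
    /// Requests waiting for their parent event to be sent.
    pub dependent: Vec<String>,
    /// Requests ready to be sent, in order.
    pub ready: Vec<String>,
}

impl ToyQueue {
    /// Removes `txn` from `list` if present, reporting whether it was there.
    fn remove_from(list: &mut Vec<String>, txn: &str) -> bool {
        match list.iter().position(|t| t == txn) {
            Some(idx) => {
                list.remove(idx);
                true
            }
            None => false,
        }
    }

    /// Tries the dependent list first, then falls back to the regular queue.
    /// Returns false when the request is in neither, i.e. it was already sent.
    pub fn abort_reaction(&mut self, txn: &str) -> bool {
        Self::remove_from(&mut self.dependent, txn) || Self::remove_from(&mut self.ready, txn)
    }
}
```

// The short-circuiting `||` keeps the same order as the handle: the regular
// queue is only consulted when the dependent list did not contain the request.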
2708
2709/// From a given source of [`DependentQueuedRequest`], return only the most
2710/// meaningful, i.e. the ones that wouldn't be overridden after applying the
2711/// others.
2712fn canonicalize_dependent_requests(
2713    dependent: &[DependentQueuedRequest],
2714) -> Vec<DependentQueuedRequest> {
2715    let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
2716
2717    for d in dependent {
2718        let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
2719
2720        if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
2721            // The parent event has already been flagged for redaction, don't consider the
2722            // other dependent events.
2723            continue;
2724        }
2725
2726        match &d.kind {
2727            DependentQueuedRequestKind::EditEvent { .. } => {
2728                // Replace any previous edit with this one.
2729                if let Some(prev_edit) = prevs
2730                    .iter_mut()
2731                    .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
2732                {
2733                    *prev_edit = d;
2734                } else {
2735                    prevs.insert(0, d);
2736                }
2737            }
2738
2739            DependentQueuedRequestKind::UploadFileOrThumbnail { .. }
2740            | DependentQueuedRequestKind::FinishUpload { .. }
2741            | DependentQueuedRequestKind::ReactEvent { .. } => {
2742                // These requests can't be canonicalized, push them as is.
2743                prevs.push(d);
2744            }
2745
2746            #[cfg(feature = "unstable-msc4274")]
2747            DependentQueuedRequestKind::FinishGallery { .. } => {
2748                // This request can't be canonicalized, push it as is.
2749                prevs.push(d);
2750            }
2751
2752            DependentQueuedRequestKind::RedactEvent => {
2753                // Remove every other dependent action.
2754                prevs.clear();
2755                prevs.push(d);
2756            }
2757        }
2758    }
2759
2760    by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
2761}
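
// Illustration only, not part of the SDK: a self-contained sketch of the
// canonicalization rules implemented above, using hypothetical simplified
// types (`Kind`, `Dependent`) with string ids instead of the real
// `DependentQueuedRequest`. Per parent, a redaction overrides everything,
// only the latest edit is kept (ahead of reactions), and reactions are kept
// as is.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for `DependentQueuedRequestKind`.
#[derive(Clone, Debug, PartialEq)]
pub enum Kind {
    Edit(String),
    React(String),
    Redact,
}

/// Hypothetical, simplified stand-in for `DependentQueuedRequest`.
#[derive(Clone, Debug, PartialEq)]
pub struct Dependent {
    pub parent: String,
    pub kind: Kind,
}

/// Keeps, for each parent, only the requests that still matter.
pub fn canonicalize(dependent: &[Dependent]) -> Vec<Dependent> {
    let mut by_parent: HashMap<&str, Vec<&Dependent>> = HashMap::new();

    for d in dependent {
        let prevs = by_parent.entry(d.parent.as_str()).or_default();

        // Once a parent is flagged for redaction, ignore later requests.
        if prevs.iter().any(|p| p.kind == Kind::Redact) {
            continue;
        }

        match &d.kind {
            Kind::Edit(_) => {
                // Replace a previous edit in place, or put the first edit in front.
                if let Some(prev) = prevs.iter_mut().find(|p| matches!(p.kind, Kind::Edit(_))) {
                    *prev = d;
                } else {
                    prevs.insert(0, d);
                }
            }
            Kind::React(_) => prevs.push(d),
            Kind::Redact => {
                // A redaction overrides every earlier request for this parent.
                prevs.clear();
                prevs.push(d);
            }
        }
    }

    // Order across parents is unspecified; order within a parent is kept.
    by_parent.into_values().flatten().cloned().collect()
}
```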
2762
2763#[cfg(all(test, not(target_family = "wasm")))]
2764mod tests {
2765    use std::{sync::Arc, time::Duration};
2766
2767    use assert_matches2::{assert_let, assert_matches};
2768    use matrix_sdk_base::store::{
2769        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
2770        SerializableEventContent,
2771    };
2772    use matrix_sdk_test::{JoinedRoomBuilder, SyncResponseBuilder, async_test};
2773    use ruma::{
2774        MilliSecondsSinceUnixEpoch, TransactionId,
2775        events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
2776        room_id,
2777    };
2778
2779    use super::canonicalize_dependent_requests;
2780    use crate::{client::WeakClient, test_utils::logged_in_client};
2781
2782    #[test]
2783    fn test_canonicalize_dependent_events_created_at() {
2784        // Test to ensure the created_at field is being serialized and retrieved
2785        // correctly.
2786        let txn = TransactionId::new();
2787        let created_at = MilliSecondsSinceUnixEpoch::now();
2788
2789        let edit = DependentQueuedRequest {
2790            own_transaction_id: ChildTransactionId::new(),
2791            parent_transaction_id: txn.clone(),
2792            kind: DependentQueuedRequestKind::EditEvent {
2793                new_content: SerializableEventContent::new(
2794                    &RoomMessageEventContent::text_plain("edit").into(),
2795                )
2796                .unwrap(),
2797            },
2798            parent_key: None,
2799            created_at,
2800        };
2801
2802        let res = canonicalize_dependent_requests(&[edit]);
2803
2804        assert_eq!(res.len(), 1);
2805        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2806        assert_let!(
2807            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2808        );
2809        assert_eq!(msg.body(), "edit");
2810        assert_eq!(res[0].parent_transaction_id, txn);
2811        assert_eq!(res[0].created_at, created_at);
2812    }
2813
2814    #[async_test]
2815    async fn test_client_no_cycle_with_send_queue() {
2816        for enabled in [true, false] {
2817            let client = logged_in_client(None).await;
2818            let weak_client = WeakClient::from_client(&client);
2819
2820            {
2821                let mut sync_response_builder = SyncResponseBuilder::new();
2822
2823                let room_id = room_id!("!a:b.c");
2824
2825                // Make sure the client knows about the room.
2826                client
2827                    .base_client()
2828                    .receive_sync_response(
2829                        sync_response_builder
2830                            .add_joined_room(JoinedRoomBuilder::new(room_id))
2831                            .build_sync_response(),
2832                    )
2833                    .await
2834                    .unwrap();
2835
2836                let room = client.get_room(room_id).unwrap();
2837                let q = room.send_queue();
2838
2839                let _watcher = q.subscribe().await;
2840
2841                client.send_queue().set_enabled(enabled).await;
2842            }
2843
2844            drop(client);
2845
2846            // Give a bit of time for background tasks to die.
2847            tokio::time::sleep(Duration::from_millis(500)).await;
2848
2849            // The weak client must be the last reference to the client now.
2850            let client = weak_client.get();
2851            assert!(
2852                client.is_none(),
2853                "too many strong references to the client: {}",
2854                Arc::strong_count(&client.unwrap().inner)
2855            );
2856        }
2857    }
2858
2859    #[test]
2860    fn test_canonicalize_dependent_events_smoke_test() {
2861        // Smoke test: canonicalizing a single dependent event returns it.
2862        let txn = TransactionId::new();
2863
2864        let edit = DependentQueuedRequest {
2865            own_transaction_id: ChildTransactionId::new(),
2866            parent_transaction_id: txn.clone(),
2867            kind: DependentQueuedRequestKind::EditEvent {
2868                new_content: SerializableEventContent::new(
2869                    &RoomMessageEventContent::text_plain("edit").into(),
2870                )
2871                .unwrap(),
2872            },
2873            parent_key: None,
2874            created_at: MilliSecondsSinceUnixEpoch::now(),
2875        };
2876        let res = canonicalize_dependent_requests(&[edit]);
2877
2878        assert_eq!(res.len(), 1);
2879        assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
2880        assert_eq!(res[0].parent_transaction_id, txn);
2881        assert!(res[0].parent_key.is_none());
2882    }
2883
2884    #[test]
2885    fn test_canonicalize_dependent_events_redaction_preferred() {
2886        // A redaction is preferred over any other kind of dependent event.
2887        let txn = TransactionId::new();
2888
2889        let mut inputs = Vec::with_capacity(100);
2890        let redact = DependentQueuedRequest {
2891            own_transaction_id: ChildTransactionId::new(),
2892            parent_transaction_id: txn.clone(),
2893            kind: DependentQueuedRequestKind::RedactEvent,
2894            parent_key: None,
2895            created_at: MilliSecondsSinceUnixEpoch::now(),
2896        };
2897
2898        let edit = DependentQueuedRequest {
2899            own_transaction_id: ChildTransactionId::new(),
2900            parent_transaction_id: txn.clone(),
2901            kind: DependentQueuedRequestKind::EditEvent {
2902                new_content: SerializableEventContent::new(
2903                    &RoomMessageEventContent::text_plain("edit").into(),
2904                )
2905                .unwrap(),
2906            },
2907            parent_key: None,
2908            created_at: MilliSecondsSinceUnixEpoch::now(),
2909        };
2910
2911        inputs.push({
2912            let mut edit = edit.clone();
2913            edit.own_transaction_id = ChildTransactionId::new();
2914            edit
2915        });
2916
2917        inputs.push(redact);
2918
2919        for _ in 0..98 {
2920            let mut edit = edit.clone();
2921            edit.own_transaction_id = ChildTransactionId::new();
2922            inputs.push(edit);
2923        }
2924
2925        let res = canonicalize_dependent_requests(&inputs);
2926
2927        assert_eq!(res.len(), 1);
2928        assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
2929        assert_eq!(res[0].parent_transaction_id, txn);
2930    }
2931
2932    #[test]
2933    fn test_canonicalize_dependent_events_last_edit_preferred() {
2934        let parent_txn = TransactionId::new();
2935
2936        // The latest edit of a list is always preferred.
2937        let inputs = (0..10)
2938            .map(|i| DependentQueuedRequest {
2939                own_transaction_id: ChildTransactionId::new(),
2940                parent_transaction_id: parent_txn.clone(),
2941                kind: DependentQueuedRequestKind::EditEvent {
2942                    new_content: SerializableEventContent::new(
2943                        &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
2944                    )
2945                    .unwrap(),
2946                },
2947                parent_key: None,
2948                created_at: MilliSecondsSinceUnixEpoch::now(),
2949            })
2950            .collect::<Vec<_>>();
2951
2952        let txn = inputs[9].parent_transaction_id.clone();
2953
2954        let res = canonicalize_dependent_requests(&inputs);
2955
2956        assert_eq!(res.len(), 1);
2957        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2958        assert_let!(
2959            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2960        );
2961        assert_eq!(msg.body(), "edit9");
2962        assert_eq!(res[0].parent_transaction_id, txn);
2963    }
2964
2965    #[test]
2966    fn test_canonicalize_multiple_local_echoes() {
2967        let txn1 = TransactionId::new();
2968        let txn2 = TransactionId::new();
2969
2970        let child1 = ChildTransactionId::new();
2971        let child2 = ChildTransactionId::new();
2972
2973        let inputs = vec![
2974            // This one pertains to txn1.
2975            DependentQueuedRequest {
2976                own_transaction_id: child1.clone(),
2977                kind: DependentQueuedRequestKind::RedactEvent,
2978                parent_transaction_id: txn1.clone(),
2979                parent_key: None,
2980                created_at: MilliSecondsSinceUnixEpoch::now(),
2981            },
2982            // This one pertains to txn2.
2983            DependentQueuedRequest {
2984                own_transaction_id: child2,
2985                kind: DependentQueuedRequestKind::EditEvent {
2986                    new_content: SerializableEventContent::new(
2987                        &RoomMessageEventContent::text_plain("edit").into(),
2988                    )
2989                    .unwrap(),
2990                },
2991                parent_transaction_id: txn2.clone(),
2992                parent_key: None,
2993                created_at: MilliSecondsSinceUnixEpoch::now(),
2994            },
2995        ];
2996
2997        let res = canonicalize_dependent_requests(&inputs);
2998
2999        // Canonicalization is done per parent event: both requests are kept.
3000        assert_eq!(res.len(), 2);
3001
3002        for dependent in res {
3003            if dependent.own_transaction_id == child1 {
3004                assert_eq!(dependent.parent_transaction_id, txn1);
3005                assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
3006            } else {
3007                assert_eq!(dependent.parent_transaction_id, txn2);
3008                assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
3009            }
3010        }
3011    }
3012
3013    #[test]
3014    fn test_canonicalize_reactions_after_edits() {
3015        // Sending reactions should happen after edits to a given event.
3016        let txn = TransactionId::new();
3017
3018        let react_id = ChildTransactionId::new();
3019        let react = DependentQueuedRequest {
3020            own_transaction_id: react_id.clone(),
3021            kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
3022            parent_transaction_id: txn.clone(),
3023            parent_key: None,
3024            created_at: MilliSecondsSinceUnixEpoch::now(),
3025        };
3026
3027        let edit_id = ChildTransactionId::new();
3028        let edit = DependentQueuedRequest {
3029            own_transaction_id: edit_id.clone(),
3030            kind: DependentQueuedRequestKind::EditEvent {
3031                new_content: SerializableEventContent::new(
3032                    &RoomMessageEventContent::text_plain("edit").into(),
3033                )
3034                .unwrap(),
3035            },
3036            parent_transaction_id: txn,
3037            parent_key: None,
3038            created_at: MilliSecondsSinceUnixEpoch::now(),
3039        };
3040
3041        let res = canonicalize_dependent_requests(&[react, edit]);
3042
3043        assert_eq!(res.len(), 2);
3044        assert_eq!(res[0].own_transaction_id, edit_id);
3045        assert_eq!(res[1].own_transaction_id, react_id);
3046    }
3047}