Skip to main content

matrix_sdk/send_queue/
mod.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A send queue facility to serializing queuing and sending of messages.
16//!
17//! # [`Room`] send queue
18//!
19//! Each room gets its own [`RoomSendQueue`], that's available by calling
20//! [`Room::send_queue()`]. The first time this method is called, it will spawn
21//! a background task that's used to actually send events, in the order they
22//! were passed from calls to [`RoomSendQueue::send()`].
23//!
24//! This queue tries to simplify error management around sending events, using
25//! [`RoomSendQueue::send`] or [`RoomSendQueue::send_raw`]: by default, it will retry to send the
26//! same event a few times, before automatically disabling itself, and emitting
27//! a notification that can be listened to with the global send queue (see
28//! paragraph below) or using [`RoomSendQueue::subscribe()`].
29//!
30//! It is possible to control whether a single room is enabled using
31//! [`RoomSendQueue::set_enabled()`].
32//!
33//! # Global [`SendQueue`] object
34//!
35//! The [`Client::send_queue()`] method returns an API object allowing to
36//! control all the room send queues:
37//!
38//! - enable/disable them all at once with [`SendQueue::set_enabled()`].
39//! - get notifications about send errors with [`SendQueue::subscribe_errors`].
40//! - reload all unsent events that had been persisted in storage using
41//!   [`SendQueue::respawn_tasks_for_rooms_with_unsent_requests()`]. It is
42//!   recommended to call this method during initialization of a client,
43//!   otherwise persisted unsent events will only be re-sent after the send
44//!   queue for the given room has been reopened for the first time.
45//!
46//! # Send handle
47//!
48//! Just after queuing a request to send something, a [`SendHandle`] is
49//! returned, allowing manipulating the inflight request.
50//!
51//! For a send handle for an event, it's possible to edit the event / abort
52//! sending it. If it was still in the queue (i.e. not sent yet, or not being
53//! sent), then such an action would happen locally (i.e. in the database).
54//! Otherwise, it is "too late": the background task may be sending
55//! the event already, or has sent it; in that case, the edit/aborting must
56//! happen as an actual event materializing this, on the server. To accomplish
57//! this, the send queue may send such an event, using the dependency system
58//! described below.
59//!
60//! # Dependency system
61//!
62//! The send queue includes a simple dependency system, where a
63//! [`QueuedRequest`] can have zero or more dependents in the form of
64//! [`DependentQueuedRequest`]. A dependent queued request can have at most one
65//! depended-upon (parent) queued request.
66//!
67//! This allows implementing deferred edits/redacts, as hinted to in the
68//! previous section.
69//!
70//! ## Media upload
71//!
72//! This dependency system also allows uploading medias, since the media's
73//! *content* must be uploaded before we send the media *event* that describes
74//! it.
75//!
76//! In the simplest case, that is, a media file and its event must be sent (i.e.
77//! no thumbnails):
78//!
79//! - The file's content is immediately cached in the
80//!   [`matrix_sdk_base::event_cache::store::EventCacheStore`], using an MXC ID
81//!   that is temporary and designates a local URI without any possible doubt.
82//! - An initial media event is created and uses this temporary MXC ID, and
83//!   propagated as a local echo for an event.
84//! - A [`QueuedRequest`] is pushed to upload the file's media
85//!   ([`QueuedRequestKind::MediaUpload`]).
86//! - A [`DependentQueuedRequest`] is pushed to finish the upload
87//!   ([`DependentQueuedRequestKind::FinishUpload`]).
88//!
89//! What is expected to happen, if all goes well, is the following:
90//!
91//! - the media is uploaded to the media homeserver, which returns the final MXC
92//!   ID.
93//! - when marking the upload request as sent, the MXC ID is injected (as a
94//!   [`matrix_sdk_base::store::SentRequestKey`]) into the dependent request
95//!   [`DependentQueuedRequestKind::FinishUpload`] created in the last step
96//!   above.
97//! - next time the send queue handles dependent queries, it'll see this one is
98//!   ready to be sent, and it will transform it into an event queued request
99//!   ([`QueuedRequestKind::Event`]), with the event created in the local echo
100//!   before, updated with the MXC ID returned from the server.
101//! - this updated local echo is also propagated as an edit of the local echo to
102//!   observers, who get the final version with the final MXC IDs at this point
103//!   too.
104//! - then the event is sent normally, as any event sent with the send queue.
105//!
106//! When there is a thumbnail, things behave similarly, with some tweaks:
107//!
108//! - the thumbnail's content is also stored into the cache store immediately,
109//! - the thumbnail is sent first as an [`QueuedRequestKind::MediaUpload`]
110//!   request,
111//! - the file upload is pushed as a dependent request of kind
112//!   [`DependentQueuedRequestKind::UploadFileOrThumbnail`] (this variant keeps
113//!   the file's key used to look it up in the cache store).
114//! - the media event is then sent as a dependent request as described in the
115//!   previous section.
116//!
117//! What's expected to happen is thus the following:
118//!
119//! - After the thumbnail has been uploaded, the dependent query will retrieve
120//!   the final MXC ID returned by the homeserver for the thumbnail, and store
121//!   it into the [`QueuedRequestKind::MediaUpload`]'s `thumbnail_source` field,
122//!   allowing to remember the thumbnail MXC ID when it's time to finish the
123//!   upload later.
124//! - The dependent request is morphed into another
125//!   [`QueuedRequestKind::MediaUpload`], for the file itself.
126//!
127//! The rest of the process is then similar to that of uploading a file without
128//! a thumbnail. The only difference is that there's a thumbnail source (MXC ID)
129//! remembered and fixed up into the media event, just before sending it.
130
131use std::{
132    collections::{BTreeMap, HashMap},
133    str::FromStr as _,
134    sync::{
135        Arc, RwLock,
136        atomic::{AtomicBool, Ordering},
137    },
138};
139
140use eyeball::SharedObservable;
141#[cfg(feature = "e2e-encryption")]
142use matrix_sdk_base::crypto::{OlmError, SessionRecipientCollectionError};
143#[cfg(feature = "unstable-msc4274")]
144use matrix_sdk_base::store::FinishGalleryItemInfo;
145use matrix_sdk_base::{
146    RoomState, StoreError,
147    cross_process_lock::CrossProcessLockError,
148    deserialized_responses::{EncryptionInfo, TimelineEvent},
149    event_cache::store::EventCacheStoreError,
150    media::{MediaRequestParameters, store::MediaStoreError},
151    store::{
152        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore,
153        FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
154        SentMediaInfo, SentRequestKey, SerializableEventContent,
155    },
156    task_monitor::BackgroundTaskHandle,
157};
158use matrix_sdk_common::locks::Mutex as SyncMutex;
159use mime::Mime;
160use ruma::{
161    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
162    TransactionId,
163    events::{
164        AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _,
165        reaction::ReactionEventContent,
166        relation::Annotation,
167        room::{
168            MediaSource,
169            message::{FormattedBody, RoomMessageEventContent},
170        },
171    },
172    serde::Raw,
173};
174use tokio::sync::{Mutex, Notify, OwnedMutexGuard, broadcast, oneshot};
175use tracing::{debug, error, info, instrument, trace, warn};
176
177use crate::{
178    Client, Media, Room, TransmissionProgress,
179    client::WeakClient,
180    config::RequestConfig,
181    error::RetryKind,
182    room::{WeakRoom, edit::EditedContent},
183};
184
185mod progress;
186mod upload;
187
188pub use progress::AbstractProgress;
189
190/// A client-wide send queue, for all the rooms known by a client.
191pub struct SendQueue {
192    client: Client,
193}
194
195#[cfg(not(tarpaulin_include))]
196impl std::fmt::Debug for SendQueue {
197    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
198        f.debug_struct("SendQueue").finish_non_exhaustive()
199    }
200}
201
202impl SendQueue {
203    pub(super) fn new(client: Client) -> Self {
204        Self { client }
205    }
206
207    /// Reload all the rooms which had unsent requests, and respawn tasks for
208    /// those rooms.
209    pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
210        if !self.is_enabled() {
211            return;
212        }
213
214        let room_ids =
215            self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
216                |err| {
217                    warn!("error when loading rooms with unsent requests: {err}");
218                    Vec::new()
219                },
220            );
221
222        // Getting the [`RoomSendQueue`] is sufficient to spawn the task if needs be.
223        for room_id in room_ids {
224            if let Some(room) = self.client.get_room(&room_id) {
225                let _ = self.for_room(room);
226            }
227        }
228    }
229
230    /// Tiny helper to get the send queue's global context from the [`Client`].
231    #[inline(always)]
232    fn data(&self) -> &SendQueueData {
233        &self.client.inner.send_queue_data
234    }
235
236    /// Get or create a new send queue for a given room, and insert it into our
237    /// memoized rooms mapping.
238    pub(crate) fn for_room(&self, room: Room) -> RoomSendQueue {
239        let data = self.data();
240
241        let mut map = data.rooms.write().unwrap();
242
243        let room_id = room.room_id();
244        if let Some(room_q) = map.get(room_id).cloned() {
245            return room_q;
246        }
247
248        let owned_room_id = room_id.to_owned();
249        let room_q = RoomSendQueue::new(
250            self.is_enabled(),
251            data.global_update_sender.clone(),
252            data.error_sender.clone(),
253            data.is_dropping.clone(),
254            &self.client,
255            owned_room_id.clone(),
256            data.report_media_upload_progress.clone(),
257        );
258
259        map.insert(owned_room_id, room_q.clone());
260
261        room_q
262    }
263
264    /// Enable or disable the send queue for the entire client, i.e. all rooms.
265    ///
266    /// If we're disabling the queue, and requests were being sent, they're not
267    /// aborted, and will continue until a status resolves (error responses
268    /// will keep the events in the buffer of events to send later). The
269    /// disablement will happen before the next request is sent.
270    ///
271    /// This may wake up background tasks and resume sending of requests in the
272    /// background.
273    pub async fn set_enabled(&self, enabled: bool) {
274        debug!(?enabled, "setting global send queue enablement");
275
276        self.data().globally_enabled.store(enabled, Ordering::SeqCst);
277
278        // Wake up individual rooms we already know about.
279        for room in self.data().rooms.read().unwrap().values() {
280            room.set_enabled(enabled);
281        }
282
283        // Reload some extra rooms that might not have been awaken yet, but could have
284        // requests from previous sessions.
285        self.respawn_tasks_for_rooms_with_unsent_requests().await;
286    }
287
288    /// Returns whether the send queue is enabled, at a client-wide
289    /// granularity.
290    pub fn is_enabled(&self) -> bool {
291        self.data().globally_enabled.load(Ordering::SeqCst)
292    }
293
294    /// Enable or disable progress reporting for media uploads.
295    pub fn enable_upload_progress(&self, enabled: bool) {
296        self.data().report_media_upload_progress.store(enabled, Ordering::SeqCst);
297    }
298
299    /// Subscribe to all updates for all rooms.
300    ///
301    /// Use [`RoomSendQueue::subscribe`] to subscribe to update for a _specific
302    /// room_.
303    pub fn subscribe(&self) -> broadcast::Receiver<SendQueueUpdate> {
304        self.data().global_update_sender.subscribe()
305    }
306
307    /// Get local echoes from all room send queues.
308    pub async fn local_echoes(
309        &self,
310    ) -> Result<BTreeMap<OwnedRoomId, Vec<LocalEcho>>, RoomSendQueueError> {
311        let room_ids =
312            self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
313                |err| {
314                    warn!("error when loading rooms with unsent requests: {err}");
315                    Vec::new()
316                },
317            );
318
319        let mut local_echoes: BTreeMap<OwnedRoomId, Vec<LocalEcho>> = BTreeMap::new();
320
321        for room_id in room_ids {
322            if let Some(room) = self.client.get_room(&room_id) {
323                let queue = self.for_room(room);
324                local_echoes
325                    .insert(room_id.to_owned(), queue.inner.queue.local_echoes(&queue).await?);
326            }
327        }
328
329        Ok(local_echoes)
330    }
331
332    /// A subscriber to the enablement status (enabled or disabled) of the
333    /// send queue, along with useful errors.
334    pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
335        self.data().error_sender.subscribe()
336    }
337}
338
339/// Metadata about a thumbnail needed when pushing media uploads to the send
340/// queue.
341#[derive(Clone, Debug)]
342struct QueueThumbnailInfo {
343    /// Metadata about the thumbnail needed when finishing a media upload.
344    finish_upload_thumbnail_info: FinishUploadThumbnailInfo,
345
346    /// The parameters for the request to retrieve the thumbnail data.
347    media_request_parameters: MediaRequestParameters,
348
349    /// The thumbnail's mime type.
350    content_type: Mime,
351
352    /// The thumbnail's file size in bytes.
353    file_size: usize,
354}
355
356/// A specific room's send queue ran into an error, and it has disabled itself.
357#[derive(Clone, Debug)]
358pub struct SendQueueRoomError {
359    /// For which room is the send queue failing?
360    pub room_id: OwnedRoomId,
361
362    /// The error the room has ran into, when trying to send a request.
363    pub error: Arc<crate::Error>,
364
365    /// Whether the error is considered recoverable or not.
366    ///
367    /// An error that's recoverable will disable the room's send queue, while an
368    /// unrecoverable error will be parked, until the user decides to do
369    /// something about it.
370    pub is_recoverable: bool,
371}
372
373impl Client {
374    /// Returns a [`SendQueue`] that handles sending, retrying and not
375    /// forgetting about requests that are to be sent.
376    pub fn send_queue(&self) -> SendQueue {
377        SendQueue::new(self.clone())
378    }
379}
380
381pub(super) struct SendQueueData {
382    /// Mapping of room to their unique send queue.
383    rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,
384
385    /// Is the whole mechanism enabled or disabled?
386    ///
387    /// This is only kept in memory to initialize new room queues with an
388    /// initial enablement state.
389    globally_enabled: AtomicBool,
390
391    /// Global sender to send [`SendQueueUpdate`].
392    ///
393    /// See [`SendQueue::subscribe`].
394    global_update_sender: broadcast::Sender<SendQueueUpdate>,
395
396    /// Global error updates for the send queue.
397    error_sender: broadcast::Sender<SendQueueRoomError>,
398
399    /// Are we currently dropping the Client?
400    is_dropping: Arc<AtomicBool>,
401
402    /// Will media upload progress be reported via send queue updates?
403    report_media_upload_progress: Arc<AtomicBool>,
404}
405
406impl SendQueueData {
407    /// Create the data for a send queue, in the given enabled state.
408    pub fn new(globally_enabled: bool) -> Self {
409        let (global_update_sender, _) = broadcast::channel(32);
410        let (error_sender, _) = broadcast::channel(32);
411
412        Self {
413            rooms: Default::default(),
414            globally_enabled: AtomicBool::new(globally_enabled),
415            global_update_sender,
416            error_sender,
417            is_dropping: Arc::new(false.into()),
418            report_media_upload_progress: Arc::new(false.into()),
419        }
420    }
421}
422
423impl Drop for SendQueueData {
424    fn drop(&mut self) {
425        // Mark the whole send queue as shutting down, then wake up all the room
426        // queues so they're stopped too.
427        debug!("globally dropping the send queue");
428        self.is_dropping.store(true, Ordering::SeqCst);
429
430        let rooms = self.rooms.read().unwrap();
431        for room in rooms.values() {
432            room.inner.notifier.notify_one();
433        }
434    }
435}
436
437impl Room {
438    /// Returns the [`RoomSendQueue`] for this specific room.
439    pub fn send_queue(&self) -> RoomSendQueue {
440        self.client.send_queue().for_room(self.clone())
441    }
442}
443
444/// A per-room send queue.
445///
446/// This is cheap to clone.
447#[derive(Clone)]
448pub struct RoomSendQueue {
449    inner: Arc<RoomSendQueueInner>,
450}
451
452#[cfg(not(tarpaulin_include))]
453impl std::fmt::Debug for RoomSendQueue {
454    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
455        f.debug_struct("RoomSendQueue").finish_non_exhaustive()
456    }
457}
458
459impl RoomSendQueue {
460    fn new(
461        globally_enabled: bool,
462        global_update_sender: broadcast::Sender<SendQueueUpdate>,
463        global_error_sender: broadcast::Sender<SendQueueRoomError>,
464        is_dropping: Arc<AtomicBool>,
465        client: &Client,
466        room_id: OwnedRoomId,
467        report_media_upload_progress: Arc<AtomicBool>,
468    ) -> Self {
469        let (update_sender, _) = broadcast::channel(32);
470
471        let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
472        let notifier = Arc::new(Notify::new());
473
474        let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
475        let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));
476
477        let task = client.task_monitor().spawn_background_task(
478            "send_queue",
479            Self::sending_task(
480                weak_room.clone(),
481                queue.clone(),
482                notifier.clone(),
483                global_update_sender.clone(),
484                update_sender.clone(),
485                locally_enabled.clone(),
486                global_error_sender,
487                is_dropping,
488                report_media_upload_progress,
489            ),
490        );
491
492        Self {
493            inner: Arc::new(RoomSendQueueInner {
494                room: weak_room,
495                global_update_sender,
496                update_sender,
497                _task: task,
498                queue,
499                notifier,
500                locally_enabled,
501            }),
502        }
503    }
504
505    /// Queues a raw event for sending it to this room.
506    ///
507    /// This immediately returns, and will push the event to be sent into a
508    /// queue, handled in the background.
509    ///
510    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
511    /// the [`Self::subscribe()`] method to get updates about the sending of
512    /// that event.
513    ///
514    /// By default, if sending failed on the first attempt, it will be retried a
515    /// few times. If sending failed after those retries, the entire
516    /// client's sending queue will be disabled, and it will need to be
517    /// manually re-enabled by the caller (e.g. after network is back, or when
518    /// something has been done about the faulty requests).
519    pub async fn send_raw(
520        &self,
521        content: Raw<AnyMessageLikeEventContent>,
522        event_type: String,
523    ) -> Result<SendHandle, RoomSendQueueError> {
524        let Some(room) = self.inner.room.get() else {
525            return Err(RoomSendQueueError::RoomDisappeared);
526        };
527        if room.state() != RoomState::Joined {
528            return Err(RoomSendQueueError::RoomNotJoined);
529        }
530
531        let content = SerializableEventContent::from_raw(content, event_type);
532
533        let created_at = MilliSecondsSinceUnixEpoch::now();
534        let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
535        trace!(%transaction_id, "manager sends a raw event to the background task");
536
537        self.inner.notifier.notify_one();
538
539        let send_handle = SendHandle {
540            room: self.clone(),
541            transaction_id: transaction_id.clone(),
542            media_handles: vec![],
543            created_at,
544        };
545
546        self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
547            transaction_id,
548            content: LocalEchoContent::Event {
549                serialized_event: content,
550                send_handle: send_handle.clone(),
551                send_error: None,
552            },
553        }));
554
555        Ok(send_handle)
556    }
557
558    /// Queues an event for sending it to this room.
559    ///
560    /// This immediately returns, and will push the event to be sent into a
561    /// queue, handled in the background.
562    ///
563    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
564    /// the [`Self::subscribe()`] method to get updates about the sending of
565    /// that event.
566    ///
567    /// By default, if sending failed on the first attempt, it will be retried a
568    /// few times. If sending failed after those retries, the entire
569    /// client's sending queue will be disabled, and it will need to be
570    /// manually re-enabled by the caller (e.g. after network is back, or when
571    /// something has been done about the faulty requests).
572    pub async fn send(
573        &self,
574        content: AnyMessageLikeEventContent,
575    ) -> Result<SendHandle, RoomSendQueueError> {
576        self.send_raw(
577            Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
578            content.event_type().to_string(),
579        )
580        .await
581    }
582
583    /// Returns the current local requests as well as a receiver to listen to
584    /// the send queue updates, as defined in [`RoomSendQueueUpdate`].
585    ///
586    /// Use [`SendQueue::subscribe`] to subscribe to update for _all rooms_ with
587    /// a single receiver.
588    pub async fn subscribe(
589        &self,
590    ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
591    {
592        let local_echoes = self.inner.queue.local_echoes(self).await?;
593
594        Ok((local_echoes, self.inner.update_sender.subscribe()))
595    }
596
597    /// A task that must be spawned in the async runtime, running in the
598    /// background for each room that has a send queue.
599    ///
600    /// It only progresses forward: nothing can be cancelled at any point, which
601    /// makes the implementation not overly complicated to follow.
602    #[allow(clippy::too_many_arguments)]
603    #[instrument(skip_all, fields(room_id = %room.room_id()))]
604    async fn sending_task(
605        room: WeakRoom,
606        queue: QueueStorage,
607        notifier: Arc<Notify>,
608        global_update_sender: broadcast::Sender<SendQueueUpdate>,
609        update_sender: broadcast::Sender<RoomSendQueueUpdate>,
610        locally_enabled: Arc<AtomicBool>,
611        global_error_sender: broadcast::Sender<SendQueueRoomError>,
612        is_dropping: Arc<AtomicBool>,
613        report_media_upload_progress: Arc<AtomicBool>,
614    ) {
615        trace!("spawned the sending task");
616
617        let room_id = room.room_id();
618
619        loop {
620            // A request to shut down should be preferred above everything else.
621            if is_dropping.load(Ordering::SeqCst) {
622                trace!("shutting down!");
623                break;
624            }
625
626            // Try to apply dependent requests now; those applying to previously failed
627            // attempts (local echoes) would succeed now.
628            let mut new_updates = Vec::new();
629            if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
630                warn!("errors when applying dependent requests: {err}");
631            }
632
633            for up in new_updates {
634                send_update(&global_update_sender, &update_sender, room_id, up);
635            }
636
637            if !locally_enabled.load(Ordering::SeqCst) {
638                trace!("not enabled, sleeping");
639                // Wait for an explicit wakeup.
640                notifier.notified().await;
641                continue;
642            }
643
644            let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
645                Ok(Some(request)) => request,
646
647                Ok(None) => {
648                    trace!("queue is empty, sleeping");
649                    // Wait for an explicit wakeup.
650                    notifier.notified().await;
651                    continue;
652                }
653
654                Err(err) => {
655                    warn!("error when loading next request to send: {err}");
656                    continue;
657                }
658            };
659
660            let txn_id = queued_request.transaction_id.clone();
661            trace!(txn_id = %txn_id, "received a request to send!");
662
663            let Some(room) = room.get() else {
664                if is_dropping.load(Ordering::SeqCst) {
665                    break;
666                }
667                error!("the weak room couldn't be upgraded but we're not shutting down?");
668                continue;
669            };
670
671            // If this is a media/gallery upload, prepare the following:
672            // - transaction id for the related media event request,
673            // - progress metadata to feed the final media upload progress
674            // - an observable to watch the media upload progress.
675            let (related_txn_id, media_upload_progress_info, http_progress) =
676                if let QueuedRequestKind::MediaUpload {
677                    cache_key,
678                    thumbnail_source,
679                    #[cfg(feature = "unstable-msc4274")]
680                    accumulated,
681                    related_to,
682                    ..
683                } = &queued_request.kind
684                {
685                    // Prepare to watch and communicate the request's progress for media uploads, if
686                    // it has been requested.
687                    let (media_upload_progress_info, http_progress) =
688                        if report_media_upload_progress.load(Ordering::SeqCst) {
689                            let media_upload_progress_info =
690                                RoomSendQueue::create_media_upload_progress_info(
691                                    &queued_request.transaction_id,
692                                    related_to,
693                                    cache_key,
694                                    thumbnail_source.as_ref(),
695                                    #[cfg(feature = "unstable-msc4274")]
696                                    accumulated,
697                                    &room,
698                                    &queue,
699                                )
700                                .await;
701
702                            let progress = RoomSendQueue::create_media_upload_progress_observable(
703                                &media_upload_progress_info,
704                                related_to,
705                                &update_sender,
706                            );
707
708                            (Some(media_upload_progress_info), Some(progress))
709                        } else {
710                            Default::default()
711                        };
712
713                    (Some(related_to.clone()), media_upload_progress_info, http_progress)
714                } else {
715                    Default::default()
716                };
717
718            match Self::handle_request(&room, queued_request, cancel_upload_rx, http_progress).await
719            {
720                Ok((Some(parent_key), encryption_info)) => match queue
721                    .mark_as_sent(&txn_id, parent_key.clone())
722                    .await
723                {
724                    Ok(()) => match parent_key {
725                        SentRequestKey::Event { event_id, event, event_type } => {
726                            send_update(
727                                &global_update_sender,
728                                &update_sender,
729                                room_id,
730                                RoomSendQueueUpdate::SentEvent {
731                                    transaction_id: txn_id,
732                                    event_id: event_id.clone(),
733                                },
734                            );
735
736                            // The event has been sent to the server and the server has received it.
737                            // Yepee! Now, we usually wait on the server to give us back the event
738                            // via the sync.
739                            //
740                            // Problem: sometimes the network lags, can be down, or the server may
741                            // be slow; well, anything can happen.
742                            //
743                            // It results in a weird situation where the user sees its event being
744                            // sent, then disappears before it's received again from the server.
745                            //
746                            // To avoid this situation, we eagerly save the event in the Event
747                            // Cache. It's similar to what would happen if the event was echoed back
748                            // from the server via the sync, but we avoid any network issues. The
749                            // Event Cache is smart enough to deduplicate events based on the event
750                            // ID, so it's safe to do that.
751                            //
752                            // If this little feature fails, it MUST NOT stop the Send Queue. Any
753                            // errors are logged, but the Send Queue will continue as if everything
754                            // happened successfully. This feature is not considered “crucial”.
755                            if let Ok((room_event_cache, _drop_handles)) = room.event_cache().await
756                            {
757                                let timeline_event = match Raw::from_json_string(
758                                    // Create a compact string: remove all useless spaces.
759                                    format!(
760                                        "{{\
761                                            \"event_id\":\"{event_id}\",\
762                                            \"origin_server_ts\":{ts},\
763                                            \"sender\":\"{sender}\",\
764                                            \"type\":\"{type}\",\
765                                            \"content\":{content}\
766                                        }}",
767                                        event_id = event_id,
768                                        ts = MilliSecondsSinceUnixEpoch::now().get(),
769                                        sender = room.client().user_id().expect("Client must be logged-in"),
770                                        type = event_type,
771                                        content = event.into_json(),
772                                    ),
773                                ) {
774                                    Ok(event) => match encryption_info {
775                                        #[cfg(feature = "e2e-encryption")]
776                                        Some(encryption_info) => {
777                                            use matrix_sdk_base::deserialized_responses::DecryptedRoomEvent;
778                                            let decrypted_event = DecryptedRoomEvent {
779                                                event: event.cast_unchecked(),
780                                                encryption_info: Arc::new(encryption_info),
781                                                unsigned_encryption_info: None,
782                                            };
783                                            Some(TimelineEvent::from_decrypted(
784                                                decrypted_event,
785                                                None,
786                                            ))
787                                        }
788                                        _ => Some(TimelineEvent::from_plaintext(event)),
789                                    },
790                                    Err(err) => {
791                                        error!(
792                                            ?err,
793                                            "Failed to build the (sync) event before the saving in the Event Cache"
794                                        );
795                                        None
796                                    }
797                                };
798
799                                // In case of an error, just log the error but not stop the Send
800                                // Queue. This feature is not
801                                // crucial.
802                                if let Some(timeline_event) = timeline_event
803                                    && let Err(err) = room_event_cache
804                                        .insert_sent_event_from_send_queue(timeline_event)
805                                        .await
806                                {
807                                    error!(
808                                        ?err,
809                                        "Failed to save the sent event in the Event Cache"
810                                    );
811                                }
812                            } else {
813                                info!(
814                                    "Cannot insert the sent event in the Event Cache because \
815                                    either the room no longer exists, or the Room Event Cache cannot be retrieved"
816                                );
817                            }
818                        }
819
820                        SentRequestKey::Media(sent_media_info) => {
821                            // Generate some final progress information, even if incremental
822                            // progress wasn't requested.
823                            let index =
824                                media_upload_progress_info.as_ref().map_or(0, |info| info.index);
825                            let progress = media_upload_progress_info
826                                .as_ref()
827                                .map(|info| {
828                                    AbstractProgress { current: info.bytes, total: info.bytes }
829                                        + info.offsets
830                                })
831                                .unwrap_or(AbstractProgress { current: 1, total: 1 });
832
833                            // Purposefully don't use `send_update` here, because we don't want to
834                            // notify the global listeners about an upload progress update.
835                            let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
836                                related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
837                                file: Some(sent_media_info.file),
838                                index,
839                                progress,
840                            });
841                        }
842                    },
843
844                    Err(err) => {
845                        warn!("unable to mark queued request as sent: {err}");
846                    }
847                },
848
849                Ok((None, _)) => {
850                    debug!("Request has been aborted while running, continuing.");
851                }
852
853                Err(err) => {
854                    let is_recoverable = match err {
855                        crate::Error::Http(ref http_err) => {
856                            // All transient errors are recoverable.
857                            matches!(
858                                http_err.retry_kind(),
859                                RetryKind::Transient { .. } | RetryKind::NetworkFailure
860                            )
861                        }
862
863                        // `ConcurrentRequestFailed` typically happens because of an HTTP failure;
864                        // since we don't get the underlying error, be lax and consider it
865                        // recoverable, and let observers decide to retry it or not. At some point
866                        // we'll get the actual underlying error.
867                        crate::Error::ConcurrentRequestFailed => true,
868
869                        // As of 2024-06-27, all other error types are considered unrecoverable.
870                        _ => false,
871                    };
872
873                    // Disable the queue for this room after any kind of error happened.
874                    locally_enabled.store(false, Ordering::SeqCst);
875
876                    if is_recoverable {
877                        warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");
878
879                        // In this case, we intentionally keep the request in the queue, but mark it
880                        // as not being sent anymore.
881                        queue.mark_as_not_being_sent(&txn_id).await;
882
883                        // Let observers know about a failure *after* we've
884                        // marked the item as not being sent anymore. Otherwise,
885                        // there's a possible race where a caller might try to
886                        // remove an item, while it's still marked as being
887                        // sent, resulting in a cancellation failure.
888                    } else {
889                        warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");
890
891                        // Mark the request as wedged, so it's not picked at any future point.
892                        if let Err(storage_error) =
893                            queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
894                        {
895                            warn!("unable to mark request as wedged: {storage_error}");
896                        }
897                    }
898
899                    let error = Arc::new(err);
900
901                    let _ = global_error_sender.send(SendQueueRoomError {
902                        room_id: room_id.to_owned(),
903                        error: error.clone(),
904                        is_recoverable,
905                    });
906
907                    send_update(
908                        &global_update_sender,
909                        &update_sender,
910                        room_id,
911                        RoomSendQueueUpdate::SendError {
912                            transaction_id: related_txn_id.unwrap_or(txn_id),
913                            error,
914                            is_recoverable,
915                        },
916                    );
917                }
918            }
919        }
920
921        info!("exited sending task");
922    }
923
924    /// Handles a single request and returns the [`SentRequestKey`] on success
925    /// (unless the request was cancelled, in which case it'll return
926    /// `None`).
927    async fn handle_request(
928        room: &Room,
929        request: QueuedRequest,
930        cancel_upload_rx: Option<oneshot::Receiver<()>>,
931        progress: Option<SharedObservable<TransmissionProgress>>,
932    ) -> Result<(Option<SentRequestKey>, Option<EncryptionInfo>), crate::Error> {
933        match request.kind {
934            QueuedRequestKind::Event { content } => {
935                let (event, event_type) = content.into_raw();
936
937                let result = room
938                    .send_raw(&event_type, &event)
939                    .with_transaction_id(&request.transaction_id)
940                    .with_request_config(RequestConfig::short_retry())
941                    .await?;
942
943                trace!(txn_id = %request.transaction_id, event_id = %result.response.event_id, "event successfully sent");
944
945                Ok((
946                    Some(SentRequestKey::Event {
947                        event_id: result.response.event_id,
948                        event,
949                        event_type,
950                    }),
951                    result.encryption_info,
952                ))
953            }
954
955            QueuedRequestKind::MediaUpload {
956                content_type,
957                cache_key,
958                thumbnail_source,
959                related_to: relates_to,
960                #[cfg(feature = "unstable-msc4274")]
961                accumulated,
962            } => {
963                trace!(%relates_to, "uploading media related to event");
964
965                let fut = async move {
966                    let data = room
967                        .client()
968                        .media_store()
969                        .lock()
970                        .await?
971                        .get_media_content(&cache_key)
972                        .await?
973                        .ok_or(crate::Error::SendQueueWedgeError(Box::new(
974                            QueueWedgeError::MissingMediaContent,
975                        )))?;
976
977                    let mime = Mime::from_str(&content_type).map_err(|_| {
978                        crate::Error::SendQueueWedgeError(Box::new(
979                            QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
980                        ))
981                    })?;
982
983                    #[cfg(feature = "e2e-encryption")]
984                    let media_source = if room.latest_encryption_state().await?.is_encrypted() {
985                        trace!("upload will be encrypted (encrypted room)");
986
987                        let mut cursor = std::io::Cursor::new(data);
988                        let mut req = room
989                            .client
990                            .upload_encrypted_file(&mut cursor)
991                            .with_request_config(RequestConfig::short_retry());
992                        if let Some(progress) = progress {
993                            req = req.with_send_progress_observable(progress);
994                        }
995                        let encrypted_file = req.await?;
996
997                        MediaSource::Encrypted(Box::new(encrypted_file))
998                    } else {
999                        trace!("upload will be in clear text (room without encryption)");
1000
1001                        let request_config = RequestConfig::short_retry()
1002                            .timeout(Media::reasonable_upload_timeout(&data));
1003                        let mut req =
1004                            room.client().media().upload(&mime, data, Some(request_config));
1005                        if let Some(progress) = progress {
1006                            req = req.with_send_progress_observable(progress);
1007                        }
1008                        let res = req.await?;
1009
1010                        MediaSource::Plain(res.content_uri)
1011                    };
1012
1013                    #[cfg(not(feature = "e2e-encryption"))]
1014                    let media_source = {
1015                        let request_config = RequestConfig::short_retry()
1016                            .timeout(Media::reasonable_upload_timeout(&data));
1017                        let mut req =
1018                            room.client().media().upload(&mime, data, Some(request_config));
1019                        if let Some(progress) = progress {
1020                            req = req.with_send_progress_observable(progress);
1021                        }
1022                        let res = req.await?;
1023                        MediaSource::Plain(res.content_uri)
1024                    };
1025
1026                    let uri = match &media_source {
1027                        MediaSource::Plain(uri) => uri,
1028                        MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
1029                    };
1030                    trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");
1031
1032                    Ok((
1033                        Some(SentRequestKey::Media(SentMediaInfo {
1034                            file: media_source,
1035                            thumbnail: thumbnail_source,
1036                            #[cfg(feature = "unstable-msc4274")]
1037                            accumulated,
1038                        })),
1039                        None,
1040                    ))
1041                };
1042
1043                let wait_for_cancel = async move {
1044                    if let Some(rx) = cancel_upload_rx {
1045                        rx.await
1046                    } else {
1047                        std::future::pending().await
1048                    }
1049                };
1050
1051                tokio::select! {
1052                    biased;
1053
1054                    _ = wait_for_cancel => {
1055                        Ok((None, None))
1056                    }
1057
1058                    res = fut => {
1059                        res
1060                    }
1061                }
1062            }
1063        }
1064    }
1065
1066    /// Returns whether the room is enabled, at the room level.
1067    pub fn is_enabled(&self) -> bool {
1068        self.inner.locally_enabled.load(Ordering::SeqCst)
1069    }
1070
1071    /// Set the locally enabled flag for this room queue.
1072    pub fn set_enabled(&self, enabled: bool) {
1073        self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
1074
1075        // No need to wake a task to tell it it's been disabled, so only notify if we're
1076        // re-enabling the queue.
1077        if enabled {
1078            self.inner.notifier.notify_one();
1079        }
1080    }
1081
1082    /// Send an update on the room send queue channel, and on the global send
1083    /// queue channel, i.e. it sends a [`RoomSendQueueUpdate`] and a
1084    /// [`SendQueueUpdate`].
1085    fn send_update(&self, update: RoomSendQueueUpdate) {
1086        let _ = self.inner.update_sender.send(update.clone());
1087        let _ = self
1088            .inner
1089            .global_update_sender
1090            .send(SendQueueUpdate { room_id: self.inner.room.room_id().to_owned(), update });
1091    }
1092}
1093
1094fn send_update(
1095    global_update_sender: &broadcast::Sender<SendQueueUpdate>,
1096    update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
1097    room_id: &RoomId,
1098    update: RoomSendQueueUpdate,
1099) {
1100    let _ = update_sender.send(update.clone());
1101    let _ = global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update });
1102}
1103
1104impl From<&crate::Error> for QueueWedgeError {
1105    fn from(value: &crate::Error) -> Self {
1106        match value {
1107            #[cfg(feature = "e2e-encryption")]
1108            crate::Error::OlmError(error) => match &**error {
1109                OlmError::SessionRecipientCollectionError(error) => match error {
1110                    SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
1111                        QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
1112                    }
1113
1114                    SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
1115                        QueueWedgeError::IdentityViolations { users: users.clone() }
1116                    }
1117
1118                    SessionRecipientCollectionError::CrossSigningNotSetup
1119                    | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
1120                        QueueWedgeError::CrossVerificationRequired
1121                    }
1122                },
1123                _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1124            },
1125
1126            // Flatten errors of `Self` type.
1127            crate::Error::SendQueueWedgeError(error) => *error.clone(),
1128
1129            _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
1130        }
1131    }
1132}
1133
1134struct RoomSendQueueInner {
1135    /// The room which this send queue relates to.
1136    room: WeakRoom,
1137
1138    /// Global sender to send [`SendQueueUpdate`].
1139    ///
1140    /// See [`SendQueue::subscribe`].
1141    global_update_sender: broadcast::Sender<SendQueueUpdate>,
1142
1143    /// Broadcaster for notifications about the statuses of requests to be sent.
1144    ///
1145    /// Can be subscribed to from the outside.
1146    ///
1147    /// See [`RoomSendQueue::subscribe`].
1148    update_sender: broadcast::Sender<RoomSendQueueUpdate>,
1149
1150    /// Queue of requests that are either to be sent, or being sent.
1151    ///
1152    /// When a request has been sent to the server, it is removed from that
1153    /// queue *after* being sent. That way, we will retry sending upon
1154    /// failure, in the same order requests have been inserted in the first
1155    /// place.
1156    queue: QueueStorage,
1157
1158    /// A notifier that's updated any time common data is touched (stopped or
1159    /// enabled statuses), or the associated room [`QueueStorage`].
1160    notifier: Arc<Notify>,
1161
1162    /// Should the room process new requests or not (because e.g. it might be
1163    /// running off the network)?
1164    locally_enabled: Arc<AtomicBool>,
1165
1166    /// Handle to the actual sending task. Unused, but kept alive along this
1167    /// data structure.
1168    _task: BackgroundTaskHandle,
1169}
1170
1171/// Information about a request being sent right this moment.
1172struct BeingSentInfo {
1173    /// Transaction id of the thing being sent.
1174    transaction_id: OwnedTransactionId,
1175
1176    /// For an upload request, a trigger to cancel the upload before it
1177    /// completes.
1178    cancel_upload: Option<oneshot::Sender<()>>,
1179}
1180
1181impl BeingSentInfo {
1182    /// Aborts the upload, if a trigger is available.
1183    ///
1184    /// Consumes the object because the sender is a oneshot and will be consumed
1185    /// upon sending.
1186    fn cancel_upload(self) -> bool {
1187        if let Some(cancel_upload) = self.cancel_upload {
1188            let _ = cancel_upload.send(());
1189            true
1190        } else {
1191            false
1192        }
1193    }
1194}
1195
1196/// A specialized lock that guards both against the state store and the
1197/// [`Self::being_sent`] data.
1198#[derive(Clone)]
1199struct StoreLock {
1200    /// Reference to the client, to get access to the underlying store.
1201    client: WeakClient,
1202
1203    /// The one queued request that is being sent at the moment, along with
1204    /// associated data that can be useful to act upon it.
1205    ///
1206    /// Also used as the lock to access the state store.
1207    being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
1208}
1209
1210impl StoreLock {
1211    /// Gets a hold of the locked store and [`Self::being_sent`] pair.
1212    async fn lock(&self) -> StoreLockGuard {
1213        StoreLockGuard {
1214            client: self.client.clone(),
1215            being_sent: self.being_sent.clone().lock_owned().await,
1216        }
1217    }
1218}
1219
1220/// A lock guard obtained through locking with [`StoreLock`].
1221/// `being_sent` data.
1222struct StoreLockGuard {
1223    /// Reference to the client, to get access to the underlying store.
1224    client: WeakClient,
1225
1226    /// The one queued request that is being sent at the moment, along with
1227    /// associated data that can be useful to act upon it.
1228    being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
1229}
1230
1231impl StoreLockGuard {
1232    /// Get a client from the locked state, useful to get a handle on a store.
1233    fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
1234        self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
1235    }
1236}
1237
1238#[derive(Clone)]
1239struct QueueStorage {
1240    /// A lock to make sure the state store is only accessed once at a time, to
1241    /// make some store operations atomic.
1242    store: StoreLock,
1243
1244    /// To which room is this storage related.
1245    room_id: OwnedRoomId,
1246
1247    /// In-memory mapping of media transaction IDs to thumbnail sizes for the
1248    /// purpose of progress reporting.
1249    ///
1250    /// The keys are the transaction IDs for sending the media or gallery event
1251    /// after all uploads have finished. This allows us to easily clean up the
1252    /// cache after the event was sent.
1253    ///
1254    /// For media uploads, the value vector will always have a single element.
1255    ///
1256    /// For galleries, some gallery items might not have a thumbnail while
1257    /// others do. Since we access the thumbnails by their index within the
1258    /// gallery, the vector needs to hold optional usize's.
1259    thumbnail_file_sizes: Arc<SyncMutex<HashMap<OwnedTransactionId, Vec<Option<usize>>>>>,
1260}
1261
1262impl QueueStorage {
1263    /// Default priority for a queued request.
1264    const LOW_PRIORITY: usize = 0;
1265
1266    /// High priority for a queued request that must be handled before others.
1267    const HIGH_PRIORITY: usize = 10;
1268
1269    /// Create a new queue for queuing requests to be sent later.
1270    fn new(client: WeakClient, room: OwnedRoomId) -> Self {
1271        Self {
1272            room_id: room,
1273            store: StoreLock { client, being_sent: Default::default() },
1274            thumbnail_file_sizes: Default::default(),
1275        }
1276    }
1277
1278    /// Push a new event to be sent in the queue, with a default priority of 0.
1279    ///
1280    /// Returns the transaction id chosen to identify the request.
1281    async fn push(
1282        &self,
1283        request: QueuedRequestKind,
1284        created_at: MilliSecondsSinceUnixEpoch,
1285    ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
1286        let transaction_id = TransactionId::new();
1287
1288        self.store
1289            .lock()
1290            .await
1291            .client()?
1292            .state_store()
1293            .save_send_queue_request(
1294                &self.room_id,
1295                transaction_id.clone(),
1296                created_at,
1297                request,
1298                Self::LOW_PRIORITY,
1299            )
1300            .await?;
1301
1302        Ok(transaction_id)
1303    }
1304
1305    /// Peeks the next request to be sent, marking it as being sent.
1306    ///
1307    /// It is required to call [`Self::mark_as_sent`] after it's been
1308    /// effectively sent.
1309    async fn peek_next_to_send(
1310        &self,
1311    ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
1312    {
1313        let mut guard = self.store.lock().await;
1314        let queued_requests =
1315            guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;
1316
1317        if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
1318            let (cancel_upload_tx, cancel_upload_rx) =
1319                if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
1320                    let (tx, rx) = oneshot::channel();
1321                    (Some(tx), Some(rx))
1322                } else {
1323                    Default::default()
1324                };
1325
1326            let prev = guard.being_sent.replace(BeingSentInfo {
1327                transaction_id: request.transaction_id.clone(),
1328                cancel_upload: cancel_upload_tx,
1329            });
1330
1331            if let Some(prev) = prev {
1332                error!(
1333                    prev_txn = ?prev.transaction_id,
1334                    "a previous request was still active while picking a new one"
1335                );
1336            }
1337
1338            Ok(Some((request.clone(), cancel_upload_rx)))
1339        } else {
1340            Ok(None)
1341        }
1342    }
1343
1344    /// Marks a request popped with [`Self::peek_next_to_send`] and identified
1345    /// with the given transaction id as not being sent anymore, so it can
1346    /// be removed from the queue later.
1347    async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
1348        let was_being_sent = self.store.lock().await.being_sent.take();
1349
1350        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1351        if prev_txn != Some(transaction_id) {
1352            error!(prev_txn = ?prev_txn, "previous active request didn't match that we expect (after transient error)");
1353        }
1354    }
1355
1356    /// Marks a request popped with [`Self::peek_next_to_send`] and identified
1357    /// with the given transaction id as being wedged (and not being sent
1358    /// anymore), so it can be removed from the queue later.
1359    async fn mark_as_wedged(
1360        &self,
1361        transaction_id: &TransactionId,
1362        reason: QueueWedgeError,
1363    ) -> Result<(), RoomSendQueueStorageError> {
1364        // Keep the lock until we're done touching the storage.
1365        let mut guard = self.store.lock().await;
1366        let was_being_sent = guard.being_sent.take();
1367
1368        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1369        if prev_txn != Some(transaction_id) {
1370            error!(
1371                ?prev_txn,
1372                "previous active request didn't match that we expect (after permanent error)",
1373            );
1374        }
1375
1376        Ok(guard
1377            .client()?
1378            .state_store()
1379            .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
1380            .await?)
1381    }
1382
1383    /// Marks a request identified with the given transaction id as being now
1384    /// unwedged and adds it back to the queue.
1385    async fn mark_as_unwedged(
1386        &self,
1387        transaction_id: &TransactionId,
1388    ) -> Result<(), RoomSendQueueStorageError> {
1389        Ok(self
1390            .store
1391            .lock()
1392            .await
1393            .client()?
1394            .state_store()
1395            .update_send_queue_request_status(&self.room_id, transaction_id, None)
1396            .await?)
1397    }
1398
1399    /// Marks a request pushed with [`Self::push`] and identified with the given
1400    /// transaction id as sent, by removing it from the local queue.
1401    async fn mark_as_sent(
1402        &self,
1403        transaction_id: &TransactionId,
1404        parent_key: SentRequestKey,
1405    ) -> Result<(), RoomSendQueueStorageError> {
1406        // Keep the lock until we're done touching the storage.
1407        let mut guard = self.store.lock().await;
1408        let was_being_sent = guard.being_sent.take();
1409
1410        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
1411        if prev_txn != Some(transaction_id) {
1412            error!(
1413                ?prev_txn,
1414                "previous active request didn't match that we expect (after successful send)",
1415            );
1416        }
1417
1418        let client = guard.client()?;
1419        let store = client.state_store();
1420
1421        // Update all dependent requests.
1422        store
1423            .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
1424            .await?;
1425
1426        let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;
1427
1428        if !removed {
1429            warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
1430        }
1431
1432        self.thumbnail_file_sizes.lock().remove(transaction_id);
1433
1434        Ok(())
1435    }
1436
1437    /// Cancel a sending command for an event that has been sent with
1438    /// [`Self::push`] with the given transaction id.
1439    ///
1440    /// Returns whether the given transaction has been effectively removed. If
1441    /// false, this either means that the transaction id was unrelated to
1442    /// this queue, or that the request was sent before we cancelled it.
1443    async fn cancel_event(
1444        &self,
1445        transaction_id: &TransactionId,
1446    ) -> Result<bool, RoomSendQueueStorageError> {
1447        let guard = self.store.lock().await;
1448
1449        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1450            == Some(transaction_id)
1451        {
1452            // Save the intent to redact the event.
1453            guard
1454                .client()?
1455                .state_store()
1456                .save_dependent_queued_request(
1457                    &self.room_id,
1458                    transaction_id,
1459                    ChildTransactionId::new(),
1460                    MilliSecondsSinceUnixEpoch::now(),
1461                    DependentQueuedRequestKind::RedactEvent,
1462                )
1463                .await?;
1464
1465            return Ok(true);
1466        }
1467
1468        let removed = guard
1469            .client()?
1470            .state_store()
1471            .remove_send_queue_request(&self.room_id, transaction_id)
1472            .await?;
1473
1474        self.thumbnail_file_sizes.lock().remove(transaction_id);
1475
1476        Ok(removed)
1477    }
1478
1479    /// Replace an event that has been sent with [`Self::push`] with the given
1480    /// transaction id, before it's been actually sent.
1481    ///
1482    /// Returns whether the given transaction has been effectively edited. If
1483    /// false, this either means that the transaction id was unrelated to
1484    /// this queue, or that the request was sent before we edited it.
1485    async fn replace_event(
1486        &self,
1487        transaction_id: &TransactionId,
1488        serializable: SerializableEventContent,
1489    ) -> Result<bool, RoomSendQueueStorageError> {
1490        let guard = self.store.lock().await;
1491
1492        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
1493            == Some(transaction_id)
1494        {
1495            // Save the intent to edit the associated event.
1496            guard
1497                .client()?
1498                .state_store()
1499                .save_dependent_queued_request(
1500                    &self.room_id,
1501                    transaction_id,
1502                    ChildTransactionId::new(),
1503                    MilliSecondsSinceUnixEpoch::now(),
1504                    DependentQueuedRequestKind::EditEvent { new_content: serializable },
1505                )
1506                .await?;
1507
1508            return Ok(true);
1509        }
1510
1511        let edited = guard
1512            .client()?
1513            .state_store()
1514            .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
1515            .await?;
1516
1517        Ok(edited)
1518    }
1519
1520    /// Push requests (and dependents) to upload a media.
1521    ///
1522    /// See the module-level description for details of the whole processus.
1523    #[allow(clippy::too_many_arguments)]
1524    async fn push_media(
1525        &self,
1526        event: RoomMessageEventContent,
1527        content_type: Mime,
1528        send_event_txn: OwnedTransactionId,
1529        created_at: MilliSecondsSinceUnixEpoch,
1530        upload_file_txn: OwnedTransactionId,
1531        file_media_request: MediaRequestParameters,
1532        thumbnail: Option<QueueThumbnailInfo>,
1533    ) -> Result<(), RoomSendQueueStorageError> {
1534        let guard = self.store.lock().await;
1535        let client = guard.client()?;
1536        let store = client.state_store();
1537
1538        // There's only a single media to be sent, so it has at most one thumbnail.
1539        let thumbnail_file_sizes = vec![thumbnail.as_ref().map(|t| t.file_size)];
1540
1541        let thumbnail_info = self
1542            .push_thumbnail_and_media_uploads(
1543                store,
1544                &content_type,
1545                send_event_txn.clone(),
1546                created_at,
1547                upload_file_txn.clone(),
1548                file_media_request,
1549                thumbnail,
1550            )
1551            .await?;
1552
1553        // Push the dependent request for the event itself.
1554        store
1555            .save_dependent_queued_request(
1556                &self.room_id,
1557                &upload_file_txn,
1558                send_event_txn.clone().into(),
1559                created_at,
1560                DependentQueuedRequestKind::FinishUpload {
1561                    local_echo: Box::new(event),
1562                    file_upload: upload_file_txn.clone(),
1563                    thumbnail_info,
1564                },
1565            )
1566            .await?;
1567
1568        self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1569
1570        Ok(())
1571    }
1572
1573    /// Push requests (and dependents) to upload a gallery.
1574    ///
1575    /// See the module-level description for details of the whole processus.
1576    #[cfg(feature = "unstable-msc4274")]
1577    #[allow(clippy::too_many_arguments)]
1578    async fn push_gallery(
1579        &self,
1580        event: RoomMessageEventContent,
1581        send_event_txn: OwnedTransactionId,
1582        created_at: MilliSecondsSinceUnixEpoch,
1583        item_queue_infos: Vec<GalleryItemQueueInfo>,
1584    ) -> Result<(), RoomSendQueueStorageError> {
1585        let guard = self.store.lock().await;
1586        let client = guard.client()?;
1587        let store = client.state_store();
1588
1589        let mut finish_item_infos = Vec::with_capacity(item_queue_infos.len());
1590        let mut thumbnail_file_sizes = Vec::with_capacity(item_queue_infos.len());
1591
1592        let Some((first, rest)) = item_queue_infos.split_first() else {
1593            return Ok(());
1594        };
1595
1596        let GalleryItemQueueInfo { content_type, upload_file_txn, file_media_request, thumbnail } =
1597            first;
1598
1599        let thumbnail_info = self
1600            .push_thumbnail_and_media_uploads(
1601                store,
1602                content_type,
1603                send_event_txn.clone(),
1604                created_at,
1605                upload_file_txn.clone(),
1606                file_media_request.clone(),
1607                thumbnail.clone(),
1608            )
1609            .await?;
1610
1611        finish_item_infos
1612            .push(FinishGalleryItemInfo { file_upload: upload_file_txn.clone(), thumbnail_info });
1613        thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));
1614
1615        let mut last_upload_file_txn = upload_file_txn.clone();
1616
1617        for item_queue_info in rest {
1618            let GalleryItemQueueInfo {
1619                content_type,
1620                upload_file_txn,
1621                file_media_request,
1622                thumbnail,
1623            } = item_queue_info;
1624
1625            let thumbnail_info = if let Some(QueueThumbnailInfo {
1626                finish_upload_thumbnail_info: thumbnail_info,
1627                media_request_parameters: thumbnail_media_request,
1628                content_type: thumbnail_content_type,
1629                ..
1630            }) = thumbnail
1631            {
1632                let upload_thumbnail_txn = thumbnail_info.txn.clone();
1633
1634                // Save the thumbnail upload request as a dependent request of the last file
1635                // upload.
1636                store
1637                    .save_dependent_queued_request(
1638                        &self.room_id,
1639                        &last_upload_file_txn,
1640                        upload_thumbnail_txn.clone().into(),
1641                        created_at,
1642                        DependentQueuedRequestKind::UploadFileOrThumbnail {
1643                            content_type: thumbnail_content_type.to_string(),
1644                            cache_key: thumbnail_media_request.clone(),
1645                            related_to: send_event_txn.clone(),
1646                            parent_is_thumbnail_upload: false,
1647                        },
1648                    )
1649                    .await?;
1650
1651                last_upload_file_txn = upload_thumbnail_txn;
1652
1653                Some(thumbnail_info)
1654            } else {
1655                None
1656            };
1657
1658            // Save the file upload as a dependent request of the previous upload.
1659            store
1660                .save_dependent_queued_request(
1661                    &self.room_id,
1662                    &last_upload_file_txn,
1663                    upload_file_txn.clone().into(),
1664                    created_at,
1665                    DependentQueuedRequestKind::UploadFileOrThumbnail {
1666                        content_type: content_type.to_string(),
1667                        cache_key: file_media_request.clone(),
1668                        related_to: send_event_txn.clone(),
1669                        parent_is_thumbnail_upload: thumbnail.is_some(),
1670                    },
1671                )
1672                .await?;
1673
1674            finish_item_infos.push(FinishGalleryItemInfo {
1675                file_upload: upload_file_txn.clone(),
1676                thumbnail_info: thumbnail_info.cloned(),
1677            });
1678            thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));
1679
1680            last_upload_file_txn = upload_file_txn.clone();
1681        }
1682
1683        // Push the request for the event itself as a dependent request of the last file
1684        // upload.
1685        store
1686            .save_dependent_queued_request(
1687                &self.room_id,
1688                &last_upload_file_txn,
1689                send_event_txn.clone().into(),
1690                created_at,
1691                DependentQueuedRequestKind::FinishGallery {
1692                    local_echo: Box::new(event),
1693                    item_infos: finish_item_infos,
1694                },
1695            )
1696            .await?;
1697
1698        self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);
1699
1700        Ok(())
1701    }
1702
1703    /// If a thumbnail exists, pushes a [`QueuedRequestKind::MediaUpload`] to
1704    /// upload it
1705    /// and a [`DependentQueuedRequestKind::UploadFileOrThumbnail`] to upload
1706    /// the media itself. Otherwise, pushes a
1707    /// [`QueuedRequestKind::MediaUpload`] to upload the media directly.
1708    #[allow(clippy::too_many_arguments)]
1709    async fn push_thumbnail_and_media_uploads(
1710        &self,
1711        store: &DynStateStore,
1712        content_type: &Mime,
1713        send_event_txn: OwnedTransactionId,
1714        created_at: MilliSecondsSinceUnixEpoch,
1715        upload_file_txn: OwnedTransactionId,
1716        file_media_request: MediaRequestParameters,
1717        thumbnail: Option<QueueThumbnailInfo>,
1718    ) -> Result<Option<FinishUploadThumbnailInfo>, RoomSendQueueStorageError> {
1719        if let Some(QueueThumbnailInfo {
1720            finish_upload_thumbnail_info: thumbnail_info,
1721            media_request_parameters: thumbnail_media_request,
1722            content_type: thumbnail_content_type,
1723            ..
1724        }) = thumbnail
1725        {
1726            let upload_thumbnail_txn = thumbnail_info.txn.clone();
1727
1728            // Save the thumbnail upload request.
1729            store
1730                .save_send_queue_request(
1731                    &self.room_id,
1732                    upload_thumbnail_txn.clone(),
1733                    created_at,
1734                    QueuedRequestKind::MediaUpload {
1735                        content_type: thumbnail_content_type.to_string(),
1736                        cache_key: thumbnail_media_request,
1737                        thumbnail_source: None, // the thumbnail has no thumbnails :)
1738                        related_to: send_event_txn.clone(),
1739                        #[cfg(feature = "unstable-msc4274")]
1740                        accumulated: vec![],
1741                    },
1742                    Self::LOW_PRIORITY,
1743                )
1744                .await?;
1745
1746            // Save the file upload request as a dependent request of the thumbnail upload.
1747            store
1748                .save_dependent_queued_request(
1749                    &self.room_id,
1750                    &upload_thumbnail_txn,
1751                    upload_file_txn.into(),
1752                    created_at,
1753                    DependentQueuedRequestKind::UploadFileOrThumbnail {
1754                        content_type: content_type.to_string(),
1755                        cache_key: file_media_request,
1756                        related_to: send_event_txn,
1757                        parent_is_thumbnail_upload: true,
1758                    },
1759                )
1760                .await?;
1761
1762            Ok(Some(thumbnail_info))
1763        } else {
1764            // Save the file upload as its own request, not a dependent one.
1765            store
1766                .save_send_queue_request(
1767                    &self.room_id,
1768                    upload_file_txn,
1769                    created_at,
1770                    QueuedRequestKind::MediaUpload {
1771                        content_type: content_type.to_string(),
1772                        cache_key: file_media_request,
1773                        thumbnail_source: None,
1774                        related_to: send_event_txn,
1775                        #[cfg(feature = "unstable-msc4274")]
1776                        accumulated: vec![],
1777                    },
1778                    Self::LOW_PRIORITY,
1779                )
1780                .await?;
1781
1782            Ok(None)
1783        }
1784    }
1785
1786    /// Reacts to the given local echo of an event.
1787    #[instrument(skip(self))]
1788    async fn react(
1789        &self,
1790        transaction_id: &TransactionId,
1791        key: String,
1792        created_at: MilliSecondsSinceUnixEpoch,
1793    ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
1794        let guard = self.store.lock().await;
1795        let client = guard.client()?;
1796        let store = client.state_store();
1797
1798        let requests = store.load_send_queue_requests(&self.room_id).await?;
1799
1800        // If the target event has been already sent, abort immediately.
1801        if !requests.iter().any(|item| item.transaction_id == transaction_id) {
1802            // We didn't find it as a queued request; try to find it as a dependent queued
1803            // request.
1804            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
1805            if !dependent_requests
1806                .into_iter()
1807                .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
1808                .any(|child_txn| *child_txn == *transaction_id)
1809            {
1810                // We didn't find it as either a request or a dependent request, abort.
1811                return Ok(None);
1812            }
1813        }
1814
1815        // Record the dependent request.
1816        let reaction_txn_id = ChildTransactionId::new();
1817        store
1818            .save_dependent_queued_request(
1819                &self.room_id,
1820                transaction_id,
1821                reaction_txn_id.clone(),
1822                created_at,
1823                DependentQueuedRequestKind::ReactEvent { key },
1824            )
1825            .await?;
1826
1827        Ok(Some(reaction_txn_id))
1828    }
1829
1830    /// Returns a list of the local echoes, that is, all the requests that we're
1831    /// about to send but that haven't been sent yet (or are being sent).
1832    async fn local_echoes(
1833        &self,
1834        room: &RoomSendQueue,
1835    ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
1836        let guard = self.store.lock().await;
1837        let client = guard.client()?;
1838        let store = client.state_store();
1839
1840        let local_requests =
1841            store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
1842                Some(LocalEcho {
1843                    transaction_id: queued.transaction_id.clone(),
1844                    content: match queued.kind {
1845                        QueuedRequestKind::Event { content } => LocalEchoContent::Event {
1846                            serialized_event: content,
1847                            send_handle: SendHandle {
1848                                room: room.clone(),
1849                                transaction_id: queued.transaction_id,
1850                                media_handles: vec![],
1851                                created_at: queued.created_at,
1852                            },
1853                            send_error: queued.error,
1854                        },
1855
1856                        QueuedRequestKind::MediaUpload { .. } => {
1857                            // Don't return uploaded medias as their own things; the accompanying
1858                            // event represented as a dependent request should be sufficient.
1859                            return None;
1860                        }
1861                    },
1862                })
1863            });
1864
1865        let reactions_and_medias = store
1866            .load_dependent_queued_requests(&self.room_id)
1867            .await?
1868            .into_iter()
1869            .filter_map(|dep| match dep.kind {
1870                DependentQueuedRequestKind::EditEvent { .. }
1871                | DependentQueuedRequestKind::RedactEvent => {
1872                    // TODO: reflect local edits/redacts too?
1873                    None
1874                }
1875
1876                DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
1877                    transaction_id: dep.own_transaction_id.clone().into(),
1878                    content: LocalEchoContent::React {
1879                        key,
1880                        send_handle: SendReactionHandle {
1881                            room: room.clone(),
1882                            transaction_id: dep.own_transaction_id,
1883                        },
1884                        applies_to: dep.parent_transaction_id,
1885                    },
1886                }),
1887
1888                DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
1889                    // Don't reflect these: only the associated event is interesting to observers.
1890                    None
1891                }
1892
1893                DependentQueuedRequestKind::FinishUpload {
1894                    local_echo,
1895                    file_upload,
1896                    thumbnail_info,
1897                } => {
1898                    // Materialize as an event local echo.
1899                    Some(LocalEcho {
1900                        transaction_id: dep.own_transaction_id.clone().into(),
1901                        content: LocalEchoContent::Event {
1902                            serialized_event: SerializableEventContent::new(&(*local_echo).into())
1903                                .ok()?,
1904                            send_handle: SendHandle {
1905                                room: room.clone(),
1906                                transaction_id: dep.own_transaction_id.into(),
1907                                media_handles: vec![MediaHandles {
1908                                    upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
1909                                    upload_file_txn: file_upload,
1910                                }],
1911                                created_at: dep.created_at,
1912                            },
1913                            send_error: None,
1914                        },
1915                    })
1916                }
1917
1918                #[cfg(feature = "unstable-msc4274")]
1919                DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
1920                    // Materialize as an event local echo.
1921                    self.create_gallery_local_echo(
1922                        dep.own_transaction_id,
1923                        room,
1924                        dep.created_at,
1925                        local_echo,
1926                        item_infos,
1927                    )
1928                }
1929            });
1930
1931        Ok(local_requests.chain(reactions_and_medias).collect())
1932    }
1933
1934    /// Create a local echo for a gallery event.
1935    #[cfg(feature = "unstable-msc4274")]
1936    fn create_gallery_local_echo(
1937        &self,
1938        transaction_id: ChildTransactionId,
1939        room: &RoomSendQueue,
1940        created_at: MilliSecondsSinceUnixEpoch,
1941        local_echo: Box<RoomMessageEventContent>,
1942        item_infos: Vec<FinishGalleryItemInfo>,
1943    ) -> Option<LocalEcho> {
1944        Some(LocalEcho {
1945            transaction_id: transaction_id.clone().into(),
1946            content: LocalEchoContent::Event {
1947                serialized_event: SerializableEventContent::new(&(*local_echo).into()).ok()?,
1948                send_handle: SendHandle {
1949                    room: room.clone(),
1950                    transaction_id: transaction_id.into(),
1951                    media_handles: item_infos
1952                        .into_iter()
1953                        .map(|i| MediaHandles {
1954                            upload_thumbnail_txn: i.thumbnail_info.map(|info| info.txn),
1955                            upload_file_txn: i.file_upload,
1956                        })
1957                        .collect(),
1958                    created_at,
1959                },
1960                send_error: None,
1961            },
1962        })
1963    }
1964
1965    /// Try to apply a single dependent request, whether it's local or remote.
1966    ///
1967    /// This swallows errors that would retrigger every time if we retried
1968    /// applying the dependent request: invalid edit content, etc.
1969    ///
1970    /// Returns true if the dependent request has been sent (or should not be
1971    /// retried later).
1972    #[instrument(skip_all)]
1973    async fn try_apply_single_dependent_request(
1974        &self,
1975        client: &Client,
1976        dependent_request: DependentQueuedRequest,
1977        new_updates: &mut Vec<RoomSendQueueUpdate>,
1978    ) -> Result<bool, RoomSendQueueError> {
1979        let store = client.state_store();
1980
1981        let parent_key = dependent_request.parent_key;
1982
1983        match dependent_request.kind {
1984            DependentQueuedRequestKind::EditEvent { new_content } => {
1985                if let Some(parent_key) = parent_key {
1986                    let Some(event_id) = parent_key.into_event_id() else {
1987                        return Err(RoomSendQueueError::StorageError(
1988                            RoomSendQueueStorageError::InvalidParentKey,
1989                        ));
1990                    };
1991
1992                    // The parent event has been sent, so send an edit event.
1993                    let room = client
1994                        .get_room(&self.room_id)
1995                        .ok_or(RoomSendQueueError::RoomDisappeared)?;
1996
1997                    // Check the event is one we know how to edit with an edit event.
1998
1999                    // It must be deserializable…
2000                    let edited_content = match new_content.deserialize() {
2001                        Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
2002                            // Assume no relationships.
2003                            EditedContent::RoomMessage(c.into())
2004                        }
2005
2006                        Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
2007                            let poll_start = c.poll_start().clone();
2008                            EditedContent::PollStart {
2009                                fallback_text: poll_start.question.text.clone(),
2010                                new_content: poll_start,
2011                            }
2012                        }
2013
2014                        Ok(c) => {
2015                            warn!("Unsupported edit content type: {:?}", c.event_type());
2016                            return Ok(true);
2017                        }
2018
2019                        Err(err) => {
2020                            warn!("Unable to deserialize: {err}");
2021                            return Ok(true);
2022                        }
2023                    };
2024
2025                    let edit_event = match room.make_edit_event(&event_id, edited_content).await {
2026                        Ok(e) => e,
2027                        Err(err) => {
2028                            warn!("couldn't create edited event: {err}");
2029                            return Ok(true);
2030                        }
2031                    };
2032
2033                    // Queue the edit event in the send queue 🧠.
2034                    let serializable = SerializableEventContent::from_raw(
2035                        Raw::new(&edit_event)
2036                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
2037                        edit_event.event_type().to_string(),
2038                    );
2039
2040                    store
2041                        .save_send_queue_request(
2042                            &self.room_id,
2043                            dependent_request.own_transaction_id.into(),
2044                            dependent_request.created_at,
2045                            serializable.into(),
2046                            Self::HIGH_PRIORITY,
2047                        )
2048                        .await
2049                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
2050                } else {
2051                    // The parent event is still local; update the local echo.
2052                    let edited = store
2053                        .update_send_queue_request(
2054                            &self.room_id,
2055                            &dependent_request.parent_transaction_id,
2056                            new_content.into(),
2057                        )
2058                        .await
2059                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
2060
2061                    if !edited {
2062                        warn!("missing local echo upon dependent edit");
2063                    }
2064                }
2065            }
2066
2067            DependentQueuedRequestKind::RedactEvent => {
2068                if let Some(parent_key) = parent_key {
2069                    let Some(event_id) = parent_key.into_event_id() else {
2070                        return Err(RoomSendQueueError::StorageError(
2071                            RoomSendQueueStorageError::InvalidParentKey,
2072                        ));
2073                    };
2074
2075                    // The parent event has been sent; send a redaction.
2076                    let room = client
2077                        .get_room(&self.room_id)
2078                        .ok_or(RoomSendQueueError::RoomDisappeared)?;
2079
2080                    // Ideally we'd use the send queue to send the redaction, but the protocol has
2081                    // changed the shape of a room.redaction after v11, so keep it simple and try
2082                    // once here.
2083
2084                    // Note: no reason is provided because we materialize the intent of "cancel
2085                    // sending the parent event".
2086
2087                    if let Err(err) = room
2088                        .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
2089                        .await
2090                    {
2091                        warn!("error when sending a redact for {event_id}: {err}");
2092                        return Ok(false);
2093                    }
2094                } else {
2095                    // The parent event is still local (sending must have failed); redact the local
2096                    // echo.
2097                    let removed = store
2098                        .remove_send_queue_request(
2099                            &self.room_id,
2100                            &dependent_request.parent_transaction_id,
2101                        )
2102                        .await
2103                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
2104
2105                    if !removed {
2106                        warn!("missing local echo upon dependent redact");
2107                    }
2108                }
2109            }
2110
2111            DependentQueuedRequestKind::ReactEvent { key } => {
2112                if let Some(parent_key) = parent_key {
2113                    let Some(parent_event_id) = parent_key.into_event_id() else {
2114                        return Err(RoomSendQueueError::StorageError(
2115                            RoomSendQueueStorageError::InvalidParentKey,
2116                        ));
2117                    };
2118
2119                    // Queue the reaction event in the send queue 🧠.
2120                    let react_event =
2121                        ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
2122                    let serializable = SerializableEventContent::from_raw(
2123                        Raw::new(&react_event)
2124                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
2125                        react_event.event_type().to_string(),
2126                    );
2127
2128                    store
2129                        .save_send_queue_request(
2130                            &self.room_id,
2131                            dependent_request.own_transaction_id.into(),
2132                            dependent_request.created_at,
2133                            serializable.into(),
2134                            Self::HIGH_PRIORITY,
2135                        )
2136                        .await
2137                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
2138                } else {
2139                    // Not applied yet, we should retry later => false.
2140                    return Ok(false);
2141                }
2142            }
2143
2144            DependentQueuedRequestKind::UploadFileOrThumbnail {
2145                content_type,
2146                cache_key,
2147                related_to,
2148                parent_is_thumbnail_upload,
2149            } => {
2150                let Some(parent_key) = parent_key else {
2151                    // Not finished yet, we should retry later => false.
2152                    return Ok(false);
2153                };
2154                self.handle_dependent_file_or_thumbnail_upload(
2155                    client,
2156                    dependent_request.own_transaction_id.into(),
2157                    parent_key,
2158                    content_type,
2159                    cache_key,
2160                    related_to,
2161                    parent_is_thumbnail_upload,
2162                )
2163                .await?;
2164            }
2165
2166            DependentQueuedRequestKind::FinishUpload {
2167                local_echo,
2168                file_upload,
2169                thumbnail_info,
2170            } => {
2171                let Some(parent_key) = parent_key else {
2172                    // Not finished yet, we should retry later => false.
2173                    return Ok(false);
2174                };
2175                self.handle_dependent_finish_upload(
2176                    client,
2177                    dependent_request.own_transaction_id.into(),
2178                    parent_key,
2179                    *local_echo,
2180                    file_upload,
2181                    thumbnail_info,
2182                    new_updates,
2183                )
2184                .await?;
2185            }
2186
2187            #[cfg(feature = "unstable-msc4274")]
2188            DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
2189                let Some(parent_key) = parent_key else {
2190                    // Not finished yet, we should retry later => false.
2191                    return Ok(false);
2192                };
2193                self.handle_dependent_finish_gallery_upload(
2194                    client,
2195                    dependent_request.own_transaction_id.into(),
2196                    parent_key,
2197                    *local_echo,
2198                    item_infos,
2199                    new_updates,
2200                )
2201                .await?;
2202            }
2203        }
2204
2205        Ok(true)
2206    }
2207
2208    #[instrument(skip(self))]
2209    async fn apply_dependent_requests(
2210        &self,
2211        new_updates: &mut Vec<RoomSendQueueUpdate>,
2212    ) -> Result<(), RoomSendQueueError> {
2213        let guard = self.store.lock().await;
2214
2215        let client = guard.client()?;
2216        let store = client.state_store();
2217
2218        let dependent_requests = store
2219            .load_dependent_queued_requests(&self.room_id)
2220            .await
2221            .map_err(RoomSendQueueStorageError::StateStoreError)?;
2222
2223        let num_initial_dependent_requests = dependent_requests.len();
2224        if num_initial_dependent_requests == 0 {
2225            // Returning early here avoids a bit of useless logging.
2226            return Ok(());
2227        }
2228
2229        let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
2230
2231        // Get rid of the all non-canonical dependent events.
2232        for original in &dependent_requests {
2233            if !canonicalized_dependent_requests
2234                .iter()
2235                .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
2236            {
2237                store
2238                    .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
2239                    .await
2240                    .map_err(RoomSendQueueStorageError::StateStoreError)?;
2241            }
2242        }
2243
2244        let mut num_dependent_requests = canonicalized_dependent_requests.len();
2245
2246        debug!(
2247            num_dependent_requests,
2248            num_initial_dependent_requests, "starting handling of dependent requests"
2249        );
2250
2251        for dependent in canonicalized_dependent_requests {
2252            let dependent_id = dependent.own_transaction_id.clone();
2253
2254            match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
2255                Ok(should_remove) => {
2256                    if should_remove {
2257                        // The dependent request has been successfully applied, forget about it.
2258                        store
2259                            .remove_dependent_queued_request(&self.room_id, &dependent_id)
2260                            .await
2261                            .map_err(RoomSendQueueStorageError::StateStoreError)?;
2262
2263                        num_dependent_requests -= 1;
2264                    }
2265                }
2266
2267                Err(err) => {
2268                    warn!("error when applying single dependent request: {err}");
2269                }
2270            }
2271        }
2272
2273        debug!(
2274            leftover_dependent_requests = num_dependent_requests,
2275            "stopped handling dependent request"
2276        );
2277
2278        Ok(())
2279    }
2280
2281    /// Remove a single dependent request from storage.
2282    async fn remove_dependent_send_queue_request(
2283        &self,
2284        dependent_event_id: &ChildTransactionId,
2285    ) -> Result<bool, RoomSendQueueStorageError> {
2286        Ok(self
2287            .store
2288            .lock()
2289            .await
2290            .client()?
2291            .state_store()
2292            .remove_dependent_queued_request(&self.room_id, dependent_event_id)
2293            .await?)
2294    }
2295}
2296
2297#[cfg(feature = "unstable-msc4274")]
2298/// Metadata needed for pushing gallery item uploads onto the send queue.
2299struct GalleryItemQueueInfo {
2300    content_type: Mime,
2301    upload_file_txn: OwnedTransactionId,
2302    file_media_request: MediaRequestParameters,
2303    thumbnail: Option<QueueThumbnailInfo>,
2304}
2305
2306/// The content of a local echo.
2307#[derive(Clone, Debug)]
2308pub enum LocalEchoContent {
2309    /// The local echo contains an actual event ready to display.
2310    Event {
2311        /// Content of the event itself (along with its type) that we are about
2312        /// to send.
2313        serialized_event: SerializableEventContent,
2314        /// A handle to manipulate the sending of the associated event.
2315        send_handle: SendHandle,
2316        /// Whether trying to send this local echo failed in the past with an
2317        /// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]).
2318        send_error: Option<QueueWedgeError>,
2319    },
2320
2321    /// A local echo has been reacted to.
2322    React {
2323        /// The key with which the local echo has been reacted to.
2324        key: String,
2325        /// A handle to manipulate the sending of the reaction.
2326        send_handle: SendReactionHandle,
2327        /// The local echo which has been reacted to.
2328        applies_to: OwnedTransactionId,
2329    },
2330}
2331
2332/// A local representation for a request that hasn't been sent yet to the user's
2333/// homeserver.
2334#[derive(Clone, Debug)]
2335pub struct LocalEcho {
2336    /// Transaction id used to identify the associated request.
2337    pub transaction_id: OwnedTransactionId,
2338    /// The content for the local echo.
2339    pub content: LocalEchoContent,
2340}
2341
2342/// An update to a room send queue, observable with
2343/// [`RoomSendQueue::subscribe`].
2344#[derive(Clone, Debug)]
2345pub enum RoomSendQueueUpdate {
2346    /// A new local event is being sent.
2347    ///
2348    /// There's been a user query to create this event. It is being sent to the
2349    /// server.
2350    NewLocalEvent(LocalEcho),
2351
2352    /// A local event that hadn't been sent to the server yet has been cancelled
2353    /// before sending.
2354    CancelledLocalEvent {
2355        /// Transaction id used to identify this event.
2356        transaction_id: OwnedTransactionId,
2357    },
2358
2359    /// A local event's content has been replaced with something else.
2360    ReplacedLocalEvent {
2361        /// Transaction id used to identify this event.
2362        transaction_id: OwnedTransactionId,
2363
2364        /// The new content replacing the previous one.
2365        new_content: SerializableEventContent,
2366    },
2367
2368    /// An error happened when an event was being sent.
2369    ///
2370    /// The event has not been removed from the queue. All the send queues
2371    /// will be disabled after this happens, and must be manually re-enabled.
2372    SendError {
2373        /// Transaction id used to identify this event.
2374        transaction_id: OwnedTransactionId,
2375        /// Error received while sending the event.
2376        error: Arc<crate::Error>,
2377        /// Whether the error is considered recoverable or not.
2378        ///
2379        /// An error that's recoverable will disable the room's send queue,
2380        /// while an unrecoverable error will be parked, until the user
2381        /// decides to cancel sending it.
2382        is_recoverable: bool,
2383    },
2384
2385    /// The event has been unwedged and sending is now being retried.
2386    RetryEvent {
2387        /// Transaction id used to identify this event.
2388        transaction_id: OwnedTransactionId,
2389    },
2390
2391    /// The event has been sent to the server, and the query returned
2392    /// successfully.
2393    SentEvent {
2394        /// Transaction id used to identify this event.
2395        transaction_id: OwnedTransactionId,
2396        /// Received event id from the send response.
2397        event_id: OwnedEventId,
2398    },
2399
2400    /// A media upload (consisting of a file and possibly a thumbnail) has made
2401    /// progress.
2402    MediaUpload {
2403        /// The media event this uploaded media relates to.
2404        related_to: OwnedTransactionId,
2405
2406        /// The final media source for the file if it has finished uploading.
2407        file: Option<MediaSource>,
2408
2409        /// The index of the media within the transaction. A file and its
2410        /// thumbnail share the same index. Will always be 0 for non-gallery
2411        /// media uploads.
2412        index: u64,
2413
2414        /// The combined upload progress across the file and, if existing, its
2415        /// thumbnail. For gallery uploads, the progress is reported per indexed
2416        /// gallery item.
2417        progress: AbstractProgress,
2418    },
2419}
2420
2421/// A [`RoomSendQueueUpdate`] with an associated [`OwnedRoomId`].
2422///
2423/// This is used by [`SendQueue::subscribe`] to get a single channel to receive
2424/// updates for all [`RoomSendQueue`]s.
2425#[derive(Clone, Debug)]
2426pub struct SendQueueUpdate {
2427    /// The room where the update happened.
2428    pub room_id: OwnedRoomId,
2429
2430    /// The update for this room.
2431    pub update: RoomSendQueueUpdate,
2432}
2433
2434/// An error triggered by the send queue module.
2435#[derive(Debug, thiserror::Error)]
2436pub enum RoomSendQueueError {
2437    /// The room isn't in the joined state.
2438    #[error("the room isn't in the joined state")]
2439    RoomNotJoined,
2440
2441    /// The room is missing from the client.
2442    ///
2443    /// This happens only whenever the client is shutting down.
2444    #[error("the room is now missing from the client")]
2445    RoomDisappeared,
2446
2447    /// Error coming from storage.
2448    #[error(transparent)]
2449    StorageError(#[from] RoomSendQueueStorageError),
2450
2451    /// The attachment event failed to be created.
2452    #[error("the attachment event could not be created")]
2453    FailedToCreateAttachment,
2454
2455    /// The gallery contains no items.
2456    #[cfg(feature = "unstable-msc4274")]
2457    #[error("the gallery contains no items")]
2458    EmptyGallery,
2459
2460    /// The gallery event failed to be created.
2461    #[cfg(feature = "unstable-msc4274")]
2462    #[error("the gallery event could not be created")]
2463    FailedToCreateGallery,
2464}
2465
2466/// An error triggered by the send queue storage.
2467#[derive(Debug, thiserror::Error)]
2468pub enum RoomSendQueueStorageError {
2469    /// Error caused by the state store.
2470    #[error(transparent)]
2471    StateStoreError(#[from] StoreError),
2472
2473    /// Error caused by the event cache store.
2474    #[error(transparent)]
2475    EventCacheStoreError(#[from] EventCacheStoreError),
2476
2477    /// Error caused by the event cache store.
2478    #[error(transparent)]
2479    MediaStoreError(#[from] MediaStoreError),
2480
2481    /// Error caused when attempting to get a handle on the event cache store.
2482    #[error(transparent)]
2483    LockError(#[from] CrossProcessLockError),
2484
2485    /// Error caused when (de)serializing into/from json.
2486    #[error(transparent)]
2487    JsonSerialization(#[from] serde_json::Error),
2488
2489    /// A parent key was expected to be of a certain type, and it was another
2490    /// type instead.
2491    #[error("a dependent event had an invalid parent key type")]
2492    InvalidParentKey,
2493
2494    /// The client is shutting down.
2495    #[error("The client is shutting down.")]
2496    ClientShuttingDown,
2497
2498    /// An operation not implemented on a send handle.
2499    #[error("This operation is not implemented for media uploads")]
2500    OperationNotImplementedYet,
2501
2502    /// Trying to edit a media caption for something that's not a media.
2503    #[error("Can't edit a media caption when the underlying event isn't a media")]
2504    InvalidMediaCaptionEdit,
2505}
2506
2507/// Extra transaction IDs useful during an upload.
2508#[derive(Clone, Debug)]
2509struct MediaHandles {
2510    /// Transaction id used when uploading the thumbnail.
2511    ///
2512    /// Optional because a media can be uploaded without a thumbnail.
2513    upload_thumbnail_txn: Option<OwnedTransactionId>,
2514
2515    /// Transaction id used when uploading the media itself.
2516    upload_file_txn: OwnedTransactionId,
2517}
2518
2519/// A handle to manipulate an event that was scheduled to be sent to a room.
2520#[derive(Clone, Debug)]
2521pub struct SendHandle {
2522    /// Link to the send queue used to send this request.
2523    room: RoomSendQueue,
2524
2525    /// Transaction id used for the sent request.
2526    ///
2527    /// If this is a media upload, this is the "main" transaction id, i.e. the
2528    /// one used to send the event, and that will be seen by observers.
2529    transaction_id: OwnedTransactionId,
2530
2531    /// Additional handles for a media upload.
2532    media_handles: Vec<MediaHandles>,
2533
2534    /// The time at which the event to be sent has been created.
2535    pub created_at: MilliSecondsSinceUnixEpoch,
2536}
2537
2538impl SendHandle {
2539    /// Creates a new [`SendHandle`].
2540    #[cfg(test)]
2541    pub(crate) fn new(
2542        room: RoomSendQueue,
2543        transaction_id: OwnedTransactionId,
2544        created_at: MilliSecondsSinceUnixEpoch,
2545    ) -> Self {
2546        Self { room, transaction_id, media_handles: vec![], created_at }
2547    }
2548
2549    fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
2550        if !self.media_handles.is_empty() {
2551            Err(RoomSendQueueStorageError::OperationNotImplementedYet)
2552        } else {
2553            Ok(())
2554        }
2555    }
2556
2557    /// Aborts the sending of the event, if it wasn't sent yet.
2558    ///
2559    /// Returns true if the sending could be aborted, false if not (i.e. the
2560    /// event had already been sent).
2561    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2562    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2563        trace!("received an abort request");
2564
2565        let queue = &self.room.inner.queue;
2566
2567        for handles in &self.media_handles {
2568            if queue.abort_upload(&self.transaction_id, handles).await? {
2569                // Propagate a cancelled update.
2570                self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2571                    transaction_id: self.transaction_id.clone(),
2572                });
2573
2574                return Ok(true);
2575            }
2576
2577            // If it failed, it means the sending of the event is not a
2578            // dependent request anymore. Fall back to the regular
2579            // code path below, that handles aborting sending of an event.
2580        }
2581
2582        if queue.cancel_event(&self.transaction_id).await? {
2583            trace!("successful abort");
2584
2585            // Propagate a cancelled update too.
2586            self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2587                transaction_id: self.transaction_id.clone(),
2588            });
2589
2590            Ok(true)
2591        } else {
2592            debug!("local echo didn't exist anymore, can't abort");
2593            Ok(false)
2594        }
2595    }
2596
2597    /// Edits the content of a local echo with a raw event content.
2598    ///
2599    /// Returns true if the event to be sent was replaced, false if not (i.e.
2600    /// the event had already been sent).
2601    #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2602    pub async fn edit_raw(
2603        &self,
2604        new_content: Raw<AnyMessageLikeEventContent>,
2605        event_type: String,
2606    ) -> Result<bool, RoomSendQueueStorageError> {
2607        trace!("received an edit request");
2608        self.nyi_for_uploads()?;
2609
2610        let serializable = SerializableEventContent::from_raw(new_content, event_type);
2611
2612        if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
2613            trace!("successful edit");
2614
2615            // Wake up the queue, in case the room was asleep before the edit.
2616            self.room.inner.notifier.notify_one();
2617
2618            // Propagate a replaced update too.
2619            self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2620                transaction_id: self.transaction_id.clone(),
2621                new_content: serializable,
2622            });
2623
2624            Ok(true)
2625        } else {
2626            debug!("local echo doesn't exist anymore, can't edit");
2627            Ok(false)
2628        }
2629    }
2630
2631    /// Edits the content of a local echo with an event content.
2632    ///
2633    /// Returns true if the event to be sent was replaced, false if not (i.e.
2634    /// the event had already been sent).
2635    pub async fn edit(
2636        &self,
2637        new_content: AnyMessageLikeEventContent,
2638    ) -> Result<bool, RoomSendQueueStorageError> {
2639        self.edit_raw(
2640            Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2641            new_content.event_type().to_string(),
2642        )
2643        .await
2644    }
2645
2646    /// Edits the content of a local echo with a media caption.
2647    ///
2648    /// Will fail if the event to be sent, represented by this send handle,
2649    /// wasn't a media.
2650    pub async fn edit_media_caption(
2651        &self,
2652        caption: Option<String>,
2653        formatted_caption: Option<FormattedBody>,
2654        mentions: Option<Mentions>,
2655    ) -> Result<bool, RoomSendQueueStorageError> {
2656        if let Some(new_content) = self
2657            .room
2658            .inner
2659            .queue
2660            .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2661            .await?
2662        {
2663            trace!("successful edit of media caption");
2664
2665            // Wake up the queue, in case the room was asleep before the edit.
2666            self.room.inner.notifier.notify_one();
2667
2668            let new_content = SerializableEventContent::new(&new_content)
2669                .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2670
2671            // Propagate a replaced update too.
2672            self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2673                transaction_id: self.transaction_id.clone(),
2674                new_content,
2675            });
2676
2677            Ok(true)
2678        } else {
2679            debug!("local echo doesn't exist anymore, can't edit media caption");
2680            Ok(false)
2681        }
2682    }
2683
2684    /// Unwedge a local echo identified by its transaction identifier and try to
2685    /// resend it.
2686    pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2687        let room = &self.room.inner;
2688        room.queue
2689            .mark_as_unwedged(&self.transaction_id)
2690            .await
2691            .map_err(RoomSendQueueError::StorageError)?;
2692
2693        // If we have media handles, also try to unwedge them.
2694        //
2695        // It's fine to always do it to *all* the transaction IDs at once, because only
2696        // one of the three requests will be active at the same time, i.e. only
2697        // one entry will be updated in the store. The other two are either
2698        // done, or dependent requests.
2699
2700        for handles in &self.media_handles {
2701            room.queue
2702                .mark_as_unwedged(&handles.upload_file_txn)
2703                .await
2704                .map_err(RoomSendQueueError::StorageError)?;
2705
2706            if let Some(txn) = &handles.upload_thumbnail_txn {
2707                room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2708            }
2709        }
2710
2711        // Wake up the queue, in case the room was asleep before unwedging the request.
2712        room.notifier.notify_one();
2713
2714        self.room.send_update(RoomSendQueueUpdate::RetryEvent {
2715            transaction_id: self.transaction_id.clone(),
2716        });
2717
2718        Ok(())
2719    }
2720
2721    /// Send a reaction to the event as soon as it's sent.
2722    ///
2723    /// If returning `Ok(None)`; this means the reaction couldn't be sent
2724    /// because the event is already a remote one.
2725    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2726    pub async fn react(
2727        &self,
2728        key: String,
2729    ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2730        trace!("received an intent to react");
2731
2732        let created_at = MilliSecondsSinceUnixEpoch::now();
2733        if let Some(reaction_txn_id) =
2734            self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2735        {
2736            trace!("successfully queued react");
2737
2738            // Wake up the queue, in case the room was asleep before the sending.
2739            self.room.inner.notifier.notify_one();
2740
2741            // Propagate a new local event.
2742            let send_handle = SendReactionHandle {
2743                room: self.room.clone(),
2744                transaction_id: reaction_txn_id.clone(),
2745            };
2746
2747            self.room.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2748                // Note: we do want to use the `txn_id` we're going to use for the reaction, not
2749                // the one for the event we're reacting to.
2750                transaction_id: reaction_txn_id.into(),
2751                content: LocalEchoContent::React {
2752                    key,
2753                    send_handle: send_handle.clone(),
2754                    applies_to: self.transaction_id.clone(),
2755                },
2756            }));
2757
2758            Ok(Some(send_handle))
2759        } else {
2760            debug!("local echo doesn't exist anymore, can't react");
2761            Ok(None)
2762        }
2763    }
2764}
2765
2766/// A handle to execute actions on the sending of a reaction.
2767#[derive(Clone, Debug)]
2768pub struct SendReactionHandle {
2769    /// Reference to the send queue for the room where this reaction was sent.
2770    room: RoomSendQueue,
2771    /// The own transaction id for the reaction.
2772    transaction_id: ChildTransactionId,
2773}
2774
2775impl SendReactionHandle {
2776    /// Creates a new [`SendReactionHandle`].
2777    #[cfg(test)]
2778    pub(crate) fn new(room: RoomSendQueue, transaction_id: ChildTransactionId) -> Self {
2779        Self { room, transaction_id }
2780    }
2781
2782    /// Abort the sending of the reaction.
2783    ///
2784    /// Will return true if the reaction could be aborted, false if it's been
2785    /// sent (and there's no matching local echo anymore).
2786    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2787        if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2788            // Simple case: the reaction was found in the dependent event list.
2789
2790            // Propagate a cancelled update too.
2791            self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2792                transaction_id: self.transaction_id.clone().into(),
2793            });
2794
2795            return Ok(true);
2796        }
2797
2798        // The reaction has already been queued for sending, try to abort it using a
2799        // regular abort.
2800        let handle = SendHandle {
2801            room: self.room.clone(),
2802            transaction_id: self.transaction_id.clone().into(),
2803            media_handles: vec![],
2804            created_at: MilliSecondsSinceUnixEpoch::now(),
2805        };
2806
2807        handle.abort().await
2808    }
2809
2810    /// The transaction id that will be used to send this reaction later.
2811    pub fn transaction_id(&self) -> &TransactionId {
2812        &self.transaction_id
2813    }
2814}
2815
2816/// From a given source of [`DependentQueuedRequest`], return only the most
2817/// meaningful, i.e. the ones that wouldn't be overridden after applying the
2818/// others.
2819fn canonicalize_dependent_requests(
2820    dependent: &[DependentQueuedRequest],
2821) -> Vec<DependentQueuedRequest> {
2822    let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
2823
2824    for d in dependent {
2825        let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
2826
2827        if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
2828            // The parent event has already been flagged for redaction, don't consider the
2829            // other dependent events.
2830            continue;
2831        }
2832
2833        match &d.kind {
2834            DependentQueuedRequestKind::EditEvent { .. } => {
2835                // Replace any previous edit with this one.
2836                if let Some(prev_edit) = prevs
2837                    .iter_mut()
2838                    .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
2839                {
2840                    *prev_edit = d;
2841                } else {
2842                    prevs.insert(0, d);
2843                }
2844            }
2845
2846            DependentQueuedRequestKind::UploadFileOrThumbnail { .. }
2847            | DependentQueuedRequestKind::FinishUpload { .. }
2848            | DependentQueuedRequestKind::ReactEvent { .. } => {
2849                // These requests can't be canonicalized, push them as is.
2850                prevs.push(d);
2851            }
2852
2853            #[cfg(feature = "unstable-msc4274")]
2854            DependentQueuedRequestKind::FinishGallery { .. } => {
2855                // This request can't be canonicalized, push it as is.
2856                prevs.push(d);
2857            }
2858
2859            DependentQueuedRequestKind::RedactEvent => {
2860                // Remove every other dependent action.
2861                prevs.clear();
2862                prevs.push(d);
2863            }
2864        }
2865    }
2866
2867    by_txn.into_values().flat_map(|entries| entries.into_iter().cloned()).collect()
2868}
2869
2870#[cfg(all(test, not(target_family = "wasm")))]
2871mod tests {
2872    use std::{sync::Arc, time::Duration};
2873
2874    use assert_matches2::{assert_let, assert_matches};
2875    use matrix_sdk_base::store::{
2876        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
2877        SerializableEventContent,
2878    };
2879    use matrix_sdk_test::{JoinedRoomBuilder, SyncResponseBuilder, async_test};
2880    use ruma::{
2881        MilliSecondsSinceUnixEpoch, TransactionId,
2882        events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
2883        room_id,
2884    };
2885
2886    use super::canonicalize_dependent_requests;
2887    use crate::{client::WeakClient, test_utils::logged_in_client};
2888
2889    #[test]
2890    fn test_canonicalize_dependent_events_created_at() {
2891        // Test to ensure the created_at field is being serialized and retrieved
2892        // correctly.
2893        let txn = TransactionId::new();
2894        let created_at = MilliSecondsSinceUnixEpoch::now();
2895
2896        let edit = DependentQueuedRequest {
2897            own_transaction_id: ChildTransactionId::new(),
2898            parent_transaction_id: txn.clone(),
2899            kind: DependentQueuedRequestKind::EditEvent {
2900                new_content: SerializableEventContent::new(
2901                    &RoomMessageEventContent::text_plain("edit").into(),
2902                )
2903                .unwrap(),
2904            },
2905            parent_key: None,
2906            created_at,
2907        };
2908
2909        let res = canonicalize_dependent_requests(&[edit]);
2910
2911        assert_eq!(res.len(), 1);
2912        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2913        assert_let!(
2914            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2915        );
2916        assert_eq!(msg.body(), "edit");
2917        assert_eq!(res[0].parent_transaction_id, txn);
2918        assert_eq!(res[0].created_at, created_at);
2919    }
2920
2921    #[async_test]
2922    async fn test_client_no_cycle_with_send_queue() {
2923        for enabled in [true, false] {
2924            let client = logged_in_client(None).await;
2925            let weak_client = WeakClient::from_client(&client);
2926
2927            {
2928                let mut sync_response_builder = SyncResponseBuilder::new();
2929
2930                let room_id = room_id!("!a:b.c");
2931
2932                // Make sure the client knows about the room.
2933                client
2934                    .base_client()
2935                    .receive_sync_response(
2936                        sync_response_builder
2937                            .add_joined_room(JoinedRoomBuilder::new(room_id))
2938                            .build_sync_response(),
2939                    )
2940                    .await
2941                    .unwrap();
2942
2943                let room = client.get_room(room_id).unwrap();
2944                let q = room.send_queue();
2945
2946                let _watcher = q.subscribe().await;
2947
2948                client.send_queue().set_enabled(enabled).await;
2949            }
2950
2951            drop(client);
2952
2953            // Give a bit of time for background tasks to die.
2954            tokio::time::sleep(Duration::from_millis(500)).await;
2955
2956            // The weak client must be the last reference to the client now.
2957            let client = weak_client.get();
2958            assert!(
2959                client.is_none(),
2960                "too many strong references to the client: {}",
2961                Arc::strong_count(&client.unwrap().inner)
2962            );
2963        }
2964    }
2965
2966    #[test]
2967    fn test_canonicalize_dependent_events_smoke_test() {
2968        // Smoke test: canonicalizing a single dependent event returns it.
2969        let txn = TransactionId::new();
2970
2971        let edit = DependentQueuedRequest {
2972            own_transaction_id: ChildTransactionId::new(),
2973            parent_transaction_id: txn.clone(),
2974            kind: DependentQueuedRequestKind::EditEvent {
2975                new_content: SerializableEventContent::new(
2976                    &RoomMessageEventContent::text_plain("edit").into(),
2977                )
2978                .unwrap(),
2979            },
2980            parent_key: None,
2981            created_at: MilliSecondsSinceUnixEpoch::now(),
2982        };
2983        let res = canonicalize_dependent_requests(&[edit]);
2984
2985        assert_eq!(res.len(), 1);
2986        assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
2987        assert_eq!(res[0].parent_transaction_id, txn);
2988        assert!(res[0].parent_key.is_none());
2989    }
2990
2991    #[test]
2992    fn test_canonicalize_dependent_events_redaction_preferred() {
2993        // A redaction is preferred over any other kind of dependent event.
2994        let txn = TransactionId::new();
2995
2996        let mut inputs = Vec::with_capacity(100);
2997        let redact = DependentQueuedRequest {
2998            own_transaction_id: ChildTransactionId::new(),
2999            parent_transaction_id: txn.clone(),
3000            kind: DependentQueuedRequestKind::RedactEvent,
3001            parent_key: None,
3002            created_at: MilliSecondsSinceUnixEpoch::now(),
3003        };
3004
3005        let edit = DependentQueuedRequest {
3006            own_transaction_id: ChildTransactionId::new(),
3007            parent_transaction_id: txn.clone(),
3008            kind: DependentQueuedRequestKind::EditEvent {
3009                new_content: SerializableEventContent::new(
3010                    &RoomMessageEventContent::text_plain("edit").into(),
3011                )
3012                .unwrap(),
3013            },
3014            parent_key: None,
3015            created_at: MilliSecondsSinceUnixEpoch::now(),
3016        };
3017
3018        inputs.push({
3019            let mut edit = edit.clone();
3020            edit.own_transaction_id = ChildTransactionId::new();
3021            edit
3022        });
3023
3024        inputs.push(redact);
3025
3026        for _ in 0..98 {
3027            let mut edit = edit.clone();
3028            edit.own_transaction_id = ChildTransactionId::new();
3029            inputs.push(edit);
3030        }
3031
3032        let res = canonicalize_dependent_requests(&inputs);
3033
3034        assert_eq!(res.len(), 1);
3035        assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
3036        assert_eq!(res[0].parent_transaction_id, txn);
3037    }
3038
3039    #[test]
3040    fn test_canonicalize_dependent_events_last_edit_preferred() {
3041        let parent_txn = TransactionId::new();
3042
3043        // The latest edit of a list is always preferred.
3044        let inputs = (0..10)
3045            .map(|i| DependentQueuedRequest {
3046                own_transaction_id: ChildTransactionId::new(),
3047                parent_transaction_id: parent_txn.clone(),
3048                kind: DependentQueuedRequestKind::EditEvent {
3049                    new_content: SerializableEventContent::new(
3050                        &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
3051                    )
3052                    .unwrap(),
3053                },
3054                parent_key: None,
3055                created_at: MilliSecondsSinceUnixEpoch::now(),
3056            })
3057            .collect::<Vec<_>>();
3058
3059        let txn = inputs[9].parent_transaction_id.clone();
3060
3061        let res = canonicalize_dependent_requests(&inputs);
3062
3063        assert_eq!(res.len(), 1);
3064        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
3065        assert_let!(
3066            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
3067        );
3068        assert_eq!(msg.body(), "edit9");
3069        assert_eq!(res[0].parent_transaction_id, txn);
3070    }
3071
3072    #[test]
3073    fn test_canonicalize_multiple_local_echoes() {
3074        let txn1 = TransactionId::new();
3075        let txn2 = TransactionId::new();
3076
3077        let child1 = ChildTransactionId::new();
3078        let child2 = ChildTransactionId::new();
3079
3080        let inputs = vec![
3081            // This one pertains to txn1.
3082            DependentQueuedRequest {
3083                own_transaction_id: child1.clone(),
3084                kind: DependentQueuedRequestKind::RedactEvent,
3085                parent_transaction_id: txn1.clone(),
3086                parent_key: None,
3087                created_at: MilliSecondsSinceUnixEpoch::now(),
3088            },
3089            // This one pertains to txn2.
3090            DependentQueuedRequest {
3091                own_transaction_id: child2,
3092                kind: DependentQueuedRequestKind::EditEvent {
3093                    new_content: SerializableEventContent::new(
3094                        &RoomMessageEventContent::text_plain("edit").into(),
3095                    )
3096                    .unwrap(),
3097                },
3098                parent_transaction_id: txn2.clone(),
3099                parent_key: None,
3100                created_at: MilliSecondsSinceUnixEpoch::now(),
3101            },
3102        ];
3103
3104        let res = canonicalize_dependent_requests(&inputs);
3105
3106        // The canonicalization shouldn't depend per event id.
3107        assert_eq!(res.len(), 2);
3108
3109        for dependent in res {
3110            if dependent.own_transaction_id == child1 {
3111                assert_eq!(dependent.parent_transaction_id, txn1);
3112                assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
3113            } else {
3114                assert_eq!(dependent.parent_transaction_id, txn2);
3115                assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
3116            }
3117        }
3118    }
3119
3120    #[test]
3121    fn test_canonicalize_reactions_after_edits() {
3122        // Sending reactions should happen after edits to a given event.
3123        let txn = TransactionId::new();
3124
3125        let react_id = ChildTransactionId::new();
3126        let react = DependentQueuedRequest {
3127            own_transaction_id: react_id.clone(),
3128            kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
3129            parent_transaction_id: txn.clone(),
3130            parent_key: None,
3131            created_at: MilliSecondsSinceUnixEpoch::now(),
3132        };
3133
3134        let edit_id = ChildTransactionId::new();
3135        let edit = DependentQueuedRequest {
3136            own_transaction_id: edit_id.clone(),
3137            kind: DependentQueuedRequestKind::EditEvent {
3138                new_content: SerializableEventContent::new(
3139                    &RoomMessageEventContent::text_plain("edit").into(),
3140                )
3141                .unwrap(),
3142            },
3143            parent_transaction_id: txn,
3144            parent_key: None,
3145            created_at: MilliSecondsSinceUnixEpoch::now(),
3146        };
3147
3148        let res = canonicalize_dependent_requests(&[react, edit]);
3149
3150        assert_eq!(res.len(), 2);
3151        assert_eq!(res[0].own_transaction_id, edit_id);
3152        assert_eq!(res[1].own_transaction_id, react_id);
3153    }
3154}