// matrix_sdk/send_queue/mod.rs
// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A send queue facility to serialize the queuing and sending of messages.
//!
//! # [`Room`] send queue
//!
//! Each room gets its own [`RoomSendQueue`], which is available by calling
//! [`Room::send_queue()`]. The first time this method is called, it will spawn
//! a background task that's used to actually send events, in the order they
//! were passed from calls to [`RoomSendQueue::send()`].
//!
//! This queue tries to simplify error management around sending events, using
//! [`RoomSendQueue::send`] or [`RoomSendQueue::send_raw`]: by default, it will
//! retry sending the same event a few times, before automatically disabling
//! itself, and emitting a notification that can be listened to with the global
//! send queue (see paragraph below) or using [`RoomSendQueue::subscribe()`].
//!
//! It is possible to control whether a single room is enabled using
//! [`RoomSendQueue::set_enabled()`].
//!
//! # Global [`SendQueue`] object
//!
//! The [`Client::send_queue()`] method returns an API object that allows
//! controlling all the room send queues:
//!
//! - enable/disable them all at once with [`SendQueue::set_enabled()`].
//! - get notifications about send errors with [`SendQueue::subscribe_errors`].
//! - reload all unsent events that had been persisted in storage using
//!   [`SendQueue::respawn_tasks_for_rooms_with_unsent_requests()`]. It is
//!   recommended to call this method during initialization of a client,
//!   otherwise persisted unsent events will only be re-sent after the send
//!   queue for the given room has been reopened for the first time.
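//!
//! For instance, driving the global queue at startup could look like this (a
//! minimal sketch; `client` is assumed to be an already logged-in [`Client`]):
//!
//! ```no_run
//! # async fn example(client: matrix_sdk::Client) {
//! let send_queue = client.send_queue();
//!
//! // Watch for rooms whose send queue disabled itself after an error.
//! let mut errors = send_queue.subscribe_errors();
//!
//! // Respawn sending tasks for rooms that still have unsent requests
//! // persisted from a previous session.
//! send_queue.respawn_tasks_for_rooms_with_unsent_requests().await;
//!
//! // Enable (or disable) all the room send queues at once.
//! send_queue.set_enabled(true).await;
//! # }
//! ```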
//!
//! # Send handle
//!
//! Just after queuing a request to send something, a [`SendHandle`] is
//! returned, which allows manipulating the in-flight request.
//!
//! With a send handle for an event, it's possible to edit the event or abort
//! sending it. If the event is still in the queue (i.e. not sent yet, or not
//! being sent), then such an action happens locally (i.e. in the database).
//! Otherwise, it is "too late": the background task may be sending
//! the event already, or has sent it; in that case, the edit/abort must
//! happen as an actual event materializing this, on the server. To accomplish
//! this, the send queue may send such an event, using the dependency system
//! described below.
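//!
//! As a sketch (assuming `room` is a joined [`Room`]), queuing an event and
//! then aborting it could look like:
//!
//! ```no_run
//! # async fn example(room: matrix_sdk::Room) {
//! use matrix_sdk::ruma::events::room::message::RoomMessageEventContent;
//!
//! let content = RoomMessageEventContent::text_plain("hello world");
//! let handle = room.send_queue().send(content.into()).await.unwrap();
//!
//! // If the event is still in the queue, this removes it locally; otherwise
//! // the abort must be materialized on the server, as described above.
//! handle.abort().await.unwrap();
//! # }
//! ```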
//!
//! # Dependency system
//!
//! The send queue includes a simple dependency system, where a
//! [`QueuedRequest`] can have zero or more dependents in the form of
//! [`DependentQueuedRequest`]s. A dependent queued request can have at most one
//! depended-upon (parent) queued request.
//!
//! This allows implementing deferred edits/redactions, as hinted at in the
//! previous section.
//!
//! ## Media upload
//!
//! This dependency system also allows uploading media, since the media's
//! *content* must be uploaded before we send the media *event* that describes
//! it.
//!
//! In the simplest case, that is, when a media file and its event must be sent
//! (i.e. no thumbnails):
//!
//! - The file's content is immediately cached in the
//!   [`matrix_sdk_base::event_cache::store::EventCacheStore`], using an MXC ID
//!   that is temporary and designates a local URI without any possible doubt.
//! - An initial media event is created that uses this temporary MXC ID, and
//!   propagated as a local echo for an event.
//! - A [`QueuedRequest`] is pushed to upload the file's media
//!   ([`QueuedRequestKind::MediaUpload`]).
//! - A [`DependentQueuedRequest`] is pushed to finish the upload
//!   ([`DependentQueuedRequestKind::FinishUpload`]).
//!
//! What is expected to happen, if all goes well, is the following:
//!
//! - the media is uploaded to the media homeserver, which returns the final MXC
//!   ID.
//! - when marking the upload request as sent, the MXC ID is injected (as a
//!   [`matrix_sdk_base::store::SentRequestKey`]) into the dependent request
//!   [`DependentQueuedRequestKind::FinishUpload`] created in the last step
//!   above.
//! - the next time the send queue handles dependent requests, it'll see this
//!   one is ready to be sent, and it will transform it into an event queued
//!   request ([`QueuedRequestKind::Event`]), with the event created in the
//!   local echo before, updated with the MXC ID returned from the server.
//! - this updated local echo is also propagated as an edit of the local echo to
//!   observers, who get the final version with the final MXC IDs at this point
//!   too.
//! - then the event is sent normally, as any event sent with the send queue.
//!
//! When there is a thumbnail, things behave similarly, with some tweaks:
//!
//! - the thumbnail's content is also stored into the cache store immediately,
//! - the thumbnail is sent first as a [`QueuedRequestKind::MediaUpload`]
//!   request,
//! - the file upload is pushed as a dependent request of kind
//!   [`DependentQueuedRequestKind::UploadFileOrThumbnail`] (this variant keeps
//!   the file's key used to look it up in the cache store),
//! - the media event is then sent as a dependent request as described in the
//!   previous section.
//!
//! What's expected to happen is thus the following:
//!
//! - After the thumbnail has been uploaded, the dependent request will retrieve
//!   the final MXC ID returned by the homeserver for the thumbnail, and store
//!   it into the [`QueuedRequestKind::MediaUpload`]'s `thumbnail_source` field,
//!   so we can remember the thumbnail's MXC ID when it's time to finish the
//!   upload later.
//! - The dependent request is morphed into another
//!   [`QueuedRequestKind::MediaUpload`], for the file itself.
//!
//! The rest of the process is then similar to that of uploading a file without
//! a thumbnail. The only difference is that there's a thumbnail source (MXC ID)
//! remembered and fixed up into the media event, just before sending it.
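//!
//! From the caller's point of view, all of the above is driven by a single
//! high-level call; the sketch below assumes the `send_attachment` method
//! provided by the `upload` submodule, with default attachment settings:
//!
//! ```no_run
//! # async fn example(room: matrix_sdk::Room) {
//! use matrix_sdk::attachment::AttachmentConfig;
//!
//! let bytes = vec![0u8; 1024]; // pretend this is PNG data
//! let _handle = room
//!     .send_queue()
//!     .send_attachment("picture.png", mime::IMAGE_PNG, bytes, AttachmentConfig::new())
//!     .await
//!     .unwrap();
//! # }
//! ```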

use std::{
    collections::{BTreeMap, HashMap},
    ops::Not,
    str::FromStr as _,
    sync::{
        Arc, RwLock,
        atomic::{AtomicBool, Ordering},
    },
};

use eyeball::SharedObservable;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_base::crypto::{OlmError, SessionRecipientCollectionError};
#[cfg(feature = "unstable-msc4274")]
use matrix_sdk_base::store::FinishGalleryItemInfo;
use matrix_sdk_base::{
    RoomState, StoreError,
    cross_process_lock::CrossProcessLockError,
    deserialized_responses::{EncryptionInfo, TimelineEvent},
    event_cache::store::EventCacheStoreError,
    media::{MediaRequestParameters, store::MediaStoreError},
    store::{
        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore,
        FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
        SentMediaInfo, SentRequestKey, SerializableEventContent,
    },
    task_monitor::BackgroundTaskHandle,
};
use matrix_sdk_common::locks::Mutex as SyncMutex;
use mime::Mime;
use ruma::{
    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId,
    TransactionId,
    events::{
        AnyMessageLikeEventContent, Mentions, MessageLikeEventContent as _, TimelineEventType,
        reaction::ReactionEventContent,
        relation::Annotation,
        room::{
            MediaSource,
            message::{FormattedBody, RoomMessageEventContent},
        },
    },
    serde::Raw,
};
use tokio::sync::{Mutex, Notify, OwnedMutexGuard, broadcast, oneshot};
use tracing::{debug, error, info, instrument, trace, warn};

use crate::{
    Client, Media, Room, TransmissionProgress,
    client::WeakClient,
    config::RequestConfig,
    error::RetryKind,
    room::{WeakRoom, edit::EditedContent},
};

mod progress;
mod upload;

pub use progress::AbstractProgress;
/// A client-wide send queue, for all the rooms known by a client.
pub struct SendQueue {
    client: Client,
}

#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for SendQueue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SendQueue").finish_non_exhaustive()
    }
}

impl SendQueue {
    pub(super) fn new(client: Client) -> Self {
        Self { client }
    }

    /// Reload all the rooms which had unsent requests, and respawn tasks for
    /// those rooms.
    pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
        if !self.is_enabled() {
            return;
        }

        let room_ids =
            self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
                |err| {
                    warn!("error when loading rooms with unsent requests: {err}");
                    Vec::new()
                },
            );

        // Getting the [`RoomSendQueue`] is sufficient to spawn the task if need be.
        for room_id in room_ids {
            if let Some(room) = self.client.get_room(&room_id) {
                let _ = self.for_room(room);
            }
        }
    }

    /// Tiny helper to get the send queue's global context from the [`Client`].
    #[inline(always)]
    fn data(&self) -> &SendQueueData {
        &self.client.inner.send_queue_data
    }

    /// Get or create a new send queue for a given room, and insert it into our
    /// memoized rooms mapping.
    pub(crate) fn for_room(&self, room: Room) -> RoomSendQueue {
        let data = self.data();

        let mut map = data.rooms.write().unwrap();

        let room_id = room.room_id();
        if let Some(room_q) = map.get(room_id).cloned() {
            return room_q;
        }

        let owned_room_id = room_id.to_owned();
        let room_q = RoomSendQueue::new(
            self.is_enabled(),
            data.global_update_sender.clone(),
            data.error_sender.clone(),
            data.is_dropping.clone(),
            &self.client,
            owned_room_id.clone(),
            data.report_media_upload_progress.clone(),
        );

        map.insert(owned_room_id, room_q.clone());

        room_q
    }

    /// Enable or disable the send queue for the entire client, i.e. all rooms.
    ///
    /// If we're disabling the queue while requests are being sent, those
    /// requests are not aborted, and will continue until a status resolves
    /// (error responses will keep the events in the buffer of events to send
    /// later). The disablement will happen before the next request is sent.
    ///
    /// This may wake up background tasks and resume sending of requests in the
    /// background.
    pub async fn set_enabled(&self, enabled: bool) {
        debug!(?enabled, "setting global send queue enablement");

        self.data().globally_enabled.store(enabled, Ordering::SeqCst);

        // Wake up individual rooms we already know about.
        for room in self.data().rooms.read().unwrap().values() {
            room.set_enabled(enabled);
        }

        // Reload some extra rooms that might not have been awakened yet, but could
        // have requests from previous sessions.
        self.respawn_tasks_for_rooms_with_unsent_requests().await;
    }

    /// Returns whether the send queue is enabled, at a client-wide
    /// granularity.
    pub fn is_enabled(&self) -> bool {
        self.data().globally_enabled.load(Ordering::SeqCst)
    }

    /// Enable or disable progress reporting for media uploads.
    pub fn enable_upload_progress(&self, enabled: bool) {
        self.data().report_media_upload_progress.store(enabled, Ordering::SeqCst);
    }

    /// Subscribe to all updates for all rooms.
    ///
    /// Use [`RoomSendQueue::subscribe`] to subscribe to updates for a
    /// _specific room_.
    pub fn subscribe(&self) -> broadcast::Receiver<SendQueueUpdate> {
        self.data().global_update_sender.subscribe()
    }

    /// Get local echoes from all room send queues.
    pub async fn local_echoes(
        &self,
    ) -> Result<BTreeMap<OwnedRoomId, Vec<LocalEcho>>, RoomSendQueueError> {
        let room_ids =
            self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
                |err| {
                    warn!("error when loading rooms with unsent requests: {err}");
                    Vec::new()
                },
            );

        let mut local_echoes: BTreeMap<OwnedRoomId, Vec<LocalEcho>> = BTreeMap::new();

        for room_id in room_ids {
            if let Some(room) = self.client.get_room(&room_id) {
                let queue = self.for_room(room);
                local_echoes
                    .insert(room_id.to_owned(), queue.inner.queue.local_echoes(&queue).await?);
            }
        }

        Ok(local_echoes)
    }

    /// A subscriber to the enablement status (enabled or disabled) of the
    /// send queue, along with useful errors.
    pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
        self.data().error_sender.subscribe()
    }
}

/// Metadata about a thumbnail needed when pushing media uploads to the send
/// queue.
#[derive(Clone, Debug)]
struct QueueThumbnailInfo {
    /// Metadata about the thumbnail needed when finishing a media upload.
    finish_upload_thumbnail_info: FinishUploadThumbnailInfo,

    /// The parameters for the request to retrieve the thumbnail data.
    media_request_parameters: MediaRequestParameters,

    /// The thumbnail's mime type.
    content_type: Mime,

    /// The thumbnail's file size in bytes.
    file_size: usize,
}

/// A specific room's send queue ran into an error, and it has disabled itself.
#[derive(Clone, Debug)]
pub struct SendQueueRoomError {
    /// For which room is the send queue failing?
    pub room_id: OwnedRoomId,

    /// The error the room has run into, when trying to send a request.
    pub error: Arc<crate::Error>,

    /// Whether the error is considered recoverable or not.
    ///
    /// An error that's recoverable will disable the room's send queue, while an
    /// unrecoverable error will be parked, until the user decides to do
    /// something about it.
    pub is_recoverable: bool,
}

impl Client {
    /// Returns a [`SendQueue`] that handles sending, retrying and not
    /// forgetting about requests that are to be sent.
    pub fn send_queue(&self) -> SendQueue {
        SendQueue::new(self.clone())
    }
}

pub(super) struct SendQueueData {
    /// Mapping of a room to its unique send queue.
    rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,

    /// Is the whole mechanism enabled or disabled?
    ///
    /// This is only kept in memory to initialize new room queues with an
    /// initial enablement state.
    globally_enabled: AtomicBool,

    /// Global sender to send [`SendQueueUpdate`].
    ///
    /// See [`SendQueue::subscribe`].
    global_update_sender: broadcast::Sender<SendQueueUpdate>,

    /// Global error updates for the send queue.
    error_sender: broadcast::Sender<SendQueueRoomError>,

    /// Are we currently dropping the Client?
    is_dropping: Arc<AtomicBool>,

    /// Will media upload progress be reported via send queue updates?
    report_media_upload_progress: Arc<AtomicBool>,
}

impl SendQueueData {
    /// Create the data for a send queue, in the given enabled state.
    pub fn new(globally_enabled: bool) -> Self {
        let (global_update_sender, _) = broadcast::channel(32);
        let (error_sender, _) = broadcast::channel(32);

        Self {
            rooms: Default::default(),
            globally_enabled: AtomicBool::new(globally_enabled),
            global_update_sender,
            error_sender,
            is_dropping: Arc::new(false.into()),
            report_media_upload_progress: Arc::new(false.into()),
        }
    }
}

impl Drop for SendQueueData {
    fn drop(&mut self) {
        // Mark the whole send queue as shutting down, then wake up all the room
        // queues so they're stopped too.
        debug!("globally dropping the send queue");
        self.is_dropping.store(true, Ordering::SeqCst);

        let rooms = self.rooms.read().unwrap();
        for room in rooms.values() {
            room.inner.notifier.notify_one();
        }
    }
}

impl Room {
    /// Returns the [`RoomSendQueue`] for this specific room.
    pub fn send_queue(&self) -> RoomSendQueue {
        self.client.send_queue().for_room(self.clone())
    }
}

/// A per-room send queue.
///
/// This is cheap to clone.
#[derive(Clone)]
pub struct RoomSendQueue {
    inner: Arc<RoomSendQueueInner>,
}

#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for RoomSendQueue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RoomSendQueue").finish_non_exhaustive()
    }
}

impl RoomSendQueue {
    fn new(
        globally_enabled: bool,
        global_update_sender: broadcast::Sender<SendQueueUpdate>,
        global_error_sender: broadcast::Sender<SendQueueRoomError>,
        is_dropping: Arc<AtomicBool>,
        client: &Client,
        room_id: OwnedRoomId,
        report_media_upload_progress: Arc<AtomicBool>,
    ) -> Self {
        let (update_sender, _) = broadcast::channel(32);

        let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
        let notifier = Arc::new(Notify::new());

        let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
        let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));

        let task = client.task_monitor().spawn_infinite_task(
            "send_queue",
            Self::sending_task(
                weak_room.clone(),
                queue.clone(),
                notifier.clone(),
                global_update_sender.clone(),
                update_sender.clone(),
                locally_enabled.clone(),
                global_error_sender,
                is_dropping,
                report_media_upload_progress,
            ),
        );

        Self {
            inner: Arc::new(RoomSendQueueInner {
                room: weak_room,
                global_update_sender,
                update_sender,
                _task: task,
                queue,
                notifier,
                locally_enabled,
            }),
        }
    }

    /// Queues a raw event for sending to this room.
    ///
    /// This immediately returns, and will push the event to be sent into a
    /// queue, handled in the background.
    ///
    /// Callers are expected to consume [`RoomSendQueueUpdate`] by calling
    /// the [`Self::subscribe()`] method to get updates about the sending of
    /// that event.
    ///
    /// By default, if sending failed on the first attempt, it will be retried a
    /// few times. If sending failed after those retries, the entire
    /// client's sending queue will be disabled, and it will need to be
    /// manually re-enabled by the caller (e.g. after network is back, or when
    /// something has been done about the faulty requests).
    pub async fn send_raw(
        &self,
        content: Raw<AnyMessageLikeEventContent>,
        event_type: String,
    ) -> Result<SendHandle, RoomSendQueueError> {
        let Some(room) = self.inner.room.get() else {
            return Err(RoomSendQueueError::RoomDisappeared);
        };
        if room.state() != RoomState::Joined {
            return Err(RoomSendQueueError::RoomNotJoined);
        }

        let content = SerializableEventContent::from_raw(content, event_type);

        let created_at = MilliSecondsSinceUnixEpoch::now();
        let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
        trace!(%transaction_id, "manager sends a raw event to the background task");

        self.inner.notifier.notify_one();

        let send_handle = SendHandle {
            room: self.clone(),
            transaction_id: transaction_id.clone(),
            media_handles: vec![],
            created_at,
        };

        self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
            transaction_id,
            content: LocalEchoContent::Event {
                serialized_event: content,
                send_handle: send_handle.clone(),
                send_error: None,
            },
        }));

        Ok(send_handle)
    }

    /// Queues an event for sending to this room.
    ///
    /// This immediately returns, and will push the event to be sent into a
    /// queue, handled in the background.
    ///
    /// Callers are expected to consume [`RoomSendQueueUpdate`] by calling
    /// the [`Self::subscribe()`] method to get updates about the sending of
    /// that event.
    ///
    /// By default, if sending failed on the first attempt, it will be retried a
    /// few times. If sending failed after those retries, the entire
    /// client's sending queue will be disabled, and it will need to be
    /// manually re-enabled by the caller (e.g. after network is back, or when
    /// something has been done about the faulty requests).
    pub async fn send(
        &self,
        content: AnyMessageLikeEventContent,
    ) -> Result<SendHandle, RoomSendQueueError> {
        self.send_raw(
            Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
            content.event_type().to_string(),
        )
        .await
    }

    /// Queues a redaction of another event for sending to this room.
    ///
    /// This immediately returns, and will push the redaction to be sent into a
    /// queue, handled in the background.
    ///
    /// Callers are expected to consume [`RoomSendQueueUpdate`] by calling
    /// the [`Self::subscribe()`] method to get updates about the sending of
    /// that redaction.
    ///
    /// By default, if sending failed on the first attempt, it will be retried a
    /// few times. If sending failed after those retries, the entire
    /// client's sending queue will be disabled, and it will need to be
    /// manually re-enabled by the caller (e.g. after network is back, or when
    /// something has been done about the faulty requests).
    pub async fn redact(
        &self,
        redacts: OwnedEventId,
        reason: Option<&str>,
    ) -> Result<SendRedactionHandle, RoomSendQueueError> {
        let Some(room) = self.inner.room.get() else {
            return Err(RoomSendQueueError::RoomDisappeared);
        };
        if room.state() != RoomState::Joined {
            return Err(RoomSendQueueError::RoomNotJoined);
        }

        let request = QueuedRequestKind::Redaction {
            redacts: redacts.clone(),
            reason: reason.map(str::to_owned),
        };

        let created_at = MilliSecondsSinceUnixEpoch::now();
        let transaction_id = self.inner.queue.push(request, created_at).await?;
        trace!(%transaction_id, "manager sends a redaction event to the background task");

        self.inner.notifier.notify_one();

        let send_handle =
            SendRedactionHandle { room: self.clone(), transaction_id: transaction_id.clone() };

        self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
            transaction_id,
            content: LocalEchoContent::Redaction {
                redacts,
                reason: reason.map(str::to_owned),
                send_handle: send_handle.clone(),
                send_error: None,
            },
        }));

        Ok(send_handle)
    }

    /// Returns the current local requests as well as a receiver to listen to
    /// the send queue updates, as defined in [`RoomSendQueueUpdate`].
    ///
    /// Use [`SendQueue::subscribe`] to subscribe to updates for _all rooms_
    /// with a single receiver.
    pub async fn subscribe(
        &self,
    ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
    {
        let local_echoes = self.inner.queue.local_echoes(self).await?;

        Ok((local_echoes, self.inner.update_sender.subscribe()))
    }

651    /// A task that must be spawned in the async runtime, running in the
652    /// background for each room that has a send queue.
653    ///
654    /// It only progresses forward: nothing can be cancelled at any point, which
655    /// makes the implementation not overly complicated to follow.
656    #[allow(clippy::too_many_arguments)]
657    #[instrument(skip_all, fields(room_id = %room.room_id()))]
658    async fn sending_task(
659        room: WeakRoom,
660        queue: QueueStorage,
661        notifier: Arc<Notify>,
662        global_update_sender: broadcast::Sender<SendQueueUpdate>,
663        update_sender: broadcast::Sender<RoomSendQueueUpdate>,
664        locally_enabled: Arc<AtomicBool>,
665        global_error_sender: broadcast::Sender<SendQueueRoomError>,
666        is_dropping: Arc<AtomicBool>,
667        report_media_upload_progress: Arc<AtomicBool>,
668    ) {
669        trace!("spawned the sending task");
670
671        let room_id = room.room_id();
672
673        loop {
674            // A request to shut down should be preferred above everything else.
675            if is_dropping.load(Ordering::SeqCst) {
676                trace!("shutting down!");
677                break;
678            }
679
680            // Try to apply dependent requests now; those applying to previously failed
681            // attempts (local echoes) would succeed now.
682            let mut new_updates = Vec::new();
683            if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
684                warn!("errors when applying dependent requests: {err}");
685            }
686
687            for up in new_updates {
688                send_update(&global_update_sender, &update_sender, room_id, up);
689            }
690
691            if !locally_enabled.load(Ordering::SeqCst) {
692                trace!("not enabled, sleeping");
693                // Wait for an explicit wakeup.
694                notifier.notified().await;
695                continue;
696            }
697
698            let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
699                Ok(Some(request)) => request,
700
701                Ok(None) => {
702                    trace!("queue is empty, sleeping");
703                    // Wait for an explicit wakeup.
704                    notifier.notified().await;
705                    continue;
706                }
707
708                Err(err) => {
709                    warn!("error when loading next request to send: {err}");
710                    continue;
711                }
712            };
713
714            let txn_id = queued_request.transaction_id.clone();
715            trace!(txn_id = %txn_id, "received a request to send!");
716
717            let Some(room) = room.get() else {
718                if is_dropping.load(Ordering::SeqCst) {
719                    break;
720                }
721                error!("the weak room couldn't be upgraded but we're not shutting down?");
722                continue;
723            };
724
725            // If this is a media/gallery upload, prepare the following:
726            // - transaction id for the related media event request,
727            // - progress metadata to feed the final media upload progress
728            // - an observable to watch the media upload progress.
729            let (related_txn_id, media_upload_progress_info, http_progress) =
730                if let QueuedRequestKind::MediaUpload {
731                    cache_key,
732                    thumbnail_source,
733                    #[cfg(feature = "unstable-msc4274")]
734                    accumulated,
735                    related_to,
736                    ..
                } = &queued_request.kind
                {
                    // Prepare to watch and communicate the request's progress for media uploads, if
                    // it has been requested.
                    let (media_upload_progress_info, http_progress) =
                        if report_media_upload_progress.load(Ordering::SeqCst) {
                            let media_upload_progress_info =
                                RoomSendQueue::create_media_upload_progress_info(
                                    &queued_request.transaction_id,
                                    related_to,
                                    cache_key,
                                    thumbnail_source.as_ref(),
                                    #[cfg(feature = "unstable-msc4274")]
                                    accumulated,
                                    &room,
                                    &queue,
                                )
                                .await;

                            let progress = RoomSendQueue::create_media_upload_progress_observable(
                                &media_upload_progress_info,
                                related_to,
                                &update_sender,
                            );

                            (Some(media_upload_progress_info), Some(progress))
                        } else {
                            Default::default()
                        };

                    (Some(related_to.clone()), media_upload_progress_info, http_progress)
                } else {
                    Default::default()
                };

            match Self::handle_request(&room, queued_request, cancel_upload_rx, http_progress).await
            {
                Ok((Some(parent_key), encryption_info)) => match queue
                    .mark_as_sent(&txn_id, parent_key.clone())
                    .await
                {
                    Ok(()) => match parent_key {
                        SentRequestKey::Event { event_id, event, event_type } => {
                            send_update(
                                &global_update_sender,
                                &update_sender,
                                room_id,
                                RoomSendQueueUpdate::SentEvent {
                                    transaction_id: txn_id,
                                    event_id: event_id.clone(),
                                },
                            );

                            // The event has been sent to the server and the server has received it.
                            // Yippee! Now, we usually wait on the server to give us back the event
                            // via the sync.
                            //
                            // Problem: sometimes the network lags, can be down, or the server may
                            // be slow; well, anything can happen.
                            //
                            // It results in a weird situation where the user sees their event being
                            // sent, which then disappears before it's received again from the
                            // server.
                            //
                            // To avoid this situation, we eagerly save the event in the Event
                            // Cache. It's similar to what would happen if the event was echoed back
                            // from the server via the sync, but we avoid any network issues. The
                            // Event Cache is smart enough to deduplicate events based on the event
                            // ID, so it's safe to do that.
                            //
                            // If this little feature fails, it MUST NOT stop the Send Queue. Any
                            // errors are logged, but the Send Queue will continue as if everything
                            // happened successfully. This feature is not considered “crucial”.
                            if let Ok((room_event_cache, _drop_handles)) = room.event_cache().await
                            {
                                let timeline_event = match Raw::from_json_string(
                                    // Create a compact string: remove all useless spaces.
                                    format!(
                                        "{{\
                                            \"event_id\":\"{event_id}\",\
                                            \"origin_server_ts\":{ts},\
                                            \"sender\":\"{sender}\",\
                                            \"type\":\"{type}\",\
                                            \"content\":{content}\
                                        }}",
                                        event_id = event_id,
                                        ts = MilliSecondsSinceUnixEpoch::now().get(),
                                        sender = room.client().user_id().expect("Client must be logged-in"),
                                        r#type = event_type,
                                        content = event.into_json(),
                                    ),
                                ) {
                                    Ok(event) => match encryption_info {
                                        #[cfg(feature = "e2e-encryption")]
                                        Some(encryption_info) => {
                                            use matrix_sdk_base::deserialized_responses::DecryptedRoomEvent;
                                            let decrypted_event = DecryptedRoomEvent {
                                                event: event.cast_unchecked(),
                                                encryption_info: Arc::new(encryption_info),
                                                unsigned_encryption_info: None,
                                            };
                                            Some(TimelineEvent::from_decrypted(
                                                decrypted_event,
                                                None,
                                            ))
                                        }
                                        _ => Some(TimelineEvent::from_plaintext(event)),
                                    },
                                    Err(err) => {
                                        error!(
                                            ?err,
                                            "Failed to build the (sync) event before saving it in the Event Cache"
                                        );
                                        None
                                    }
                                };

                                // In case of an error, just log the error but don't stop the Send
                                // Queue. This feature is not crucial.
                                if let Some(timeline_event) = timeline_event
                                    && let Err(err) = room_event_cache
                                        .insert_sent_event_from_send_queue(timeline_event)
                                        .await
                                {
                                    error!(
                                        ?err,
                                        "Failed to save the sent event in the Event Cache"
                                    );
                                }
                            } else {
                                info!(
                                    "Cannot insert the sent event in the Event Cache because \
                                    either the room no longer exists, or the Room Event Cache cannot be retrieved"
                                );
                            }
                        }

                        SentRequestKey::Media(sent_media_info) => {
                            // Generate some final progress information, even if incremental
                            // progress wasn't requested.
                            let index =
                                media_upload_progress_info.as_ref().map_or(0, |info| info.index);
                            let progress = media_upload_progress_info
                                .as_ref()
                                .map(|info| {
                                    AbstractProgress { current: info.bytes, total: info.bytes }
                                        + info.offsets
                                })
                                .unwrap_or(AbstractProgress { current: 1, total: 1 });

                            // Purposefully don't use `send_update` here, because we don't want to
                            // notify the global listeners about an upload progress update.
                            let _ = update_sender.send(RoomSendQueueUpdate::MediaUpload {
                                related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
                                file: Some(sent_media_info.file),
                                index,
                                progress,
                            });
                        }

                        SentRequestKey::Redaction { event_id, redacts, reason } => {
                            send_update(
                                &global_update_sender,
                                &update_sender,
                                room_id,
                                RoomSendQueueUpdate::SentEvent {
                                    transaction_id: txn_id,
                                    event_id: event_id.clone(),
                                },
                            );

                            // The redaction event has been sent to the server and the server has
                            // received it. It's safe to cache the event now to avoid any
                            // inconsistencies until the server sends down the remote echo via the
                            // sync.
                            if let Ok((room_event_cache, _drop_handles)) = room.event_cache().await
                            {
                                let content_field_redacts = room.version().is_some_and(|id| {
                                    id.rules()
                                        .is_some_and(|rules| rules.redaction.content_field_redacts)
                                });
                                let reason = reason.map_or_else(
                                    || "".to_owned(),
                                    |r| format!("\"reason\":\"{r}\""),
                                );
                                // Depending on the room version, the redacted event ID lives
                                // either at the top level of the event, or inside its content.
                                // Don't shadow `redacts` before `content` is built: both need
                                // the original event ID.
                                let (redacts, content) = if content_field_redacts {
                                    let sep = if reason.is_empty() { "" } else { "," };
                                    ("".to_owned(), format!("\"redacts\":\"{redacts}\"{sep}{reason}"))
                                } else {
                                    (format!("\"redacts\":\"{redacts}\","), reason)
                                };

                                let timeline_event = match Raw::from_json_string(
                                    // Create a compact string: remove all useless spaces.
                                    format!(
                                        "{{\
                                            {redacts}\
                                            \"event_id\":\"{event_id}\",\
                                            \"origin_server_ts\":{ts},\
                                            \"sender\":\"{sender}\",\
                                            \"type\":\"{type}\",\
                                            \"content\":{{{content}}}\
                                        }}",
                                        redacts = redacts,
                                        event_id = event_id,
                                        ts = MilliSecondsSinceUnixEpoch::now().get(),
                                        sender = room.client().user_id().expect("Client must be logged-in"),
                                        r#type = TimelineEventType::RoomRedaction,
                                        content = content
                                    ),
                                ) {
                                    Ok(event) => Some(TimelineEvent::from_plaintext(event)),
                                    Err(err) => {
                                        error!(
                                            ?err,
                                            "Failed to build the (sync) redaction event before saving it in the Event Cache"
                                        );
                                        None
                                    }
                                };

                                // In case of an error, just log the error but don't stop the Send
                                // Queue. This feature is not crucial.
                                if let Some(timeline_event) = timeline_event
                                    && let Err(err) = room_event_cache
                                        .insert_sent_event_from_send_queue(timeline_event)
                                        .await
                                {
                                    error!(
                                        ?err,
                                        "Failed to save the sent redaction event in the Event Cache"
                                    );
                                }
                            } else {
                                info!(
                                    "Cannot insert the sent redaction event in the Event Cache because \
                                    either the room no longer exists, or the Room Event Cache cannot be retrieved"
                                );
                            }
                        }
                    },

                    Err(err) => {
                        warn!("unable to mark queued request as sent: {err}");
                    }
                },

                Ok((None, _)) => {
                    debug!("Request has been aborted while running, continuing.");
                }

                Err(err) => {
                    let is_recoverable = match err {
                        crate::Error::Http(ref http_err) => {
                            // All transient errors are recoverable.
                            matches!(
                                http_err.retry_kind(),
                                RetryKind::Transient { .. } | RetryKind::NetworkFailure
                            )
                        }

                        // `ConcurrentRequestFailed` typically happens because of an HTTP failure;
                        // since we don't get the underlying error, be lax and consider it
                        // recoverable, and let observers decide to retry it or not. At some point
                        // we'll get the actual underlying error.
                        crate::Error::ConcurrentRequestFailed => true,

                        // As of 2024-06-27, all other error types are considered unrecoverable.
                        _ => false,
                    };

                    // Disable the queue for this room after any kind of error happened.
                    locally_enabled.store(false, Ordering::SeqCst);

                    if is_recoverable {
                        warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");

                        // In this case, we intentionally keep the request in the queue, but mark it
                        // as not being sent anymore.
                        queue.mark_as_not_being_sent(&txn_id).await;

                        // Let observers know about a failure *after* we've
                        // marked the item as not being sent anymore. Otherwise,
                        // there's a possible race where a caller might try to
                        // remove an item, while it's still marked as being
                        // sent, resulting in a cancellation failure.
                    } else {
                        warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");

                        // Mark the request as wedged, so it's not picked up again at any future point.
                        if let Err(storage_error) =
                            queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
                        {
                            warn!("unable to mark request as wedged: {storage_error}");
                        }
                    }

                    let error = Arc::new(err);

                    let _ = global_error_sender.send(SendQueueRoomError {
                        room_id: room_id.to_owned(),
                        error: error.clone(),
                        is_recoverable,
                    });

                    send_update(
                        &global_update_sender,
                        &update_sender,
                        room_id,
                        RoomSendQueueUpdate::SendError {
                            transaction_id: related_txn_id.unwrap_or(txn_id),
                            error,
                            is_recoverable,
                        },
                    );
                }
            }
        }

        info!("exited sending task");
    }

    /// Handles a single request and returns the [`SentRequestKey`] on success
    /// (unless the request was cancelled, in which case it'll return
    /// `None`).
    async fn handle_request(
        room: &Room,
        request: QueuedRequest,
        cancel_upload_rx: Option<oneshot::Receiver<()>>,
        progress: Option<SharedObservable<TransmissionProgress>>,
    ) -> Result<(Option<SentRequestKey>, Option<EncryptionInfo>), crate::Error> {
        match request.kind {
            QueuedRequestKind::Event { content } => {
                let (event, event_type) = content.into_raw();

                let result = room
                    .send_raw(&event_type, &event)
                    .with_transaction_id(&request.transaction_id)
                    .with_request_config(RequestConfig::short_retry())
                    .await?;

                trace!(txn_id = %request.transaction_id, event_id = %result.response.event_id, "event successfully sent");

                Ok((
                    Some(SentRequestKey::Event {
                        event_id: result.response.event_id,
                        event,
                        event_type,
                    }),
                    result.encryption_info,
                ))
            }

            QueuedRequestKind::MediaUpload {
                content_type,
                cache_key,
                thumbnail_source,
                related_to: relates_to,
                #[cfg(feature = "unstable-msc4274")]
                accumulated,
            } => {
                trace!(%relates_to, "uploading media related to event");

                let fut = async move {
                    let data = room
                        .client()
                        .media_store()
                        .lock()
                        .await?
                        .get_media_content(&cache_key)
                        .await?
                        .ok_or(crate::Error::SendQueueWedgeError(Box::new(
                            QueueWedgeError::MissingMediaContent,
                        )))?;

                    let mime = Mime::from_str(&content_type).map_err(|_| {
                        crate::Error::SendQueueWedgeError(Box::new(
                            QueueWedgeError::InvalidMimeType { mime_type: content_type.clone() },
                        ))
                    })?;

                    #[cfg(feature = "e2e-encryption")]
                    let media_source = if room.latest_encryption_state().await?.is_encrypted() {
                        trace!("upload will be encrypted (encrypted room)");

                        let mut cursor = std::io::Cursor::new(data);
                        let mut req = room
                            .client
                            .upload_encrypted_file(&mut cursor)
                            .with_request_config(RequestConfig::short_retry());
                        if let Some(progress) = progress {
                            req = req.with_send_progress_observable(progress);
                        }
                        let encrypted_file = req.await?;

                        MediaSource::Encrypted(Box::new(encrypted_file))
                    } else {
                        trace!("upload will be in clear text (room without encryption)");

                        let request_config = RequestConfig::short_retry()
                            .timeout(Media::reasonable_upload_timeout(&data));
                        let mut req =
                            room.client().media().upload(&mime, data, Some(request_config));
                        if let Some(progress) = progress {
                            req = req.with_send_progress_observable(progress);
                        }
                        let res = req.await?;

                        MediaSource::Plain(res.content_uri)
                    };

                    #[cfg(not(feature = "e2e-encryption"))]
                    let media_source = {
                        let request_config = RequestConfig::short_retry()
                            .timeout(Media::reasonable_upload_timeout(&data));
                        let mut req =
                            room.client().media().upload(&mime, data, Some(request_config));
                        if let Some(progress) = progress {
                            req = req.with_send_progress_observable(progress);
                        }
                        let res = req.await?;
                        MediaSource::Plain(res.content_uri)
                    };

                    let uri = match &media_source {
                        MediaSource::Plain(uri) => uri,
                        MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
                    };
                    trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");

                    Ok((
                        Some(SentRequestKey::Media(SentMediaInfo {
                            file: media_source,
                            thumbnail: thumbnail_source,
                            #[cfg(feature = "unstable-msc4274")]
                            accumulated,
                        })),
                        None,
                    ))
                };

                let wait_for_cancel = async move {
                    if let Some(rx) = cancel_upload_rx {
                        rx.await
                    } else {
                        std::future::pending().await
                    }
                };

                tokio::select! {
                    biased;

                    _ = wait_for_cancel => {
                        Ok((None, None))
                    }

                    res = fut => {
                        res
                    }
                }
            }

            QueuedRequestKind::Redaction { redacts, reason } => {
                let result = room
                    .redact(&redacts, reason.as_deref(), Some(request.transaction_id.clone()))
                    .await?;

                trace!(txn_id = %request.transaction_id, event_id = %result.event_id, "redaction successfully sent");

                Ok((
                    Some(SentRequestKey::Redaction { event_id: result.event_id, redacts, reason }),
                    None,
                ))
            }
        }
    }

    /// Returns whether the send queue for this room is enabled.
    pub fn is_enabled(&self) -> bool {
        self.inner.locally_enabled.load(Ordering::SeqCst)
    }

    /// Set the locally enabled flag for this room queue.
    pub fn set_enabled(&self, enabled: bool) {
        self.inner.locally_enabled.store(enabled, Ordering::SeqCst);

        // No need to wake a task to tell it it's been disabled, so only notify if we're
        // re-enabling the queue.
        if enabled {
            self.inner.notifier.notify_one();
        }
    }

    /// Send an update on the room send queue channel, and on the global send
    /// queue channel, i.e. it sends a [`RoomSendQueueUpdate`] and a
    /// [`SendQueueUpdate`].
    fn send_update(&self, update: RoomSendQueueUpdate) {
        let _ = self.inner.update_sender.send(update.clone());
        let _ = self
            .inner
            .global_update_sender
            .send(SendQueueUpdate { room_id: self.inner.room.room_id().to_owned(), update });
    }
}

fn send_update(
    global_update_sender: &broadcast::Sender<SendQueueUpdate>,
    update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
    room_id: &RoomId,
    update: RoomSendQueueUpdate,
) {
    let _ = update_sender.send(update.clone());
    let _ = global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update });
}

impl From<&crate::Error> for QueueWedgeError {
    fn from(value: &crate::Error) -> Self {
        match value {
            #[cfg(feature = "e2e-encryption")]
            crate::Error::OlmError(error) => match &**error {
                OlmError::SessionRecipientCollectionError(error) => match error {
                    SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
                        QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
                    }

                    SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
                        QueueWedgeError::IdentityViolations { users: users.clone() }
                    }

                    SessionRecipientCollectionError::CrossSigningNotSetup
                    | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
                        QueueWedgeError::CrossVerificationRequired
                    }
                },
                _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
            },

            // Flatten errors of `Self` type.
            crate::Error::SendQueueWedgeError(error) => *error.clone(),

            _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
        }
    }
}

struct RoomSendQueueInner {
    /// The room which this send queue relates to.
    room: WeakRoom,

    /// Global sender to send [`SendQueueUpdate`].
    ///
    /// See [`SendQueue::subscribe`].
    global_update_sender: broadcast::Sender<SendQueueUpdate>,

    /// Broadcaster for notifications about the statuses of requests to be sent.
    ///
    /// Can be subscribed to from the outside.
    ///
    /// See [`RoomSendQueue::subscribe`].
    update_sender: broadcast::Sender<RoomSendQueueUpdate>,

    /// Queue of requests that are either to be sent, or being sent.
    ///
    /// When a request has been sent to the server, it is removed from that
    /// queue *after* being sent. That way, we will retry sending upon
    /// failure, in the same order requests have been inserted in the first
    /// place.
    queue: QueueStorage,

    /// A notifier that's updated any time common data is touched (stopped or
    /// enabled statuses), or the associated room [`QueueStorage`].
    notifier: Arc<Notify>,

    /// Should the room process new requests or not (because e.g. it might be
    /// running off the network)?
    locally_enabled: Arc<AtomicBool>,

    /// Handle to the actual sending task. Unused, but kept alive along this
    /// data structure.
    _task: BackgroundTaskHandle,
}

/// Information about a request being sent right this moment.
struct BeingSentInfo {
    /// Transaction id of the thing being sent.
    transaction_id: OwnedTransactionId,

    /// For an upload request, a trigger to cancel the upload before it
    /// completes.
    cancel_upload: Option<oneshot::Sender<()>>,
}

impl BeingSentInfo {
    /// Aborts the upload, if a trigger is available.
    ///
    /// Consumes the object because the sender is a oneshot and will be consumed
    /// upon sending.
    fn cancel_upload(self) -> bool {
        if let Some(cancel_upload) = self.cancel_upload {
            let _ = cancel_upload.send(());
            true
        } else {
            false
        }
    }
}

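    ///
    /// A minimal usage sketch (not compiled here, since it needs a live
    /// [`Room`] from a logged-in client; the `room` binding is assumed):
    ///
    /// ```no_run
    /// # async fn example(room: matrix_sdk::Room) {
    /// let queue = room.send_queue();
    ///
    /// // Pause sending for this room only; queued requests are kept.
    /// queue.set_enabled(false);
    /// assert!(!queue.is_enabled());
    ///
    /// // Re-enabling notifies the background task so it resumes sending.
    /// queue.set_enabled(true);
    /// # }
    /// ```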
/// A specialized lock that guards access to both the state store and the
/// [`Self::being_sent`] data.
#[derive(Clone)]
struct StoreLock {
    /// Reference to the client, to get access to the underlying store.
    client: WeakClient,

    /// The one queued request that is being sent at the moment, along with
    /// associated data that can be useful to act upon it.
    ///
    /// Also used as the lock to access the state store.
    being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
}

impl StoreLock {
    /// Gets a hold of the locked store and [`Self::being_sent`] pair.
    async fn lock(&self) -> StoreLockGuard {
        StoreLockGuard {
            client: self.client.clone(),
            being_sent: self.being_sent.clone().lock_owned().await,
        }
    }
}

/// A lock guard obtained through locking with [`StoreLock`], giving access to
/// the client and the `being_sent` data.
struct StoreLockGuard {
    /// Reference to the client, to get access to the underlying store.
    client: WeakClient,

    /// The one queued request that is being sent at the moment, along with
    /// associated data that can be useful to act upon it.
    being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
}

impl StoreLockGuard {
    /// Get a client from the locked state, useful to get a handle on a store.
    fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
        self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
    }
}

#[derive(Clone)]
struct QueueStorage {
    /// A lock to make sure the state store is only accessed once at a time, to
    /// make some store operations atomic.
    store: StoreLock,

    /// To which room is this storage related.
    room_id: OwnedRoomId,

    /// In-memory mapping of media transaction IDs to thumbnail sizes for the
    /// purpose of progress reporting.
    ///
    /// The keys are the transaction IDs for sending the media or gallery event
    /// after all uploads have finished. This allows us to easily clean up the
    /// cache after the event was sent.
    ///
    /// For media uploads, the value vector will always have a single element.
    ///
    /// For galleries, some gallery items might not have a thumbnail while
    /// others do. Since we access the thumbnails by their index within the
    /// gallery, the vector needs to hold optional `usize`s.
    thumbnail_file_sizes: Arc<SyncMutex<HashMap<OwnedTransactionId, Vec<Option<usize>>>>>,
}

impl QueueStorage {
    /// Default priority for a queued request.
    const LOW_PRIORITY: usize = 0;

    /// High priority for a queued request that must be handled before others.
    const HIGH_PRIORITY: usize = 10;

    /// Create a new queue for queuing requests to be sent later.
    fn new(client: WeakClient, room: OwnedRoomId) -> Self {
        Self {
            room_id: room,
            store: StoreLock { client, being_sent: Default::default() },
            thumbnail_file_sizes: Default::default(),
        }
    }

1429    /// Push a new event to be sent in the queue, with a default priority of 0.
1430    ///
1431    /// Returns the transaction id chosen to identify the request.
1432    async fn push(
1433        &self,
1434        request: QueuedRequestKind,
1435        created_at: MilliSecondsSinceUnixEpoch,
1436    ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
1437        let transaction_id = TransactionId::new();
1438
1439        self.store
1440            .lock()
1441            .await
1442            .client()?
1443            .state_store()
1444            .save_send_queue_request(
1445                &self.room_id,
1446                transaction_id.clone(),
1447                created_at,
1448                request,
1449                Self::LOW_PRIORITY,
1450            )
1451            .await?;
1452
1453        Ok(transaction_id)
1454    }

    /// Peeks the next request to be sent, marking it as being sent.
    ///
    /// It is required to call [`Self::mark_as_sent`] after it's been
    /// effectively sent.
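    ///
    /// Illustrative call sequence (`rust,ignore` since these APIs are
    /// crate-internal; `queue` and `parent_key` are placeholders):
    ///
    /// ```rust,ignore
    /// if let Some((request, _cancel_rx)) = queue.peek_next_to_send().await? {
    ///     // Send the request over the network, then on success:
    ///     queue.mark_as_sent(&request.transaction_id, parent_key).await?;
    ///     // ...or, after a transient error:
    ///     // queue.mark_as_not_being_sent(&request.transaction_id).await;
    /// }
    /// ```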
    async fn peek_next_to_send(
        &self,
    ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
    {
        let mut guard = self.store.lock().await;
        let queued_requests =
            guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;

        if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
            let (cancel_upload_tx, cancel_upload_rx) =
                if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
                    let (tx, rx) = oneshot::channel();
                    (Some(tx), Some(rx))
                } else {
                    Default::default()
                };

            let prev = guard.being_sent.replace(BeingSentInfo {
                transaction_id: request.transaction_id.clone(),
                cancel_upload: cancel_upload_tx,
            });

            if let Some(prev) = prev {
                error!(
                    prev_txn = ?prev.transaction_id,
                    "a previous request was still active while picking a new one"
                );
            }

            Ok(Some((request.clone(), cancel_upload_rx)))
        } else {
            Ok(None)
        }
    }

    /// Marks a request peeked with [`Self::peek_next_to_send`] and identified
    /// by the given transaction id as not being sent anymore, so it can
    /// be removed from the queue later.
    async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
        let was_being_sent = self.store.lock().await.being_sent.take();

        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
        if prev_txn != Some(transaction_id) {
            error!(?prev_txn, "previous active request didn't match the one we expected (after transient error)");
        }
    }

    /// Marks a request peeked with [`Self::peek_next_to_send`] and identified
    /// by the given transaction id as being wedged (and not being sent
    /// anymore), so it can be removed from the queue later.
    async fn mark_as_wedged(
        &self,
        transaction_id: &TransactionId,
        reason: QueueWedgeError,
    ) -> Result<(), RoomSendQueueStorageError> {
        // Keep the lock until we're done touching the storage.
        let mut guard = self.store.lock().await;
        let was_being_sent = guard.being_sent.take();

        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
        if prev_txn != Some(transaction_id) {
            error!(
                ?prev_txn,
                "previous active request didn't match the one we expected (after permanent error)",
            );
        }

        Ok(guard
            .client()?
            .state_store()
            .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
            .await?)
    }

    /// Marks a request identified with the given transaction id as being now
    /// unwedged and adds it back to the queue.
    async fn mark_as_unwedged(
        &self,
        transaction_id: &TransactionId,
    ) -> Result<(), RoomSendQueueStorageError> {
        Ok(self
            .store
            .lock()
            .await
            .client()?
            .state_store()
            .update_send_queue_request_status(&self.room_id, transaction_id, None)
            .await?)
    }

    /// Marks a request pushed with [`Self::push`] and identified with the given
    /// transaction id as sent, by removing it from the local queue.
    async fn mark_as_sent(
        &self,
        transaction_id: &TransactionId,
        parent_key: SentRequestKey,
    ) -> Result<(), RoomSendQueueStorageError> {
        // Keep the lock until we're done touching the storage.
        let mut guard = self.store.lock().await;
        let was_being_sent = guard.being_sent.take();

        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
        if prev_txn != Some(transaction_id) {
            error!(
                ?prev_txn,
                "previous active request didn't match the one we expected (after successful send)",
            );
        }

        let client = guard.client()?;
        let store = client.state_store();

        // Update all dependent requests.
        store
            .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
            .await?;

        let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;

        if !removed {
            warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
        }

        self.thumbnail_file_sizes.lock().remove(transaction_id);

        Ok(())
    }

    /// Cancel the sending of an event that was queued with [`Self::push`],
    /// identified by the given transaction id.
    ///
    /// Returns whether the given transaction has been effectively removed. If
    /// false, this either means that the transaction id was unrelated to
    /// this queue, or that the request was sent before we cancelled it.
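    ///
    /// Illustrative usage (`rust,ignore`; crate-internal API, `queue` and
    /// `txn_id` are placeholders):
    ///
    /// ```rust,ignore
    /// if queue.cancel_event(&txn_id).await? {
    ///     // Either the request was removed from the queue before being sent,
    ///     // or a dependent redaction was scheduled because it was in flight.
    /// }
    /// ```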
    async fn cancel_event(
        &self,
        transaction_id: &TransactionId,
    ) -> Result<bool, RoomSendQueueStorageError> {
        let guard = self.store.lock().await;

        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
            == Some(transaction_id)
        {
            // Save the intent to redact the event.
            guard
                .client()?
                .state_store()
                .save_dependent_queued_request(
                    &self.room_id,
                    transaction_id,
                    ChildTransactionId::new(),
                    MilliSecondsSinceUnixEpoch::now(),
                    DependentQueuedRequestKind::RedactEvent,
                )
                .await?;

            return Ok(true);
        }

        let removed = guard
            .client()?
            .state_store()
            .remove_send_queue_request(&self.room_id, transaction_id)
            .await?;

        self.thumbnail_file_sizes.lock().remove(transaction_id);

        Ok(removed)
    }

    /// Replace the content of an event that was queued with [`Self::push`],
    /// identified by the given transaction id, before it has actually been
    /// sent.
    ///
    /// Returns whether the given transaction has been effectively edited. If
    /// false, this either means that the transaction id was unrelated to
    /// this queue, or that the request was sent before we edited it.
    async fn replace_event(
        &self,
        transaction_id: &TransactionId,
        serializable: SerializableEventContent,
    ) -> Result<bool, RoomSendQueueStorageError> {
        let guard = self.store.lock().await;

        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
            == Some(transaction_id)
        {
            // Save the intent to edit the associated event.
            guard
                .client()?
                .state_store()
                .save_dependent_queued_request(
                    &self.room_id,
                    transaction_id,
                    ChildTransactionId::new(),
                    MilliSecondsSinceUnixEpoch::now(),
                    DependentQueuedRequestKind::EditEvent { new_content: serializable },
                )
                .await?;

            return Ok(true);
        }

        let edited = guard
            .client()?
            .state_store()
            .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
            .await?;

        Ok(edited)
    }

    /// Push requests (and dependent requests) to upload a media file.
    ///
    /// See the module-level description for details of the whole process.
    #[allow(clippy::too_many_arguments)]
    async fn push_media(
        &self,
        event: RoomMessageEventContent,
        content_type: Mime,
        send_event_txn: OwnedTransactionId,
        created_at: MilliSecondsSinceUnixEpoch,
        upload_file_txn: OwnedTransactionId,
        file_media_request: MediaRequestParameters,
        thumbnail: Option<QueueThumbnailInfo>,
    ) -> Result<(), RoomSendQueueStorageError> {
        let guard = self.store.lock().await;
        let client = guard.client()?;
        let store = client.state_store();

        // There's only a single media to be sent, so it has at most one thumbnail.
        let thumbnail_file_sizes = vec![thumbnail.as_ref().map(|t| t.file_size)];

        let thumbnail_info = self
            .push_thumbnail_and_media_uploads(
                store,
                &content_type,
                send_event_txn.clone(),
                created_at,
                upload_file_txn.clone(),
                file_media_request,
                thumbnail,
            )
            .await?;

        // Push the dependent request for the event itself.
        store
            .save_dependent_queued_request(
                &self.room_id,
                &upload_file_txn,
                send_event_txn.clone().into(),
                created_at,
                DependentQueuedRequestKind::FinishUpload {
                    local_echo: Box::new(event),
                    file_upload: upload_file_txn.clone(),
                    thumbnail_info,
                },
            )
            .await?;

        self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);

        Ok(())
    }

    /// Push requests (and dependent requests) to upload a gallery.
    ///
    /// See the module-level description for details of the whole process.
    #[cfg(feature = "unstable-msc4274")]
    #[allow(clippy::too_many_arguments)]
    async fn push_gallery(
        &self,
        event: RoomMessageEventContent,
        send_event_txn: OwnedTransactionId,
        created_at: MilliSecondsSinceUnixEpoch,
        item_queue_infos: Vec<GalleryItemQueueInfo>,
    ) -> Result<(), RoomSendQueueStorageError> {
        let guard = self.store.lock().await;
        let client = guard.client()?;
        let store = client.state_store();

        let mut finish_item_infos = Vec::with_capacity(item_queue_infos.len());
        let mut thumbnail_file_sizes = Vec::with_capacity(item_queue_infos.len());

        let Some((first, rest)) = item_queue_infos.split_first() else {
            return Ok(());
        };

        let GalleryItemQueueInfo { content_type, upload_file_txn, file_media_request, thumbnail } =
            first;

        let thumbnail_info = self
            .push_thumbnail_and_media_uploads(
                store,
                content_type,
                send_event_txn.clone(),
                created_at,
                upload_file_txn.clone(),
                file_media_request.clone(),
                thumbnail.clone(),
            )
            .await?;

        finish_item_infos
            .push(FinishGalleryItemInfo { file_upload: upload_file_txn.clone(), thumbnail_info });
        thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));

        let mut last_upload_file_txn = upload_file_txn.clone();

        for item_queue_info in rest {
            let GalleryItemQueueInfo {
                content_type,
                upload_file_txn,
                file_media_request,
                thumbnail,
            } = item_queue_info;

            let thumbnail_info = if let Some(QueueThumbnailInfo {
                finish_upload_thumbnail_info: thumbnail_info,
                media_request_parameters: thumbnail_media_request,
                content_type: thumbnail_content_type,
                ..
            }) = thumbnail
            {
                let upload_thumbnail_txn = thumbnail_info.txn.clone();

                // Save the thumbnail upload request as a dependent request of the last file
                // upload.
                store
                    .save_dependent_queued_request(
                        &self.room_id,
                        &last_upload_file_txn,
                        upload_thumbnail_txn.clone().into(),
                        created_at,
                        DependentQueuedRequestKind::UploadFileOrThumbnail {
                            content_type: thumbnail_content_type.to_string(),
                            cache_key: thumbnail_media_request.clone(),
                            related_to: send_event_txn.clone(),
                            parent_is_thumbnail_upload: false,
                        },
                    )
                    .await?;

                last_upload_file_txn = upload_thumbnail_txn;

                Some(thumbnail_info)
            } else {
                None
            };

            // Save the file upload as a dependent request of the previous upload.
            store
                .save_dependent_queued_request(
                    &self.room_id,
                    &last_upload_file_txn,
                    upload_file_txn.clone().into(),
                    created_at,
                    DependentQueuedRequestKind::UploadFileOrThumbnail {
                        content_type: content_type.to_string(),
                        cache_key: file_media_request.clone(),
                        related_to: send_event_txn.clone(),
                        parent_is_thumbnail_upload: thumbnail.is_some(),
                    },
                )
                .await?;

            finish_item_infos.push(FinishGalleryItemInfo {
                file_upload: upload_file_txn.clone(),
                thumbnail_info: thumbnail_info.cloned(),
            });
            thumbnail_file_sizes.push(thumbnail.as_ref().map(|t| t.file_size));

            last_upload_file_txn = upload_file_txn.clone();
        }

        // Push the request for the event itself as a dependent request of the last file
        // upload.
        store
            .save_dependent_queued_request(
                &self.room_id,
                &last_upload_file_txn,
                send_event_txn.clone().into(),
                created_at,
                DependentQueuedRequestKind::FinishGallery {
                    local_echo: Box::new(event),
                    item_infos: finish_item_infos,
                },
            )
            .await?;

        self.thumbnail_file_sizes.lock().insert(send_event_txn, thumbnail_file_sizes);

        Ok(())
    }

    /// If a thumbnail exists, pushes a [`QueuedRequestKind::MediaUpload`] to
    /// upload it, and a [`DependentQueuedRequestKind::UploadFileOrThumbnail`]
    /// to upload the media itself afterwards. Otherwise, pushes a single
    /// [`QueuedRequestKind::MediaUpload`] to upload the media directly.
    #[allow(clippy::too_many_arguments)]
    async fn push_thumbnail_and_media_uploads(
        &self,
        store: &DynStateStore,
        content_type: &Mime,
        send_event_txn: OwnedTransactionId,
        created_at: MilliSecondsSinceUnixEpoch,
        upload_file_txn: OwnedTransactionId,
        file_media_request: MediaRequestParameters,
        thumbnail: Option<QueueThumbnailInfo>,
    ) -> Result<Option<FinishUploadThumbnailInfo>, RoomSendQueueStorageError> {
        if let Some(QueueThumbnailInfo {
            finish_upload_thumbnail_info: thumbnail_info,
            media_request_parameters: thumbnail_media_request,
            content_type: thumbnail_content_type,
            ..
        }) = thumbnail
        {
            let upload_thumbnail_txn = thumbnail_info.txn.clone();

            // Save the thumbnail upload request.
            store
                .save_send_queue_request(
                    &self.room_id,
                    upload_thumbnail_txn.clone(),
                    created_at,
                    QueuedRequestKind::MediaUpload {
                        content_type: thumbnail_content_type.to_string(),
                        cache_key: thumbnail_media_request,
                        thumbnail_source: None, // the thumbnail has no thumbnails :)
                        related_to: send_event_txn.clone(),
                        #[cfg(feature = "unstable-msc4274")]
                        accumulated: vec![],
                    },
                    Self::LOW_PRIORITY,
                )
                .await?;

            // Save the file upload request as a dependent request of the thumbnail upload.
            store
                .save_dependent_queued_request(
                    &self.room_id,
                    &upload_thumbnail_txn,
                    upload_file_txn.into(),
                    created_at,
                    DependentQueuedRequestKind::UploadFileOrThumbnail {
                        content_type: content_type.to_string(),
                        cache_key: file_media_request,
                        related_to: send_event_txn,
                        parent_is_thumbnail_upload: true,
                    },
                )
                .await?;

            Ok(Some(thumbnail_info))
        } else {
            // Save the file upload as its own request, not a dependent one.
            store
                .save_send_queue_request(
                    &self.room_id,
                    upload_file_txn,
                    created_at,
                    QueuedRequestKind::MediaUpload {
                        content_type: content_type.to_string(),
                        cache_key: file_media_request,
                        thumbnail_source: None,
                        related_to: send_event_txn,
                        #[cfg(feature = "unstable-msc4274")]
                        accumulated: vec![],
                    },
                    Self::LOW_PRIORITY,
                )
                .await?;

            Ok(None)
        }
    }

    /// Reacts to the given local echo of an event.
    #[instrument(skip(self))]
    async fn react(
        &self,
        transaction_id: &TransactionId,
        key: String,
        created_at: MilliSecondsSinceUnixEpoch,
    ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
        let guard = self.store.lock().await;
        let client = guard.client()?;
        let store = client.state_store();

        let requests = store.load_send_queue_requests(&self.room_id).await?;

        // If the target event has already been sent, abort immediately.
        if !requests.iter().any(|item| item.transaction_id == transaction_id) {
            // We didn't find it as a queued request; try to find it as a dependent queued
            // request.
            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
            if !dependent_requests
                .into_iter()
                .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
                .any(|child_txn| *child_txn == *transaction_id)
            {
                // We didn't find it as either a request or a dependent request; abort.
                return Ok(None);
            }
        }

        // Record the dependent request.
        let reaction_txn_id = ChildTransactionId::new();
        store
            .save_dependent_queued_request(
                &self.room_id,
                transaction_id,
                reaction_txn_id.clone(),
                created_at,
                DependentQueuedRequestKind::ReactEvent { key },
            )
            .await?;

        Ok(Some(reaction_txn_id))
    }

    /// Returns a list of the local echoes, that is, all the requests that we're
    /// about to send but that haven't been sent yet (or are being sent).
    async fn local_echoes(
        &self,
        room: &RoomSendQueue,
    ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
        let guard = self.store.lock().await;
        let client = guard.client()?;
        let store = client.state_store();

        let local_requests =
            store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
                Some(LocalEcho {
                    transaction_id: queued.transaction_id.clone(),
                    content: match queued.kind {
                        QueuedRequestKind::Event { content } => LocalEchoContent::Event {
                            serialized_event: content,
                            send_handle: SendHandle {
                                room: room.clone(),
                                transaction_id: queued.transaction_id,
                                media_handles: vec![],
                                created_at: queued.created_at,
                            },
                            send_error: queued.error,
                        },

                        QueuedRequestKind::MediaUpload { .. } => {
                            // Don't return media uploads as their own echoes; the accompanying
                            // event, represented as a dependent request, should be sufficient.
                            return None;
                        }

                        QueuedRequestKind::Redaction { redacts, reason } => {
                            LocalEchoContent::Redaction {
                                redacts,
                                reason,
                                send_handle: SendRedactionHandle {
                                    room: room.clone(),
                                    transaction_id: queued.transaction_id,
                                },
                                send_error: queued.error,
                            }
                        }
                    },
                })
            });

        let reactions_and_medias = store
            .load_dependent_queued_requests(&self.room_id)
            .await?
            .into_iter()
            .filter_map(|dep| match dep.kind {
                DependentQueuedRequestKind::EditEvent { .. }
                | DependentQueuedRequestKind::RedactEvent => {
                    // TODO: reflect local edits/redacts too?
                    None
                }

                DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
                    transaction_id: dep.own_transaction_id.clone().into(),
                    content: LocalEchoContent::React {
                        key,
                        send_handle: SendReactionHandle {
                            room: room.clone(),
                            transaction_id: dep.own_transaction_id,
                        },
                        applies_to: dep.parent_transaction_id,
                    },
                }),

                DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
                    // Don't reflect these: only the associated event is interesting to observers.
                    None
                }

                DependentQueuedRequestKind::FinishUpload {
                    local_echo,
                    file_upload,
                    thumbnail_info,
                } => {
                    // Materialize as an event local echo.
                    Some(LocalEcho {
                        transaction_id: dep.own_transaction_id.clone().into(),
                        content: LocalEchoContent::Event {
                            serialized_event: SerializableEventContent::new(&(*local_echo).into())
                                .ok()?,
                            send_handle: SendHandle {
                                room: room.clone(),
                                transaction_id: dep.own_transaction_id.into(),
                                media_handles: vec![MediaHandles {
                                    upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
                                    upload_file_txn: file_upload,
                                }],
                                created_at: dep.created_at,
                            },
                            send_error: None,
                        },
                    })
                }

                #[cfg(feature = "unstable-msc4274")]
                DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
                    // Materialize as an event local echo.
                    self.create_gallery_local_echo(
                        dep.own_transaction_id,
                        room,
                        dep.created_at,
                        local_echo,
                        item_infos,
                    )
                }
            });

        Ok(local_requests.chain(reactions_and_medias).collect())
    }

    /// Create a local echo for a gallery event.
    #[cfg(feature = "unstable-msc4274")]
    fn create_gallery_local_echo(
        &self,
        transaction_id: ChildTransactionId,
        room: &RoomSendQueue,
        created_at: MilliSecondsSinceUnixEpoch,
        local_echo: Box<RoomMessageEventContent>,
        item_infos: Vec<FinishGalleryItemInfo>,
    ) -> Option<LocalEcho> {
        Some(LocalEcho {
            transaction_id: transaction_id.clone().into(),
            content: LocalEchoContent::Event {
                serialized_event: SerializableEventContent::new(&(*local_echo).into()).ok()?,
                send_handle: SendHandle {
                    room: room.clone(),
                    transaction_id: transaction_id.into(),
                    media_handles: item_infos
                        .into_iter()
                        .map(|i| MediaHandles {
                            upload_thumbnail_txn: i.thumbnail_info.map(|info| info.txn),
                            upload_file_txn: i.file_upload,
                        })
                        .collect(),
                    created_at,
                },
                send_error: None,
            },
        })
    }

    /// Try to apply a single dependent request, whether it's local or remote.
    ///
    /// This swallows errors that would recur on every retry of the dependent
    /// request: invalid edit content, etc.
    ///
    /// Returns true if the dependent request has been sent (or should not be
    /// retried later).
    #[instrument(skip_all)]
    async fn try_apply_single_dependent_request(
        &self,
        client: &Client,
        dependent_request: DependentQueuedRequest,
        new_updates: &mut Vec<RoomSendQueueUpdate>,
    ) -> Result<bool, RoomSendQueueError> {
        let store = client.state_store();

        let parent_key = dependent_request.parent_key;

        match dependent_request.kind {
            DependentQueuedRequestKind::EditEvent { new_content } => {
                if let Some(parent_key) = parent_key {
                    let Some(event_id) = parent_key.into_event_id() else {
                        return Err(RoomSendQueueError::StorageError(
                            RoomSendQueueStorageError::InvalidParentKey,
                        ));
                    };

                    // The parent event has been sent, so send an edit event.
                    let room = client
                        .get_room(&self.room_id)
                        .ok_or(RoomSendQueueError::RoomDisappeared)?;

                    // Check the event is one we know how to edit with an edit event.

                    // It must be deserializable…
                    let edited_content = match new_content.deserialize() {
                        Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
                            // Assume no relationships.
                            EditedContent::RoomMessage(c.into())
                        }

                        Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
                            let poll_start = c.poll_start().clone();
                            EditedContent::PollStart {
                                fallback_text: poll_start.question.text.clone(),
                                new_content: poll_start,
                            }
                        }

                        Ok(c) => {
                            warn!("unsupported edit content type: {:?}", c.event_type());
                            return Ok(true);
                        }

                        Err(err) => {
                            warn!("unable to deserialize edit content: {err}");
                            return Ok(true);
                        }
                    };

                    let edit_event = match room.make_edit_event(&event_id, edited_content).await {
                        Ok(e) => e,
                        Err(err) => {
                            warn!("couldn't create edited event: {err}");
                            return Ok(true);
                        }
                    };

                    // Queue the edit event in the send queue 🧠.
                    let serializable = SerializableEventContent::from_raw(
                        Raw::new(&edit_event)
                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
                        edit_event.event_type().to_string(),
                    );

                    store
                        .save_send_queue_request(
                            &self.room_id,
                            dependent_request.own_transaction_id.into(),
                            dependent_request.created_at,
                            serializable.into(),
                            Self::HIGH_PRIORITY,
                        )
                        .await
                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
                } else {
2214                    // The parent event is still local; update the local echo.
2215                    let edited = store
2216                        .update_send_queue_request(
2217                            &self.room_id,
2218                            &dependent_request.parent_transaction_id,
2219                            new_content.into(),
2220                        )
2221                        .await
2222                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
2223
2224                    if !edited {
2225                        warn!("missing local echo upon dependent edit");
2226                    }
2227                }
2228            }
2229
2230            DependentQueuedRequestKind::RedactEvent => {
2231                if let Some(parent_key) = parent_key {
2232                    let Some(event_id) = parent_key.into_event_id() else {
2233                        return Err(RoomSendQueueError::StorageError(
2234                            RoomSendQueueStorageError::InvalidParentKey,
2235                        ));
2236                    };
2237
2238                    // The parent event has been sent; send a redaction.
2239                    let room = client
2240                        .get_room(&self.room_id)
2241                        .ok_or(RoomSendQueueError::RoomDisappeared)?;
2242
2243                    // Ideally we'd use the send queue to send the redaction, but the protocol has
2244                    // changed the shape of a room.redaction after v11, so keep it simple and try
2245                    // once here.
2246
2247                    // Note: no reason is provided because we materialize the intent of "cancel
2248                    // sending the parent event".
2249
2250                    if let Err(err) = room
2251                        .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
2252                        .await
2253                    {
2254                        warn!("error when sending a redact for {event_id}: {err}");
2255                        return Ok(false);
2256                    }
2257                } else {
2258                    // The parent event is still local (sending must have failed); redact the local
2259                    // echo.
2260                    let removed = store
2261                        .remove_send_queue_request(
2262                            &self.room_id,
2263                            &dependent_request.parent_transaction_id,
2264                        )
2265                        .await
2266                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
2267
2268                    if !removed {
2269                        warn!("missing local echo upon dependent redact");
2270                    }
2271                }
2272            }
2273
2274            DependentQueuedRequestKind::ReactEvent { key } => {
2275                if let Some(parent_key) = parent_key {
2276                    let Some(parent_event_id) = parent_key.into_event_id() else {
2277                        return Err(RoomSendQueueError::StorageError(
2278                            RoomSendQueueStorageError::InvalidParentKey,
2279                        ));
2280                    };
2281
2282                    // Queue the reaction event in the send queue 🧠.
2283                    let react_event =
2284                        ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
2285                    let serializable = SerializableEventContent::from_raw(
2286                        Raw::new(&react_event)
2287                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
2288                        react_event.event_type().to_string(),
2289                    );
2290
2291                    store
2292                        .save_send_queue_request(
2293                            &self.room_id,
2294                            dependent_request.own_transaction_id.into(),
2295                            dependent_request.created_at,
2296                            serializable.into(),
2297                            Self::HIGH_PRIORITY,
2298                        )
2299                        .await
2300                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
2301                } else {
2302                    // Not applied yet, we should retry later => false.
2303                    return Ok(false);
2304                }
2305            }
2306
2307            DependentQueuedRequestKind::UploadFileOrThumbnail {
2308                content_type,
2309                cache_key,
2310                related_to,
2311                parent_is_thumbnail_upload,
2312            } => {
2313                let Some(parent_key) = parent_key else {
2314                    // Not finished yet, we should retry later => false.
2315                    return Ok(false);
2316                };
2317                self.handle_dependent_file_or_thumbnail_upload(
2318                    client,
2319                    dependent_request.own_transaction_id.into(),
2320                    parent_key,
2321                    content_type,
2322                    cache_key,
2323                    related_to,
2324                    parent_is_thumbnail_upload,
2325                )
2326                .await?;
2327            }
2328
2329            DependentQueuedRequestKind::FinishUpload {
2330                local_echo,
2331                file_upload,
2332                thumbnail_info,
2333            } => {
2334                let Some(parent_key) = parent_key else {
2335                    // Not finished yet, we should retry later => false.
2336                    return Ok(false);
2337                };
2338                self.handle_dependent_finish_upload(
2339                    client,
2340                    dependent_request.own_transaction_id.into(),
2341                    parent_key,
2342                    *local_echo,
2343                    file_upload,
2344                    thumbnail_info,
2345                    new_updates,
2346                )
2347                .await?;
2348            }
2349
2350            #[cfg(feature = "unstable-msc4274")]
2351            DependentQueuedRequestKind::FinishGallery { local_echo, item_infos } => {
2352                let Some(parent_key) = parent_key else {
2353                    // Not finished yet, we should retry later => false.
2354                    return Ok(false);
2355                };
2356                self.handle_dependent_finish_gallery_upload(
2357                    client,
2358                    dependent_request.own_transaction_id.into(),
2359                    parent_key,
2360                    *local_echo,
2361                    item_infos,
2362                    new_updates,
2363                )
2364                .await?;
2365            }
2366        }
2367
2368        Ok(true)
2369    }
2370
2371    #[instrument(skip(self))]
2372    async fn apply_dependent_requests(
2373        &self,
2374        new_updates: &mut Vec<RoomSendQueueUpdate>,
2375    ) -> Result<(), RoomSendQueueError> {
2376        let guard = self.store.lock().await;
2377
2378        let client = guard.client()?;
2379        let store = client.state_store();
2380
2381        let dependent_requests = store
2382            .load_dependent_queued_requests(&self.room_id)
2383            .await
2384            .map_err(RoomSendQueueStorageError::StateStoreError)?;
2385
2386        let num_initial_dependent_requests = dependent_requests.len();
2387        if num_initial_dependent_requests == 0 {
2388            // Returning early here avoids a bit of useless logging.
2389            return Ok(());
2390        }
2391
2392        let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
2393
2394        // Get rid of all the non-canonical dependent requests.
2395        for original in &dependent_requests {
2396            if !canonicalized_dependent_requests
2397                .iter()
2398                .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
2399            {
2400                store
2401                    .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
2402                    .await
2403                    .map_err(RoomSendQueueStorageError::StateStoreError)?;
2404            }
2405        }
2406
2407        let mut num_dependent_requests = canonicalized_dependent_requests.len();
2408
2409        debug!(
2410            num_dependent_requests,
2411            num_initial_dependent_requests, "starting handling of dependent requests"
2412        );
2413
2414        for dependent in canonicalized_dependent_requests {
2415            let dependent_id = dependent.own_transaction_id.clone();
2416
2417            match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
2418                Ok(should_remove) => {
2419                    if should_remove {
2420                        // The dependent request has been successfully applied, forget about it.
2421                        store
2422                            .remove_dependent_queued_request(&self.room_id, &dependent_id)
2423                            .await
2424                            .map_err(RoomSendQueueStorageError::StateStoreError)?;
2425
2426                        num_dependent_requests -= 1;
2427                    }
2428                }
2429
2430                Err(err) => {
2431                    warn!("error when applying single dependent request: {err}");
2432                }
2433            }
2434        }
2435
2436        debug!(
2437            leftover_dependent_requests = num_dependent_requests,
2438        "stopped handling dependent requests"
2439        );
2440
2441        Ok(())
2442    }
2443
2444    /// Remove a single dependent request from storage.
2445    async fn remove_dependent_send_queue_request(
2446        &self,
2447        dependent_event_id: &ChildTransactionId,
2448    ) -> Result<bool, RoomSendQueueStorageError> {
2449        Ok(self
2450            .store
2451            .lock()
2452            .await
2453            .client()?
2454            .state_store()
2455            .remove_dependent_queued_request(&self.room_id, dependent_event_id)
2456            .await?)
2457    }
2458}
2459
2460#[cfg(feature = "unstable-msc4274")]
2461/// Metadata needed for pushing gallery item uploads onto the send queue.
2462struct GalleryItemQueueInfo {
2463    content_type: Mime,
2464    upload_file_txn: OwnedTransactionId,
2465    file_media_request: MediaRequestParameters,
2466    thumbnail: Option<QueueThumbnailInfo>,
2467}
2468
2469/// The content of a local echo.
2470#[derive(Clone, Debug)]
2471pub enum LocalEchoContent {
2472    /// The local echo contains an actual event ready to display.
2473    Event {
2474        /// Content of the event itself (along with its type) that we are about
2475        /// to send.
2476        serialized_event: SerializableEventContent,
2477        /// A handle to manipulate the sending of the associated event.
2478        send_handle: SendHandle,
2479        /// Whether trying to send this local echo failed in the past with an
2480        /// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]).
2481        send_error: Option<QueueWedgeError>,
2482    },
2483
2484    /// A local echo has been reacted to.
2485    React {
2486        /// The key with which the local echo has been reacted to.
2487        key: String,
2488        /// A handle to manipulate the sending of the reaction.
2489        send_handle: SendReactionHandle,
2490        /// The local echo which has been reacted to.
2491        applies_to: OwnedTransactionId,
2492    },
2493
2494    /// A local echo of a redaction event.
2495    Redaction {
2496        /// The ID of the redacted event.
2497        redacts: OwnedEventId,
2498        /// The reason for the event being redacted.
2499        reason: Option<String>,
2500        /// A handle to manipulate the sending of the associated event.
2501        send_handle: SendRedactionHandle,
2502        /// Whether trying to send this local echo failed in the past with an
2503        /// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]).
2504        send_error: Option<QueueWedgeError>,
2505    },
2506}
2507
2508/// A local representation for a request that hasn't been sent yet to the user's
2509/// homeserver.
2510#[derive(Clone, Debug)]
2511pub struct LocalEcho {
2512    /// Transaction id used to identify the associated request.
2513    pub transaction_id: OwnedTransactionId,
2514    /// The content for the local echo.
2515    pub content: LocalEchoContent,
2516}
2517
2518/// An update to a room send queue, observable with
2519/// [`RoomSendQueue::subscribe`].
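///
/// # Examples
///
/// An illustrative sketch of consuming these updates; the `room` binding is a
/// hypothetical joined [`Room`]:
///
/// ```no_run
/// # async fn example(room: matrix_sdk::Room) -> anyhow::Result<()> {
/// use matrix_sdk::send_queue::RoomSendQueueUpdate;
///
/// let (_local_echoes, mut updates) = room.send_queue().subscribe().await?;
///
/// while let Ok(update) = updates.recv().await {
///     match update {
///         RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
///             println!("{transaction_id} was sent as {event_id}");
///         }
///         RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
///             eprintln!("sending {transaction_id} failed (recoverable: {is_recoverable}): {error}");
///         }
///         _ => {}
///     }
/// }
/// # anyhow::Ok(()) }
/// ```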
2520#[derive(Clone, Debug)]
2521pub enum RoomSendQueueUpdate {
2522    /// A new local event is being sent.
2523    ///
2524    /// There's been a user query to create this event. It is being sent to the
2525    /// server.
2526    NewLocalEvent(LocalEcho),
2527
2528    /// A local event that hadn't yet been sent to the server has been
2529    /// cancelled before sending.
2530    CancelledLocalEvent {
2531        /// Transaction id used to identify this event.
2532        transaction_id: OwnedTransactionId,
2533    },
2534
2535    /// A local event's content has been replaced with something else.
2536    ReplacedLocalEvent {
2537        /// Transaction id used to identify this event.
2538        transaction_id: OwnedTransactionId,
2539
2540        /// The new content replacing the previous one.
2541        new_content: SerializableEventContent,
2542    },
2543
2544    /// An error happened when an event was being sent.
2545    ///
2546    /// The event has not been removed from the queue. All the send queues
2547    /// will be disabled after this happens, and must be manually re-enabled.
2548    SendError {
2549        /// Transaction id used to identify this event.
2550        transaction_id: OwnedTransactionId,
2551        /// Error received while sending the event.
2552        error: Arc<crate::Error>,
2553        /// Whether the error is considered recoverable or not.
2554        ///
2555        /// A recoverable error will disable the room's send queue, while an
2556        /// event that failed with an unrecoverable error will be parked until
2557        /// the user decides to cancel sending it.
2558        is_recoverable: bool,
2559    },
2560
2561    /// The event has been unwedged and sending is now being retried.
2562    RetryEvent {
2563        /// Transaction id used to identify this event.
2564        transaction_id: OwnedTransactionId,
2565    },
2566
2567    /// The event has been sent to the server, and the query returned
2568    /// successfully.
2569    SentEvent {
2570        /// Transaction id used to identify this event.
2571        transaction_id: OwnedTransactionId,
2572        /// Received event id from the send response.
2573        event_id: OwnedEventId,
2574    },
2575
2576    /// A media upload (consisting of a file and possibly a thumbnail) has made
2577    /// progress.
2578    MediaUpload {
2579        /// The media event this uploaded media relates to.
2580        related_to: OwnedTransactionId,
2581
2582        /// The final media source for the file if it has finished uploading.
2583        file: Option<MediaSource>,
2584
2585        /// The index of the media within the transaction. A file and its
2586        /// thumbnail share the same index. Will always be 0 for non-gallery
2587        /// media uploads.
2588        index: u64,
2589
2590        /// The combined upload progress across the file and, if existing, its
2591        /// thumbnail. For gallery uploads, the progress is reported per indexed
2592        /// gallery item.
2593        progress: AbstractProgress,
2594    },
2595}
2596
2597/// A [`RoomSendQueueUpdate`] with an associated [`OwnedRoomId`].
2598///
2599/// This is used by [`SendQueue::subscribe`] to get a single channel to receive
2600/// updates for all [`RoomSendQueue`]s.
2601#[derive(Clone, Debug)]
2602pub struct SendQueueUpdate {
2603    /// The room where the update happened.
2604    pub room_id: OwnedRoomId,
2605
2606    /// The update for this room.
2607    pub update: RoomSendQueueUpdate,
2608}
2609
2610/// An error triggered by the send queue module.
2611#[derive(Debug, thiserror::Error)]
2612pub enum RoomSendQueueError {
2613    /// The room isn't in the joined state.
2614    #[error("the room isn't in the joined state")]
2615    RoomNotJoined,
2616
2617    /// The room is missing from the client.
2618    ///
2619    /// This happens only whenever the client is shutting down.
2620    #[error("the room is now missing from the client")]
2621    RoomDisappeared,
2622
2623    /// Error coming from storage.
2624    #[error(transparent)]
2625    StorageError(#[from] RoomSendQueueStorageError),
2626
2627    /// The attachment event failed to be created.
2628    #[error("the attachment event could not be created")]
2629    FailedToCreateAttachment,
2630
2631    /// The gallery contains no items.
2632    #[cfg(feature = "unstable-msc4274")]
2633    #[error("the gallery contains no items")]
2634    EmptyGallery,
2635
2636    /// The gallery event failed to be created.
2637    #[cfg(feature = "unstable-msc4274")]
2638    #[error("the gallery event could not be created")]
2639    FailedToCreateGallery,
2640}
2641
2642/// An error triggered by the send queue storage.
2643#[derive(Debug, thiserror::Error)]
2644pub enum RoomSendQueueStorageError {
2645    /// Error caused by the state store.
2646    #[error(transparent)]
2647    StateStoreError(#[from] StoreError),
2648
2649    /// Error caused by the event cache store.
2650    #[error(transparent)]
2651    EventCacheStoreError(#[from] EventCacheStoreError),
2652
2653    /// Error caused by the media store.
2654    #[error(transparent)]
2655    MediaStoreError(#[from] MediaStoreError),
2656
2657    /// Error caused when attempting to get a handle on the event cache store.
2658    #[error(transparent)]
2659    LockError(#[from] CrossProcessLockError),
2660
2661    /// Error caused when (de)serializing into/from json.
2662    #[error(transparent)]
2663    JsonSerialization(#[from] serde_json::Error),
2664
2665    /// A parent key was expected to be of a certain type, and it was another
2666    /// type instead.
2667    #[error("a dependent event had an invalid parent key type")]
2668    InvalidParentKey,
2669
2670    /// The client is shutting down.
2671    #[error("The client is shutting down.")]
2672    ClientShuttingDown,
2673
2674    /// An operation not implemented on a send handle.
2675    #[error("This operation is not implemented for media uploads")]
2676    OperationNotImplementedYet,
2677
2678    /// Trying to edit a media caption for something that's not a media.
2679    #[error("Can't edit a media caption when the underlying event isn't a media")]
2680    InvalidMediaCaptionEdit,
2681}
2682
2683/// Extra transaction IDs useful during an upload.
2684#[derive(Clone, Debug)]
2685struct MediaHandles {
2686    /// Transaction id used when uploading the thumbnail.
2687    ///
2688    /// Optional because a media file can be uploaded without a thumbnail.
2689    upload_thumbnail_txn: Option<OwnedTransactionId>,
2690
2691    /// Transaction id used when uploading the media itself.
2692    upload_file_txn: OwnedTransactionId,
2693}
2694
2695/// A handle to manipulate an event that was scheduled to be sent to a room.
2696#[derive(Clone, Debug)]
2697pub struct SendHandle {
2698    /// Link to the send queue used to send this request.
2699    room: RoomSendQueue,
2700
2701    /// Transaction id used for the sent request.
2702    ///
2703    /// If this is a media upload, this is the "main" transaction id, i.e. the
2704    /// one used to send the event, and that will be seen by observers.
2705    transaction_id: OwnedTransactionId,
2706
2707    /// Additional handles for a media upload.
2708    media_handles: Vec<MediaHandles>,
2709
2710    /// The time at which the event to be sent has been created.
2711    pub created_at: MilliSecondsSinceUnixEpoch,
2712}
2713
2714impl SendHandle {
2715    /// Creates a new [`SendHandle`].
2716    #[cfg(test)]
2717    pub(crate) fn new(
2718        room: RoomSendQueue,
2719        transaction_id: OwnedTransactionId,
2720        created_at: MilliSecondsSinceUnixEpoch,
2721    ) -> Self {
2722        Self { room, transaction_id, media_handles: vec![], created_at }
2723    }
2724
2725    fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
2726        if !self.media_handles.is_empty() {
2727            Err(RoomSendQueueStorageError::OperationNotImplementedYet)
2728        } else {
2729            Ok(())
2730        }
2731    }
2732
2733    /// Aborts the sending of the event, if it wasn't sent yet.
2734    ///
2735    /// Returns true if the sending could be aborted, false if not (i.e. the
2736    /// event had already been sent).
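    ///
    /// # Examples
    ///
    /// An illustrative sketch; `room` is a hypothetical joined [`Room`]:
    ///
    /// ```no_run
    /// # async fn example(room: matrix_sdk::Room) -> anyhow::Result<()> {
    /// use ruma::events::room::message::RoomMessageEventContent;
    ///
    /// let handle =
    ///     room.send_queue().send(RoomMessageEventContent::text_plain("hello").into()).await?;
    ///
    /// // Returns `false` if the event had already been sent.
    /// let aborted = handle.abort().await?;
    /// # anyhow::Ok(()) }
    /// ```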
2737    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2738    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2739        trace!("received an abort request");
2740
2741        let queue = &self.room.inner.queue;
2742
2743        for handles in &self.media_handles {
2744            if queue.abort_upload(&self.transaction_id, handles).await? {
2745                // Propagate a cancelled update.
2746                self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2747                    transaction_id: self.transaction_id.clone(),
2748                });
2749
2750                return Ok(true);
2751            }
2752
2753            // If it failed, it means the sending of the event is not a
2754            // dependent request anymore. Fall back to the regular
2755            // code path below, that handles aborting sending of an event.
2756        }
2757
2758        if queue.cancel_event(&self.transaction_id).await? {
2759            trace!("successful abort");
2760
2761            // Propagate a cancelled update too.
2762            self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
2763                transaction_id: self.transaction_id.clone(),
2764            });
2765
2766            Ok(true)
2767        } else {
2768            debug!("local echo didn't exist anymore, can't abort");
2769            Ok(false)
2770        }
2771    }
2772
2773    /// Edits the content of a local echo with a raw event content.
2774    ///
2775    /// Returns true if the event to be sent was replaced, false if not (i.e.
2776    /// the event had already been sent).
2777    #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2778    pub async fn edit_raw(
2779        &self,
2780        new_content: Raw<AnyMessageLikeEventContent>,
2781        event_type: String,
2782    ) -> Result<bool, RoomSendQueueStorageError> {
2783        trace!("received an edit request");
2784        self.nyi_for_uploads()?;
2785
2786        let serializable = SerializableEventContent::from_raw(new_content, event_type);
2787
2788        if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
2789            trace!("successful edit");
2790
2791            // Wake up the queue, in case the room was asleep before the edit.
2792            self.room.inner.notifier.notify_one();
2793
2794            // Propagate a replaced update too.
2795            self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2796                transaction_id: self.transaction_id.clone(),
2797                new_content: serializable,
2798            });
2799
2800            Ok(true)
2801        } else {
2802            debug!("local echo doesn't exist anymore, can't edit");
2803            Ok(false)
2804        }
2805    }
2806
2807    /// Edits the content of a local echo with an event content.
2808    ///
2809    /// Returns true if the event to be sent was replaced, false if not (i.e.
2810    /// the event had already been sent).
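    ///
    /// # Examples
    ///
    /// An illustrative sketch; `handle` is a hypothetical [`SendHandle`] for a
    /// pending message:
    ///
    /// ```no_run
    /// # async fn example(handle: matrix_sdk::send_queue::SendHandle) -> anyhow::Result<()> {
    /// use ruma::events::room::message::RoomMessageEventContent;
    ///
    /// // Replace the pending content; returns `false` if the event was
    /// // already sent.
    /// let replaced =
    ///     handle.edit(RoomMessageEventContent::text_plain("fixed the typo").into()).await?;
    /// # anyhow::Ok(()) }
    /// ```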
2811    pub async fn edit(
2812        &self,
2813        new_content: AnyMessageLikeEventContent,
2814    ) -> Result<bool, RoomSendQueueStorageError> {
2815        self.edit_raw(
2816            Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2817            new_content.event_type().to_string(),
2818        )
2819        .await
2820    }
2821
2822    /// Edits the content of a local echo with a media caption.
2823    ///
2824    /// Will fail if the event to be sent, represented by this send handle,
2825    /// isn't a media event.
2826    pub async fn edit_media_caption(
2827        &self,
2828        caption: Option<String>,
2829        formatted_caption: Option<FormattedBody>,
2830        mentions: Option<Mentions>,
2831    ) -> Result<bool, RoomSendQueueStorageError> {
2832        if let Some(new_content) = self
2833            .room
2834            .inner
2835            .queue
2836            .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2837            .await?
2838        {
2839            trace!("successful edit of media caption");
2840
2841            // Wake up the queue, in case the room was asleep before the edit.
2842            self.room.inner.notifier.notify_one();
2843
2844            let new_content = SerializableEventContent::new(&new_content)
2845                .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2846
2847            // Propagate a replaced update too.
2848            self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent {
2849                transaction_id: self.transaction_id.clone(),
2850                new_content,
2851            });
2852
2853            Ok(true)
2854        } else {
2855            debug!("local echo doesn't exist anymore, can't edit media caption");
2856            Ok(false)
2857        }
2858    }
2859
2860    /// Unwedge a local echo identified by its transaction identifier and try to
2861    /// resend it.
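    ///
    /// # Examples
    ///
    /// An illustrative sketch; `handle` is a hypothetical [`SendHandle`] whose
    /// event previously failed with an unrecoverable error:
    ///
    /// ```no_run
    /// # async fn example(handle: matrix_sdk::send_queue::SendHandle) -> anyhow::Result<()> {
    /// // Mark the event as unwedged so the queue retries sending it.
    /// handle.unwedge().await?;
    /// # anyhow::Ok(()) }
    /// ```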
2862    pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2863        let room = &self.room.inner;
2864        room.queue
2865            .mark_as_unwedged(&self.transaction_id)
2866            .await
2867            .map_err(RoomSendQueueError::StorageError)?;
2868
2869        // If we have media handles, also try to unwedge them.
2870        //
2871        // It's fine to always do it to *all* the transaction IDs at once, because only
2872        // one of the three requests will be active at the same time, i.e. only
2873        // one entry will be updated in the store. The other two are either
2874        // done, or dependent requests.
2875
2876        for handles in &self.media_handles {
2877            room.queue
2878                .mark_as_unwedged(&handles.upload_file_txn)
2879                .await
2880                .map_err(RoomSendQueueError::StorageError)?;
2881
2882            if let Some(txn) = &handles.upload_thumbnail_txn {
2883                room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2884            }
2885        }
2886
2887        // Wake up the queue, in case the room was asleep before unwedging the request.
2888        room.notifier.notify_one();
2889
2890        self.room.send_update(RoomSendQueueUpdate::RetryEvent {
2891            transaction_id: self.transaction_id.clone(),
2892        });
2893
2894        Ok(())
2895    }
2896
2897    /// Send a reaction to the event as soon as it's sent.
2898    ///
2899    /// If this returns `Ok(None)`, the reaction couldn't be sent
2900    /// because the event is already a remote one.
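    ///
    /// # Examples
    ///
    /// An illustrative sketch; `handle` is a hypothetical [`SendHandle`] for a
    /// pending message:
    ///
    /// ```no_run
    /// # async fn example(handle: matrix_sdk::send_queue::SendHandle) -> anyhow::Result<()> {
    /// // Queue a reaction; `None` means the event is already a remote one.
    /// if let Some(reaction) = handle.react("👍".to_owned()).await? {
    ///     // The pending reaction can itself be aborted.
    ///     let _aborted = reaction.abort().await?;
    /// }
    /// # anyhow::Ok(()) }
    /// ```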
2901    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2902    pub async fn react(
2903        &self,
2904        key: String,
2905    ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2906        trace!("received an intent to react");
2907
2908        let created_at = MilliSecondsSinceUnixEpoch::now();
2909        if let Some(reaction_txn_id) =
2910            self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2911        {
2912            trace!("successfully queued react");
2913
2914            // Wake up the queue, in case the room was asleep before the sending.
2915            self.room.inner.notifier.notify_one();
2916
2917            // Propagate a new local event.
2918            let send_handle = SendReactionHandle {
2919                room: self.room.clone(),
2920                transaction_id: reaction_txn_id.clone(),
2921            };
2922
2923            self.room.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2924                // Note: we do want to use the `txn_id` we're going to use for the reaction, not
2925                // the one for the event we're reacting to.
2926                transaction_id: reaction_txn_id.into(),
2927                content: LocalEchoContent::React {
2928                    key,
2929                    send_handle: send_handle.clone(),
2930                    applies_to: self.transaction_id.clone(),
2931                },
2932            }));
2933
2934            Ok(Some(send_handle))
2935        } else {
2936            debug!("local echo doesn't exist anymore, can't react");
2937            Ok(None)
2938        }
2939    }
2940}

/// A handle to execute actions on the sending of a reaction.
#[derive(Clone, Debug)]
pub struct SendReactionHandle {
    /// Reference to the send queue for the room where this reaction was sent.
    room: RoomSendQueue,
    /// The reaction's own transaction id.
    transaction_id: ChildTransactionId,
}

impl SendReactionHandle {
    /// Creates a new [`SendReactionHandle`].
    #[cfg(test)]
    pub(crate) fn new(room: RoomSendQueue, transaction_id: ChildTransactionId) -> Self {
        Self { room, transaction_id }
    }

    /// Abort the sending of the reaction.
    ///
    /// Returns `true` if the reaction could be aborted, `false` if it's
    /// already been sent (and there's no matching local echo anymore).
    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
        if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
            // Simple case: the reaction was found in the dependent event list.

            // Propagate a cancelled update too.
            self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
                transaction_id: self.transaction_id.clone().into(),
            });

            return Ok(true);
        }

        // The reaction has already been queued for sending, try to abort it using a
        // regular abort.
        let handle = SendHandle {
            room: self.room.clone(),
            transaction_id: self.transaction_id.clone().into(),
            media_handles: vec![],
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        handle.abort().await
    }

    /// The transaction id that will be used to send this reaction later.
    pub fn transaction_id(&self) -> &TransactionId {
        &self.transaction_id
    }
}
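The two-phase abort above (first try to drop the still-dependent request; if it was already promoted into the send queue proper, fall back to a regular abort) can be sketched with hypothetical simplified types — `Queue`, plain `u32` transaction ids — rather than the real storage-backed queue:

```rust
use std::collections::HashSet;

// Hypothetical stand-in for the queue's storage: requests still waiting on
// their parent (dependent) vs. requests already promoted to the send queue.
struct Queue {
    dependent: HashSet<u32>,
    queued: HashSet<u32>,
}

impl Queue {
    // Mirrors the shape of `SendReactionHandle::abort`: returns true if the
    // request could be cancelled, false if there's nothing left to cancel.
    fn abort(&mut self, txn: u32) -> bool {
        if self.dependent.remove(&txn) {
            // Simple case: still a dependent request, nothing was sent yet.
            return true;
        }
        // Already promoted to a regular queued request: regular abort.
        self.queued.remove(&txn)
    }
}
```

This is a sketch of the control flow only; the real implementation is async, persists both collections in the state store, and emits a `CancelledLocalEvent` update on success.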

/// A handle to manipulate a redaction event that was scheduled to be sent to a
/// room.
#[derive(Clone, Debug)]
pub struct SendRedactionHandle {
    /// Link to the send queue used to send this request.
    room: RoomSendQueue,

    /// Transaction id used for the sent request.
    transaction_id: OwnedTransactionId,
}

impl SendRedactionHandle {
    /// Creates a new [`SendRedactionHandle`].
    #[cfg(test)]
    pub(crate) fn new(room: RoomSendQueue, transaction_id: OwnedTransactionId) -> Self {
        Self { room, transaction_id }
    }

    /// Abort the sending of the redaction.
    ///
    /// Returns `true` if the redaction could be aborted, `false` if it's
    /// already been sent (and there's no matching local echo anymore).
    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
        trace!("received a redaction abort request");

        let queue = &self.room.inner.queue;

        if queue.cancel_event(&self.transaction_id).await? {
            trace!("successful redaction abort");

            // Propagate a cancelled update too.
            self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent {
                transaction_id: self.transaction_id.clone(),
            });

            Ok(true)
        } else {
            debug!("local echo of redaction didn't exist anymore, can't abort");
            Ok(false)
        }
    }
}

/// From a given list of [`DependentQueuedRequest`]s, return only the most
/// meaningful ones, i.e. those that wouldn't be overridden after applying the
/// others.
fn canonicalize_dependent_requests(
    dependent: &[DependentQueuedRequest],
) -> Vec<DependentQueuedRequest> {
    let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();

    for d in dependent {
        let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();

        if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
            // The parent event has already been flagged for redaction, don't consider the
            // other dependent events.
            continue;
        }

        match &d.kind {
            DependentQueuedRequestKind::EditEvent { .. } => {
                // Replace any previous edit with this one.
                if let Some(prev_edit) = prevs
                    .iter_mut()
                    .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
                {
                    *prev_edit = d;
                } else {
                    prevs.insert(0, d);
                }
            }

            DependentQueuedRequestKind::UploadFileOrThumbnail { .. }
            | DependentQueuedRequestKind::FinishUpload { .. }
            | DependentQueuedRequestKind::ReactEvent { .. } => {
                // These requests can't be canonicalized, push them as is.
                prevs.push(d);
            }

            #[cfg(feature = "unstable-msc4274")]
            DependentQueuedRequestKind::FinishGallery { .. } => {
                // This request can't be canonicalized, push it as is.
                prevs.push(d);
            }

            DependentQueuedRequestKind::RedactEvent => {
                // Remove every other dependent action.
                prevs.clear();
                prevs.push(d);
            }
        }
    }

    by_txn.into_values().flat_map(|entries| entries.into_iter().cloned()).collect()
}
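The canonicalization rules above boil down to: per parent transaction id, a redaction wipes out every other pending dependent request, a newer edit replaces an older one in place (edits staying at the front), and reactions simply accumulate. A minimal, self-contained sketch of that logic, using hypothetical simplified types (`Kind`, `canonicalize`, `u32` parent ids) instead of the real `DependentQueuedRequest`:

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
enum Kind {
    Edit(String),
    React(String),
    Redact,
}

fn canonicalize(requests: &[(u32, Kind)]) -> Vec<(u32, Kind)> {
    let mut by_parent: HashMap<u32, Vec<(u32, Kind)>> = HashMap::new();

    for (parent, kind) in requests.iter().cloned() {
        let prevs = by_parent.entry(parent).or_default();

        if prevs.iter().any(|(_, k)| *k == Kind::Redact) {
            // The parent is already being redacted: drop everything else.
            continue;
        }

        match &kind {
            Kind::Edit(_) => {
                // Keep only the latest edit for a given parent, at the front.
                if let Some(prev) = prevs.iter_mut().find(|(_, k)| matches!(k, Kind::Edit(_))) {
                    *prev = (parent, kind);
                } else {
                    prevs.insert(0, (parent, kind));
                }
            }
            Kind::React(_) => prevs.push((parent, kind)),
            Kind::Redact => {
                // A redaction supersedes every other dependent request.
                prevs.clear();
                prevs.push((parent, kind));
            }
        }
    }

    by_parent.into_values().flatten().collect()
}
```

Unlike the real function, which works over borrowed requests and clones once at the end, this sketch clones eagerly; the per-parent ordering (edit first, then reactions) mirrors the real code.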

#[cfg(all(test, not(target_family = "wasm")))]
mod tests {
    use std::{sync::Arc, time::Duration};

    use assert_matches2::{assert_let, assert_matches};
    use matrix_sdk_base::store::{
        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
        SerializableEventContent,
    };
    use matrix_sdk_test::{JoinedRoomBuilder, SyncResponseBuilder, async_test};
    use ruma::{
        MilliSecondsSinceUnixEpoch, TransactionId,
        events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent},
        room_id,
    };

    use super::canonicalize_dependent_requests;
    use crate::{client::WeakClient, test_utils::logged_in_client};

    #[test]
    fn test_canonicalize_dependent_events_created_at() {
        // Test to ensure the created_at field is being serialized and retrieved
        // correctly.
        let txn = TransactionId::new();
        let created_at = MilliSecondsSinceUnixEpoch::now();

        let edit = DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: txn.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_key: None,
            created_at,
        };

        let res = canonicalize_dependent_requests(&[edit]);

        assert_eq!(res.len(), 1);
        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
        assert_let!(
            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
        );
        assert_eq!(msg.body(), "edit");
        assert_eq!(res[0].parent_transaction_id, txn);
        assert_eq!(res[0].created_at, created_at);
    }

    #[async_test]
    async fn test_client_no_cycle_with_send_queue() {
        for enabled in [true, false] {
            let client = logged_in_client(None).await;
            let weak_client = WeakClient::from_client(&client);

            {
                let mut sync_response_builder = SyncResponseBuilder::new();

                let room_id = room_id!("!a:b.c");

                // Make sure the client knows about the room.
                client
                    .base_client()
                    .receive_sync_response(
                        sync_response_builder
                            .add_joined_room(JoinedRoomBuilder::new(room_id))
                            .build_sync_response(),
                    )
                    .await
                    .unwrap();

                let room = client.get_room(room_id).unwrap();
                let q = room.send_queue();

                let _watcher = q.subscribe().await;

                client.send_queue().set_enabled(enabled).await;
            }

            drop(client);

            // Give a bit of time for background tasks to die.
            tokio::time::sleep(Duration::from_millis(500)).await;

            // The weak client must be the last reference to the client now.
            let client = weak_client.get();
            assert!(
                client.is_none(),
                "too many strong references to the client: {}",
                Arc::strong_count(&client.unwrap().inner)
            );
        }
    }

    #[test]
    fn test_canonicalize_dependent_events_smoke_test() {
        // Smoke test: canonicalizing a single dependent event returns it.
        let txn = TransactionId::new();

        let edit = DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: txn.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };
        let res = canonicalize_dependent_requests(&[edit]);

        assert_eq!(res.len(), 1);
        assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
        assert_eq!(res[0].parent_transaction_id, txn);
        assert!(res[0].parent_key.is_none());
    }

    #[test]
    fn test_canonicalize_dependent_events_redaction_preferred() {
        // A redaction is preferred over any other kind of dependent event.
        let txn = TransactionId::new();

        let mut inputs = Vec::with_capacity(100);
        let redact = DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: txn.clone(),
            kind: DependentQueuedRequestKind::RedactEvent,
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        let edit = DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: txn.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        inputs.push({
            let mut edit = edit.clone();
            edit.own_transaction_id = ChildTransactionId::new();
            edit
        });

        inputs.push(redact);

        for _ in 0..98 {
            let mut edit = edit.clone();
            edit.own_transaction_id = ChildTransactionId::new();
            inputs.push(edit);
        }

        let res = canonicalize_dependent_requests(&inputs);

        assert_eq!(res.len(), 1);
        assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
        assert_eq!(res[0].parent_transaction_id, txn);
    }

    #[test]
    fn test_canonicalize_dependent_events_last_edit_preferred() {
        let parent_txn = TransactionId::new();

        // The latest edit of a list is always preferred.
        let inputs = (0..10)
            .map(|i| DependentQueuedRequest {
                own_transaction_id: ChildTransactionId::new(),
                parent_transaction_id: parent_txn.clone(),
                kind: DependentQueuedRequestKind::EditEvent {
                    new_content: SerializableEventContent::new(
                        &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
                    )
                    .unwrap(),
                },
                parent_key: None,
                created_at: MilliSecondsSinceUnixEpoch::now(),
            })
            .collect::<Vec<_>>();

        let txn = inputs[9].parent_transaction_id.clone();

        let res = canonicalize_dependent_requests(&inputs);

        assert_eq!(res.len(), 1);
        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
        assert_let!(
            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
        );
        assert_eq!(msg.body(), "edit9");
        assert_eq!(res[0].parent_transaction_id, txn);
    }

    #[test]
    fn test_canonicalize_multiple_local_echoes() {
        let txn1 = TransactionId::new();
        let txn2 = TransactionId::new();

        let child1 = ChildTransactionId::new();
        let child2 = ChildTransactionId::new();

        let inputs = vec![
            // This one pertains to txn1.
            DependentQueuedRequest {
                own_transaction_id: child1.clone(),
                kind: DependentQueuedRequestKind::RedactEvent,
                parent_transaction_id: txn1.clone(),
                parent_key: None,
                created_at: MilliSecondsSinceUnixEpoch::now(),
            },
            // This one pertains to txn2.
            DependentQueuedRequest {
                own_transaction_id: child2,
                kind: DependentQueuedRequestKind::EditEvent {
                    new_content: SerializableEventContent::new(
                        &RoomMessageEventContent::text_plain("edit").into(),
                    )
                    .unwrap(),
                },
                parent_transaction_id: txn2.clone(),
                parent_key: None,
                created_at: MilliSecondsSinceUnixEpoch::now(),
            },
        ];

        let res = canonicalize_dependent_requests(&inputs);

        // Canonicalization must handle each parent transaction id independently.
        assert_eq!(res.len(), 2);

        for dependent in res {
            if dependent.own_transaction_id == child1 {
                assert_eq!(dependent.parent_transaction_id, txn1);
                assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
            } else {
                assert_eq!(dependent.parent_transaction_id, txn2);
                assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
            }
        }
    }

    #[test]
    fn test_canonicalize_reactions_after_edits() {
        // Sending reactions should happen after edits to a given event.
        let txn = TransactionId::new();

        let react_id = ChildTransactionId::new();
        let react = DependentQueuedRequest {
            own_transaction_id: react_id.clone(),
            kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
            parent_transaction_id: txn.clone(),
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        let edit_id = ChildTransactionId::new();
        let edit = DependentQueuedRequest {
            own_transaction_id: edit_id.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_transaction_id: txn,
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        let res = canonicalize_dependent_requests(&[react, edit]);

        assert_eq!(res.len(), 2);
        assert_eq!(res[0].own_transaction_id, edit_id);
        assert_eq!(res[1].own_transaction_id, react_id);
    }
}