matrix_sdk/send_queue/
mod.rs

// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A send queue facility for serializing the queuing and sending of messages.
//!
//! # [`Room`] send queue
//!
//! Each room gets its own [`RoomSendQueue`], which is available by calling
//! [`Room::send_queue()`]. The first time this method is called, it will spawn
//! a background task that's used to actually send events, in the order they
//! were passed from calls to [`RoomSendQueue::send()`].
//!
//! This queue tries to simplify error management around sending events, using
//! [`RoomSendQueue::send`] or [`RoomSendQueue::send_raw`]: by default, it will
//! retry sending the same event a few times, before automatically disabling
//! itself, and emitting a notification that can be listened to with the global
//! send queue (see paragraph below) or using [`RoomSendQueue::subscribe()`].
//!
//! It is possible to control whether a single room is enabled using
//! [`RoomSendQueue::set_enabled()`].
//!
//! # Global [`SendQueue`] object
//!
//! The [`Client::send_queue()`] method returns an API object that allows
//! controlling all the room send queues:
//!
//! - enable/disable them all at once with [`SendQueue::set_enabled()`].
//! - get notifications about send errors with [`SendQueue::subscribe_errors`].
//! - reload all unsent events that had been persisted in storage using
//!   [`SendQueue::respawn_tasks_for_rooms_with_unsent_requests()`]. It is
//!   recommended to call this method during initialization of a client,
//!   otherwise persisted unsent events will only be re-sent after the send
//!   queue for the given room has been reopened for the first time.
//!
//! # Send handle
//!
//! Just after queuing a request to send something, a [`SendHandle`] is
//! returned, which allows manipulating the in-flight request.
//!
//! With a send handle for an event, it's possible to edit the event or abort
//! sending it. If it was still in the queue (i.e. neither sent yet nor being
//! sent), then such an action would happen locally (i.e. in the database).
//! Otherwise, it is "too late": the background task may be sending
//! the event already, or has sent it; in that case, the edit/aborting must
//! happen as an actual event materializing this, on the server. To accomplish
//! this, the send queue may send such an event, using the dependency system
//! described below.
//!
//! # Dependency system
//!
//! The send queue includes a simple dependency system, where a
//! [`QueuedRequest`] can have zero or more dependents in the form of
//! [`DependentQueuedRequest`]. A dependent queued request can have at most one
//! depended-upon (parent) queued request.
//!
//! This allows implementing deferred edits/redacts, as hinted at in the
//! previous section.
//!
//! ## Media upload
//!
//! This dependency system also allows uploading media, since the media's
//! *content* must be uploaded before we send the media *event* that describes
//! it.
//!
//! In the simplest case, where only a media file and its event must be sent
//! (i.e. no thumbnails):
//!
//! - The file's content is immediately cached in the
//!   [`matrix_sdk_base::event_cache::store::EventCacheStore`], using an MXC ID
//!   that is temporary and designates a local URI without any possible doubt.
//! - An initial media event is created using this temporary MXC ID, and is
//!   propagated as a local echo for an event.
//! - A [`QueuedRequest`] is pushed to upload the file's media
//!   ([`QueuedRequestKind::MediaUpload`]).
//! - A [`DependentQueuedRequest`] is pushed to finish the upload
//!   ([`DependentQueuedRequestKind::FinishUpload`]).
//!
//! What is expected to happen, if all goes well, is the following:
//!
//! - the media is uploaded to the media homeserver, which returns the final MXC
//!   ID.
//! - when marking the upload request as sent, the MXC ID is injected (as a
//!   [`matrix_sdk_base::store::SentRequestKey`]) into the dependent request
//!   [`DependentQueuedRequestKind::FinishUpload`] created in the last step
//!   above.
//! - next time the send queue handles dependent requests, it'll see this one
//!   is ready to be sent, and it will transform it into an event queued
//!   request ([`QueuedRequestKind::Event`]), with the event created in the
//!   local echo before, updated with the MXC ID returned from the server.
//! - this updated local echo is also propagated as an edit of the local echo to
//!   observers, who get the final version with the final MXC IDs at this point
//!   too.
//! - then the event is sent normally, as any event sent with the send queue.
//!
//! When there is a thumbnail, things behave similarly, with some tweaks:
//!
//! - the thumbnail's content is also stored into the cache store immediately,
//! - the thumbnail is sent first as a [`QueuedRequestKind::MediaUpload`]
//!   request,
//! - the file upload is pushed as a dependent request of kind
//!   [`DependentQueuedRequestKind::UploadFileWithThumbnail`] (this variant
//!   keeps the file's key used to look it up in the cache store).
//! - the media event is then sent as a dependent request as described in the
//!   previous section.
//!
//! What's expected to happen is thus the following:
//!
//! - After the thumbnail has been uploaded, the dependent request will
//!   retrieve the final MXC ID returned by the homeserver for the thumbnail,
//!   and store it into the [`QueuedRequestKind::MediaUpload`]'s
//!   `thumbnail_source` field, which makes it possible to remember the
//!   thumbnail MXC ID when it's time to finish the upload later.
//! - The dependent request is morphed into another
//!   [`QueuedRequestKind::MediaUpload`], for the file itself.
//!
//! The rest of the process is then similar to that of uploading a file without
//! a thumbnail. The only difference is that there's a thumbnail source (MXC ID)
//! remembered and fixed up into the media event, just before sending it.
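// Illustrative sketch only (not part of this module): a simplified, std-only
// model of the dependency system described in the module docs above, for the
// simplest media upload case (no thumbnail). The types `SentKey`, `Request`,
// `Dependent` and `Queue`, as well as the `mxc://local/tmp` placeholder URI,
// are hypothetical stand-ins and do not reflect the SDK's actual types or
// storage layout.

```rust
use std::collections::VecDeque;

/// Key produced once a parent request has been sent (hypothetical stand-in
/// for `SentRequestKey`).
#[derive(Clone, Debug, PartialEq)]
enum SentKey {
    /// The final MXC URI returned after the upload.
    Media(String),
}

#[derive(Debug, PartialEq)]
enum Request {
    MediaUpload { txn: u32, cache_key: String },
    Event { txn: u32, body: String },
}

#[derive(Debug)]
struct Dependent {
    parent_txn: u32,
    own_txn: u32,
    /// Filled in when the parent request is marked as sent.
    parent_key: Option<SentKey>,
    /// Local echo body, still referencing the temporary MXC URI.
    local_echo: String,
}

#[derive(Default)]
struct Queue {
    requests: VecDeque<Request>,
    dependents: Vec<Dependent>,
}

impl Queue {
    /// Remove the sent request and inject its key into its dependents.
    fn mark_as_sent(&mut self, txn: u32, key: SentKey) {
        self.requests.retain(|r| match r {
            Request::MediaUpload { txn: t, .. } | Request::Event { txn: t, .. } => *t != txn,
        });
        for dep in self.dependents.iter_mut().filter(|d| d.parent_txn == txn) {
            dep.parent_key = Some(key.clone());
        }
    }

    /// Turn every dependent whose parent has been sent into an event request.
    fn apply_dependent_requests(&mut self) {
        let (ready, pending): (Vec<_>, Vec<_>) = std::mem::take(&mut self.dependents)
            .into_iter()
            .partition(|d| d.parent_key.is_some());
        self.dependents = pending;

        for dep in ready {
            let Some(SentKey::Media(mxc)) = dep.parent_key else { continue };
            // Fix up the local echo with the final MXC URI.
            let body = dep.local_echo.replace("mxc://local/tmp", &mxc);
            self.requests.push_back(Request::Event { txn: dep.own_txn, body });
        }
    }
}

/// Walks through the "no thumbnail" flow and returns the remaining requests.
fn simple_media_flow() -> Vec<Request> {
    let mut queue = Queue::default();
    queue.requests.push_back(Request::MediaUpload { txn: 1, cache_key: "tmp".to_owned() });
    queue.dependents.push(Dependent {
        parent_txn: 1,
        own_txn: 2,
        parent_key: None,
        local_echo: "image at mxc://local/tmp".to_owned(),
    });

    // The parent hasn't been sent yet, so nothing is ready.
    queue.apply_dependent_requests();

    // The upload finished: the final MXC URI is injected into the dependent.
    queue.mark_as_sent(1, SentKey::Media("mxc://example.org/abc".to_owned()));
    queue.apply_dependent_requests();

    queue.requests.into_iter().collect()
}
```

// In this model, `simple_media_flow()` ends with a single `Request::Event`
// whose body references the final MXC URI, mirroring the steps listed above.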

use std::{
    collections::{BTreeMap, HashMap},
    str::FromStr as _,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, RwLock,
    },
};

use as_variant::as_variant;
use matrix_sdk_base::{
    event_cache::store::EventCacheStoreError,
    media::MediaRequestParameters,
    store::{
        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
        FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
        SentMediaInfo, SentRequestKey, SerializableEventContent,
    },
    store_locks::LockStoreError,
    RoomState, StoreError,
};
use matrix_sdk_common::executor::{spawn, JoinHandle};
use mime::Mime;
use ruma::{
    events::{
        reaction::ReactionEventContent,
        relation::Annotation,
        room::{
            message::{FormattedBody, RoomMessageEventContent},
            MediaSource,
        },
        AnyMessageLikeEventContent, EventContent as _, Mentions,
    },
    serde::Raw,
    MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId,
};
use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard};
use tracing::{debug, error, info, instrument, trace, warn};

#[cfg(feature = "e2e-encryption")]
use crate::crypto::{OlmError, SessionRecipientCollectionError};
use crate::{
    client::WeakClient,
    config::RequestConfig,
    error::RetryKind,
    room::{edit::EditedContent, WeakRoom},
    Client, Media, Room,
};

mod upload;

/// A client-wide send queue, for all the rooms known by a client.
pub struct SendQueue {
    client: Client,
}

#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for SendQueue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SendQueue").finish_non_exhaustive()
    }
}

impl SendQueue {
    pub(super) fn new(client: Client) -> Self {
        Self { client }
    }

    /// Reload all the rooms which had unsent requests, and respawn tasks for
    /// those rooms.
    pub async fn respawn_tasks_for_rooms_with_unsent_requests(&self) {
        if !self.is_enabled() {
            return;
        }

        let room_ids =
            self.client.state_store().load_rooms_with_unsent_requests().await.unwrap_or_else(
                |err| {
                    warn!("error when loading rooms with unsent requests: {err}");
                    Vec::new()
                },
            );

        // Getting the [`RoomSendQueue`] is sufficient to spawn the task if need be.
        for room_id in room_ids {
            if let Some(room) = self.client.get_room(&room_id) {
                let _ = self.for_room(room);
            }
        }
    }

    /// Tiny helper to get the send queue's global context from the [`Client`].
    #[inline(always)]
    fn data(&self) -> &SendQueueData {
        &self.client.inner.send_queue_data
    }

    /// Get or create a new send queue for a given room, and insert it into our
    /// memoized rooms mapping.
    fn for_room(&self, room: Room) -> RoomSendQueue {
        let data = self.data();

        let mut map = data.rooms.write().unwrap();

        let room_id = room.room_id();
        if let Some(room_q) = map.get(room_id).cloned() {
            return room_q;
        }

        let owned_room_id = room_id.to_owned();
        let room_q = RoomSendQueue::new(
            self.is_enabled(),
            data.error_reporter.clone(),
            data.is_dropping.clone(),
            &self.client,
            owned_room_id.clone(),
        );

        map.insert(owned_room_id, room_q.clone());

        room_q
    }

    /// Enable or disable the send queue for the entire client, i.e. all rooms.
    ///
    /// If we're disabling the queue, and requests were being sent, they're not
    /// aborted, and will continue until a status resolves (error responses
    /// will keep the events in the buffer of events to send later). The
    /// disablement will happen before the next request is sent.
    ///
    /// This may wake up background tasks and resume sending of requests in the
    /// background.
    pub async fn set_enabled(&self, enabled: bool) {
        debug!(?enabled, "setting global send queue enablement");

        self.data().globally_enabled.store(enabled, Ordering::SeqCst);

        // Wake up individual rooms we already know about.
        for room in self.data().rooms.read().unwrap().values() {
            room.set_enabled(enabled);
        }

        // Reload some extra rooms that might not have been awakened yet, but could
        // have requests from previous sessions.
        self.respawn_tasks_for_rooms_with_unsent_requests().await;
    }

    /// Returns whether the send queue is enabled, at a client-wide
    /// granularity.
    pub fn is_enabled(&self) -> bool {
        self.data().globally_enabled.load(Ordering::SeqCst)
    }

    /// Returns a receiver for the errors reported by the room send queues.
    ///
    /// Each error also signals that the send queue of the corresponding room
    /// has disabled itself.
    pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
        self.data().error_reporter.subscribe()
    }
}
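
// Illustrative sketch only (not part of this module): a std-only model of the
// "get or create" memoization done by `for_room()` and of how `set_enabled()`
// propagates the global flag to the rooms that are already known. `Registry`
// and `RoomQueue` are hypothetical names; background tasks and wake-ups are
// left out.

```rust
use std::{
    collections::BTreeMap,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc, RwLock,
    },
};

/// Hypothetical stand-in for a per-room queue: only tracks its enabled flag.
#[derive(Debug)]
struct RoomQueue {
    enabled: AtomicBool,
}

/// Hypothetical stand-in for the client-wide send queue data.
struct Registry {
    globally_enabled: AtomicBool,
    rooms: RwLock<BTreeMap<String, Arc<RoomQueue>>>,
}

impl Registry {
    fn new(globally_enabled: bool) -> Self {
        Self {
            globally_enabled: AtomicBool::new(globally_enabled),
            rooms: RwLock::new(BTreeMap::new()),
        }
    }

    /// Return the memoized queue for this room, creating it on first use.
    fn for_room(&self, room_id: &str) -> Arc<RoomQueue> {
        // Hold the write lock for the whole lookup-then-insert sequence, so
        // two concurrent callers can't create two queues for the same room.
        let mut map = self.rooms.write().unwrap();

        if let Some(queue) = map.get(room_id) {
            return queue.clone();
        }

        // A new queue starts in the current global enablement state.
        let enabled = self.globally_enabled.load(Ordering::SeqCst);
        let queue = Arc::new(RoomQueue { enabled: AtomicBool::new(enabled) });
        map.insert(room_id.to_owned(), queue.clone());
        queue
    }

    /// Update the global flag, then every room queue created so far.
    fn set_enabled(&self, enabled: bool) {
        self.globally_enabled.store(enabled, Ordering::SeqCst);
        for queue in self.rooms.read().unwrap().values() {
            queue.enabled.store(enabled, Ordering::SeqCst);
        }
    }
}
```

// In this model, calling `for_room()` twice with the same room identifier
// returns handles to the same queue, and a queue created after
// `set_enabled(true)` starts enabled.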

/// A specific room's send queue ran into an error, and it has disabled itself.
#[derive(Clone, Debug)]
pub struct SendQueueRoomError {
    /// For which room is the send queue failing?
    pub room_id: OwnedRoomId,

    /// The error the room has run into, when trying to send a request.
    pub error: Arc<crate::Error>,

    /// Whether the error is considered recoverable or not.
    ///
    /// The room's send queue is disabled after any error. A recoverable error
    /// keeps the request in the queue, so it can be retried once the queue is
    /// re-enabled, while a request that hit an unrecoverable error will be
    /// parked, until the user decides to do something about it.
    pub is_recoverable: bool,
}

impl Client {
    /// Returns a [`SendQueue`] that handles sending, retrying and not
    /// forgetting about requests that are to be sent.
    pub fn send_queue(&self) -> SendQueue {
        SendQueue::new(self.clone())
    }
}

pub(super) struct SendQueueData {
    /// Mapping from a room to its unique send queue.
    rooms: RwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,

    /// Is the whole mechanism enabled or disabled?
    ///
    /// This is only kept in memory to initialize new room queues with an
    /// initial enablement state.
    globally_enabled: AtomicBool,

    /// Global error updates for the send queue.
    error_reporter: broadcast::Sender<SendQueueRoomError>,

    /// Are we currently dropping the Client?
    is_dropping: Arc<AtomicBool>,
}

impl SendQueueData {
    /// Create the data for a send queue, in the given enabled state.
    pub fn new(globally_enabled: bool) -> Self {
        let (sender, _) = broadcast::channel(32);

        Self {
            rooms: Default::default(),
            globally_enabled: AtomicBool::new(globally_enabled),
            error_reporter: sender,
            is_dropping: Arc::new(false.into()),
        }
    }
}

impl Drop for SendQueueData {
    fn drop(&mut self) {
        // Mark the whole send queue as shutting down, then wake up all the room
        // queues so they're stopped too.
        debug!("globally dropping the send queue");
        self.is_dropping.store(true, Ordering::SeqCst);

        let rooms = self.rooms.read().unwrap();
        for room in rooms.values() {
            room.inner.notifier.notify_one();
        }
    }
}

impl Room {
    /// Returns the [`RoomSendQueue`] for this specific room.
    pub fn send_queue(&self) -> RoomSendQueue {
        self.client.send_queue().for_room(self.clone())
    }
}

/// A per-room send queue.
///
/// This is cheap to clone.
#[derive(Clone)]
pub struct RoomSendQueue {
    inner: Arc<RoomSendQueueInner>,
}

#[cfg(not(tarpaulin_include))]
impl std::fmt::Debug for RoomSendQueue {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RoomSendQueue").finish_non_exhaustive()
    }
}

impl RoomSendQueue {
    fn new(
        globally_enabled: bool,
        global_error_reporter: broadcast::Sender<SendQueueRoomError>,
        is_dropping: Arc<AtomicBool>,
        client: &Client,
        room_id: OwnedRoomId,
    ) -> Self {
        let (updates_sender, _) = broadcast::channel(32);

        let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone());
        let notifier = Arc::new(Notify::new());

        let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
        let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));

        let task = spawn(Self::sending_task(
            weak_room.clone(),
            queue.clone(),
            notifier.clone(),
            updates_sender.clone(),
            locally_enabled.clone(),
            global_error_reporter,
            is_dropping,
        ));

        Self {
            inner: Arc::new(RoomSendQueueInner {
                room: weak_room,
                updates: updates_sender,
                _task: task,
                queue,
                notifier,
                locally_enabled,
            }),
        }
    }

    /// Queues a raw event for sending it to this room.
    ///
    /// This immediately returns, and will push the event to be sent into a
    /// queue, handled in the background.
    ///
    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
    /// the [`Self::subscribe()`] method to get updates about the sending of
    /// that event.
    ///
    /// By default, if sending failed on the first attempt, it will be retried a
    /// few times. If sending failed after those retries, this room's send
    /// queue will be disabled, and it will need to be manually re-enabled by
    /// the caller (e.g. after network is back, or when something has been done
    /// about the faulty requests).
    pub async fn send_raw(
        &self,
        content: Raw<AnyMessageLikeEventContent>,
        event_type: String,
    ) -> Result<SendHandle, RoomSendQueueError> {
        let Some(room) = self.inner.room.get() else {
            return Err(RoomSendQueueError::RoomDisappeared);
        };
        if room.state() != RoomState::Joined {
            return Err(RoomSendQueueError::RoomNotJoined);
        }

        let content = SerializableEventContent::from_raw(content, event_type);

        let created_at = MilliSecondsSinceUnixEpoch::now();
        let transaction_id = self.inner.queue.push(content.clone().into(), created_at).await?;
        trace!(%transaction_id, "manager sends a raw event to the background task");

        self.inner.notifier.notify_one();

        let send_handle = SendHandle {
            room: self.clone(),
            transaction_id: transaction_id.clone(),
            media_handles: None,
            created_at,
        };

        let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
            transaction_id,
            content: LocalEchoContent::Event {
                serialized_event: content,
                send_handle: send_handle.clone(),
                send_error: None,
            },
        }));

        Ok(send_handle)
    }

    /// Queues an event for sending it to this room.
    ///
    /// This immediately returns, and will push the event to be sent into a
    /// queue, handled in the background.
    ///
    /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling
    /// the [`Self::subscribe()`] method to get updates about the sending of
    /// that event.
    ///
    /// By default, if sending failed on the first attempt, it will be retried a
    /// few times. If sending failed after those retries, this room's send
    /// queue will be disabled, and it will need to be manually re-enabled by
    /// the caller (e.g. after network is back, or when something has been done
    /// about the faulty requests).
    pub async fn send(
        &self,
        content: AnyMessageLikeEventContent,
    ) -> Result<SendHandle, RoomSendQueueError> {
        self.send_raw(
            Raw::new(&content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
            content.event_type().to_string(),
        )
        .await
    }

    /// Returns the current local requests as well as a receiver to listen to
    /// the send queue updates, as defined in [`RoomSendQueueUpdate`].
    pub async fn subscribe(
        &self,
    ) -> Result<(Vec<LocalEcho>, broadcast::Receiver<RoomSendQueueUpdate>), RoomSendQueueError>
    {
        let local_echoes = self.inner.queue.local_echoes(self).await?;

        Ok((local_echoes, self.inner.updates.subscribe()))
    }

    /// A task that must be spawned in the async runtime, running in the
    /// background for each room that has a send queue.
    ///
    /// It only progresses forward: nothing can be cancelled at any point, which
    /// makes the implementation not overly complicated to follow.
    #[instrument(skip_all, fields(room_id = %room.room_id()))]
    async fn sending_task(
        room: WeakRoom,
        queue: QueueStorage,
        notifier: Arc<Notify>,
        updates: broadcast::Sender<RoomSendQueueUpdate>,
        locally_enabled: Arc<AtomicBool>,
        global_error_reporter: broadcast::Sender<SendQueueRoomError>,
        is_dropping: Arc<AtomicBool>,
    ) {
        trace!("spawned the sending task");

        loop {
            // A request to shut down should be preferred above everything else.
            if is_dropping.load(Ordering::SeqCst) {
                trace!("shutting down!");
                break;
            }

            // Try to apply dependent requests now; those applying to previously failed
            // attempts (local echoes) would succeed now.
            let mut new_updates = Vec::new();
            if let Err(err) = queue.apply_dependent_requests(&mut new_updates).await {
                warn!("errors when applying dependent requests: {err}");
            }

            for up in new_updates {
                let _ = updates.send(up);
            }

            if !locally_enabled.load(Ordering::SeqCst) {
                trace!("not enabled, sleeping");
                // Wait for an explicit wakeup.
                notifier.notified().await;
                continue;
            }

            let (queued_request, cancel_upload_rx) = match queue.peek_next_to_send().await {
                Ok(Some(request)) => request,

                Ok(None) => {
                    trace!("queue is empty, sleeping");
                    // Wait for an explicit wakeup.
                    notifier.notified().await;
                    continue;
                }

                Err(err) => {
                    warn!("error when loading next request to send: {err}");
                    continue;
                }
            };

            let txn_id = queued_request.transaction_id.clone();
            trace!(txn_id = %txn_id, "received a request to send!");

            let related_txn_id = as_variant!(&queued_request.kind, QueuedRequestKind::MediaUpload { related_to, .. } => related_to.clone());

            let Some(room) = room.get() else {
                if is_dropping.load(Ordering::SeqCst) {
                    break;
                }
                error!("the weak room couldn't be upgraded but we're not shutting down?");
                continue;
            };

            match Self::handle_request(&room, queued_request, cancel_upload_rx).await {
                Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
                {
                    Ok(()) => match parent_key {
                        SentRequestKey::Event(event_id) => {
                            let _ = updates.send(RoomSendQueueUpdate::SentEvent {
                                transaction_id: txn_id,
                                event_id,
                            });
                        }

                        SentRequestKey::Media(media_info) => {
                            let _ = updates.send(RoomSendQueueUpdate::UploadedMedia {
                                related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
                                file: media_info.file,
                            });
                        }
                    },

                    Err(err) => {
                        warn!("unable to mark queued request as sent: {err}");
                    }
                },

                Ok(None) => {
                    debug!("Request has been aborted while running, continuing.");
                }

                Err(err) => {
                    let is_recoverable = match err {
                        crate::Error::Http(ref http_err) => {
                            // All transient errors are recoverable.
                            matches!(
                                http_err.retry_kind(),
                                RetryKind::Transient { .. } | RetryKind::NetworkFailure
                            )
                        }

                        // `ConcurrentRequestFailed` typically happens because of an HTTP failure;
                        // since we don't get the underlying error, be lax and consider it
                        // recoverable, and let observers decide to retry it or not. At some point
                        // we'll get the actual underlying error.
                        crate::Error::ConcurrentRequestFailed => true,

                        // As of 2024-06-27, all other error types are considered unrecoverable.
                        _ => false,
                    };

                    // Disable the queue for this room after any kind of error happened.
                    locally_enabled.store(false, Ordering::SeqCst);

                    if is_recoverable {
                        warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue");

                        // In this case, we intentionally keep the request in the queue, but mark it
                        // as not being sent anymore.
                        queue.mark_as_not_being_sent(&txn_id).await;

                        // Let observers know about a failure *after* we've
                        // marked the item as not being sent anymore. Otherwise,
                        // there's a possible race where a caller might try to
                        // remove an item, while it's still marked as being
                        // sent, resulting in a cancellation failure.
                    } else {
                        warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}");

                        // Mark the request as wedged, so it's not picked at any future point.
                        if let Err(storage_error) =
                            queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await
                        {
                            warn!("unable to mark request as wedged: {storage_error}");
                        }
                    }

                    let error = Arc::new(err);

                    let _ = global_error_reporter.send(SendQueueRoomError {
                        room_id: room.room_id().to_owned(),
                        error: error.clone(),
                        is_recoverable,
                    });

                    let _ = updates.send(RoomSendQueueUpdate::SendError {
                        transaction_id: related_txn_id.unwrap_or(txn_id),
                        error,
                        is_recoverable,
                    });
                }
            }
        }

        info!("exited sending task");
    }

675    /// Handles a single request and returns the [`SentRequestKey`] on success
676    /// (unless the request was cancelled, in which case it'll return
677    /// `None`).
678    async fn handle_request(
679        room: &Room,
680        request: QueuedRequest,
681        cancel_upload_rx: Option<oneshot::Receiver<()>>,
682    ) -> Result<Option<SentRequestKey>, crate::Error> {
683        match request.kind {
684            QueuedRequestKind::Event { content } => {
685                let (event, event_type) = content.raw();
686
687                let res = room
688                    .send_raw(event_type, event)
689                    .with_transaction_id(&request.transaction_id)
690                    .with_request_config(RequestConfig::short_retry())
691                    .await?;
692
693                trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent");
694                Ok(Some(SentRequestKey::Event(res.event_id)))
695            }
696
697            QueuedRequestKind::MediaUpload {
698                content_type,
699                cache_key,
700                thumbnail_source,
701                related_to: relates_to,
702            } => {
703                trace!(%relates_to, "uploading media related to event");
704
705                let fut = async move {
706                    let mime = Mime::from_str(&content_type).map_err(|_| {
707                        crate::Error::SendQueueWedgeError(QueueWedgeError::InvalidMimeType {
708                            mime_type: content_type.clone(),
709                        })
710                    })?;
711
712                    let data = room
713                        .client()
714                        .event_cache_store()
715                        .lock()
716                        .await?
717                        .get_media_content(&cache_key)
718                        .await?
719                        .ok_or(crate::Error::SendQueueWedgeError(
                            QueueWedgeError::MissingMediaContent,
                        ))?;

                    #[cfg(feature = "e2e-encryption")]
                    let media_source = if room.latest_encryption_state().await?.is_encrypted() {
                        trace!("upload will be encrypted (encrypted room)");
                        let mut cursor = std::io::Cursor::new(data);
                        let encrypted_file = room
                            .client()
                            .upload_encrypted_file(&mime, &mut cursor)
                            .with_request_config(RequestConfig::short_retry())
                            .await?;
                        MediaSource::Encrypted(Box::new(encrypted_file))
                    } else {
                        trace!("upload will be in clear text (room without encryption)");
                        let request_config = RequestConfig::short_retry()
                            .timeout(Media::reasonable_upload_timeout(&data));
                        let res =
                            room.client().media().upload(&mime, data, Some(request_config)).await?;
                        MediaSource::Plain(res.content_uri)
                    };

                    #[cfg(not(feature = "e2e-encryption"))]
                    let media_source = {
                        let request_config = RequestConfig::short_retry()
                            .timeout(Media::reasonable_upload_timeout(&data));
                        let res =
                            room.client().media().upload(&mime, data, Some(request_config)).await?;
                        MediaSource::Plain(res.content_uri)
                    };

                    let uri = match &media_source {
                        MediaSource::Plain(uri) => uri,
                        MediaSource::Encrypted(encrypted_file) => &encrypted_file.url,
                    };
                    trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded");

                    Ok(SentRequestKey::Media(SentMediaInfo {
                        file: media_source,
                        thumbnail: thumbnail_source,
                    }))
                };

                let wait_for_cancel = async move {
                    if let Some(rx) = cancel_upload_rx {
                        rx.await
                    } else {
                        std::future::pending().await
                    }
                };

                tokio::select! {
                    biased;

                    _ = wait_for_cancel => {
                        Ok(None)
                    }

                    res = fut => {
                        res.map(Some)
                    }
                }
            }
        }
    }

    /// Returns whether this room's send queue is enabled, at the room level.
    pub fn is_enabled(&self) -> bool {
        self.inner.locally_enabled.load(Ordering::SeqCst)
    }

    /// Set the locally enabled flag for this room queue.
    pub fn set_enabled(&self, enabled: bool) {
        self.inner.locally_enabled.store(enabled, Ordering::SeqCst);

        // No need to wake a task to tell it it's been disabled, so only notify if we're
        // re-enabling the queue.
        if enabled {
            self.inner.notifier.notify_one();
        }
    }
}

impl From<&crate::Error> for QueueWedgeError {
    fn from(value: &crate::Error) -> Self {
        match value {
            #[cfg(feature = "e2e-encryption")]
            crate::Error::OlmError(OlmError::SessionRecipientCollectionError(error)) => match error
            {
                SessionRecipientCollectionError::VerifiedUserHasUnsignedDevice(user_map) => {
                    QueueWedgeError::InsecureDevices { user_device_map: user_map.clone() }
                }

                SessionRecipientCollectionError::VerifiedUserChangedIdentity(users) => {
                    QueueWedgeError::IdentityViolations { users: users.clone() }
                }

                SessionRecipientCollectionError::CrossSigningNotSetup
                | SessionRecipientCollectionError::SendingFromUnverifiedDevice => {
                    QueueWedgeError::CrossVerificationRequired
                }
            },

            // Flatten errors of `Self` type.
            crate::Error::SendQueueWedgeError(error) => error.clone(),

            _ => QueueWedgeError::GenericApiError { msg: value.to_string() },
        }
    }
}

struct RoomSendQueueInner {
    /// The room which this send queue relates to.
    room: WeakRoom,

    /// Broadcaster for notifications about the statuses of requests to be sent.
    ///
    /// Can be subscribed to from the outside.
    updates: broadcast::Sender<RoomSendQueueUpdate>,

    /// Queue of requests that are either to be sent, or being sent.
    ///
    /// A request is removed from this queue only *after* it has been sent
    /// to the server. That way, we will retry sending upon failure, in the
    /// same order requests have been inserted in the first place.
    queue: QueueStorage,

    /// A notifier that's signalled any time common data is touched (stopped
    /// or enabled statuses), or the associated room [`QueueStorage`] changes.
    notifier: Arc<Notify>,

    /// Should the room process new requests or not (because, e.g., it might
    /// be offline)?
    locally_enabled: Arc<AtomicBool>,

    /// Handle to the actual sending task. Unused, but kept alive along with
    /// this data structure.
    _task: JoinHandle<()>,
}

/// Information about a request being sent right this moment.
struct BeingSentInfo {
    /// Transaction id of the thing being sent.
    transaction_id: OwnedTransactionId,

    /// For an upload request, a trigger to cancel the upload before it
    /// completes.
    cancel_upload: Option<oneshot::Sender<()>>,
}

impl BeingSentInfo {
    /// Aborts the upload, if a trigger is available.
    ///
    /// Consumes the object because the sender is a oneshot and will be consumed
    /// upon sending.
    fn cancel_upload(self) -> bool {
        if let Some(cancel_upload) = self.cancel_upload {
            let _ = cancel_upload.send(());
            true
        } else {
            false
        }
    }
}
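
// Illustrative aside, not part of the SDK: a minimal, std-only sketch of the
// consume-on-cancel pattern used by `BeingSentInfo::cancel_upload`. The names
// `InFlight` and `start` are hypothetical, and `std::sync::mpsc` stands in for
// the tokio `oneshot` channel so that the sketch has no dependencies.
```rust
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical stand-in for `BeingSentInfo`, reduced to its cancel trigger.
struct InFlight {
    cancel: Option<Sender<()>>,
}

impl InFlight {
    /// Fires the trigger if there is one, and reports whether it existed.
    /// Consuming `self` makes a second cancellation impossible to express.
    fn cancel(self) -> bool {
        match self.cancel {
            Some(tx) => {
                // Ignore send errors: the receiver may already be gone.
                let _ = tx.send(());
                true
            }
            None => false,
        }
    }
}

/// Only upload-like requests get a trigger; others get `None` on both sides.
fn start(is_upload: bool) -> (InFlight, Option<Receiver<()>>) {
    if is_upload {
        let (tx, rx) = mpsc::channel();
        (InFlight { cancel: Some(tx) }, Some(rx))
    } else {
        (InFlight { cancel: None }, None)
    }
}
```
// As in the code this sketch mirrors, `cancel` reports whether a trigger
// existed, not whether the receiving side was still listening.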

/// A specialized lock that guards both the state store and the
/// [`Self::being_sent`] data.
#[derive(Clone)]
struct StoreLock {
    /// Reference to the client, to get access to the underlying store.
    client: WeakClient,

    /// The one queued request that is being sent at the moment, along with
    /// associated data that can be useful to act upon it.
    ///
    /// Also used as the lock to access the state store.
    being_sent: Arc<Mutex<Option<BeingSentInfo>>>,
}

impl StoreLock {
    /// Gets a hold of the locked store and [`Self::being_sent`] pair.
    async fn lock(&self) -> StoreLockGuard {
        StoreLockGuard {
            client: self.client.clone(),
            being_sent: self.being_sent.clone().lock_owned().await,
        }
    }
}

/// A lock guard obtained through locking with [`StoreLock`], giving access to
/// the state store and the `being_sent` data.
struct StoreLockGuard {
    /// Reference to the client, to get access to the underlying store.
    client: WeakClient,

    /// The one queued request that is being sent at the moment, along with
    /// associated data that can be useful to act upon it.
    being_sent: OwnedMutexGuard<Option<BeingSentInfo>>,
}

impl StoreLockGuard {
    /// Get a client from the locked state, useful to get a handle on a store.
    fn client(&self) -> Result<Client, RoomSendQueueStorageError> {
        self.client.get().ok_or(RoomSendQueueStorageError::ClientShuttingDown)
    }
}
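
// Illustrative aside, not part of the SDK: a minimal, std-only sketch of the
// single "being sent" slot that `StoreLock` guards. `InFlightSlot`, `begin`
// and `finish` are hypothetical names, and a blocking `std::sync::Mutex`
// replaces the async tokio mutex used by the real code.
```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for the `being_sent` slot: at most one transaction
/// id is in flight, and every clone shares the same slot.
#[derive(Clone, Default)]
struct InFlightSlot {
    being_sent: Arc<Mutex<Option<String>>>,
}

impl InFlightSlot {
    /// Marks `txn` as in flight and returns the previous occupant, if any.
    /// A `Some` result means an earlier request was never cleared.
    fn begin(&self, txn: &str) -> Option<String> {
        self.being_sent.lock().unwrap().replace(txn.to_owned())
    }

    /// Clears the slot and reports whether it actually held `txn`.
    fn finish(&self, txn: &str) -> bool {
        self.being_sent.lock().unwrap().take().as_deref() == Some(txn)
    }
}
```
// `Option::replace` and `Option::take` are what let the queue detect a
// mismatch between the request it picked and the request being completed.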

#[derive(Clone)]
struct QueueStorage {
    /// A lock to make sure the state store is only accessed by one task at a
    /// time, to make some store operations atomic.
    store: StoreLock,

    /// The room this storage relates to.
    room_id: OwnedRoomId,
}

impl QueueStorage {
    /// Default priority for a queued request.
    const LOW_PRIORITY: usize = 0;

    /// High priority for a queued request that must be handled before others.
    const HIGH_PRIORITY: usize = 10;

    /// Create a new queue for queuing requests to be sent later.
    fn new(client: WeakClient, room: OwnedRoomId) -> Self {
        Self { room_id: room, store: StoreLock { client, being_sent: Default::default() } }
    }

    /// Push a new event to be sent in the queue, with a default priority of 0.
    ///
    /// Returns the transaction id chosen to identify the request.
    async fn push(
        &self,
        request: QueuedRequestKind,
        created_at: MilliSecondsSinceUnixEpoch,
    ) -> Result<OwnedTransactionId, RoomSendQueueStorageError> {
        let transaction_id = TransactionId::new();

        self.store
            .lock()
            .await
            .client()?
            .state_store()
            .save_send_queue_request(
                &self.room_id,
                transaction_id.clone(),
                created_at,
                request,
                Self::LOW_PRIORITY,
            )
            .await?;

        Ok(transaction_id)
    }

    /// Peeks the next request to be sent, marking it as being sent.
    ///
    /// Callers must call [`Self::mark_as_sent`] once the request has actually
    /// been sent.
    async fn peek_next_to_send(
        &self,
    ) -> Result<Option<(QueuedRequest, Option<oneshot::Receiver<()>>)>, RoomSendQueueStorageError>
    {
        let mut guard = self.store.lock().await;
        let queued_requests =
            guard.client()?.state_store().load_send_queue_requests(&self.room_id).await?;

        if let Some(request) = queued_requests.iter().find(|queued| !queued.is_wedged()) {
            let (cancel_upload_tx, cancel_upload_rx) =
                if matches!(request.kind, QueuedRequestKind::MediaUpload { .. }) {
                    let (tx, rx) = oneshot::channel();
                    (Some(tx), Some(rx))
                } else {
                    Default::default()
                };

            let prev = guard.being_sent.replace(BeingSentInfo {
                transaction_id: request.transaction_id.clone(),
                cancel_upload: cancel_upload_tx,
            });

            if let Some(prev) = prev {
                error!(
                    prev_txn = ?prev.transaction_id,
                    "a previous request was still active while picking a new one"
                );
            }

            Ok(Some((request.clone(), cancel_upload_rx)))
        } else {
            Ok(None)
        }
    }

    /// Marks a request returned by [`Self::peek_next_to_send`] and identified
    /// with the given transaction id as not being sent anymore, so it can
    /// be removed from the queue later.
    async fn mark_as_not_being_sent(&self, transaction_id: &TransactionId) {
        let was_being_sent = self.store.lock().await.being_sent.take();

        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
        if prev_txn != Some(transaction_id) {
            error!(prev_txn = ?prev_txn, "previous active request didn't match what we expect (after transient error)");
        }
    }

    /// Marks a request returned by [`Self::peek_next_to_send`] and identified
    /// with the given transaction id as being wedged (and not being sent
    /// anymore), so it can be removed from the queue later.
    async fn mark_as_wedged(
        &self,
        transaction_id: &TransactionId,
        reason: QueueWedgeError,
    ) -> Result<(), RoomSendQueueStorageError> {
        // Keep the lock until we're done touching the storage.
        let mut guard = self.store.lock().await;
        let was_being_sent = guard.being_sent.take();

        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
        if prev_txn != Some(transaction_id) {
            error!(prev_txn = ?prev_txn, "previous active request didn't match what we expect (after permanent error)");
        }

        Ok(guard
            .client()?
            .state_store()
            .update_send_queue_request_status(&self.room_id, transaction_id, Some(reason))
            .await?)
    }

    /// Marks a request identified with the given transaction id as being now
    /// unwedged and adds it back to the queue.
    async fn mark_as_unwedged(
        &self,
        transaction_id: &TransactionId,
    ) -> Result<(), RoomSendQueueStorageError> {
        Ok(self
            .store
            .lock()
            .await
            .client()?
            .state_store()
            .update_send_queue_request_status(&self.room_id, transaction_id, None)
            .await?)
    }

    /// Marks a request pushed with [`Self::push`] and identified with the given
    /// transaction id as sent, by removing it from the local queue.
    async fn mark_as_sent(
        &self,
        transaction_id: &TransactionId,
        parent_key: SentRequestKey,
    ) -> Result<(), RoomSendQueueStorageError> {
        // Keep the lock until we're done touching the storage.
        let mut guard = self.store.lock().await;
        let was_being_sent = guard.being_sent.take();

        let prev_txn = was_being_sent.as_ref().map(|info| info.transaction_id.as_ref());
        if prev_txn != Some(transaction_id) {
            error!(prev_txn = ?prev_txn, "previous active request didn't match what we expect (after successful send)");
        }

        let client = guard.client()?;
        let store = client.state_store();

        // Update all dependent requests.
        store
            .mark_dependent_queued_requests_as_ready(&self.room_id, transaction_id, parent_key)
            .await?;

        let removed = store.remove_send_queue_request(&self.room_id, transaction_id).await?;

        if !removed {
            warn!(txn_id = %transaction_id, "request marked as sent was missing from storage");
        }

        Ok(())
    }

    /// Cancel a sending command for an event that has been queued with
    /// [`Self::push`] with the given transaction id.
    ///
    /// Returns whether the given transaction has been effectively removed. If
    /// false, this either means that the transaction id was unrelated to
    /// this queue, or that the request was sent before we cancelled it.
    async fn cancel_event(
        &self,
        transaction_id: &TransactionId,
    ) -> Result<bool, RoomSendQueueStorageError> {
        let guard = self.store.lock().await;

        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
            == Some(transaction_id)
        {
            // Save the intent to redact the event.
            guard
                .client()?
                .state_store()
                .save_dependent_queued_request(
                    &self.room_id,
                    transaction_id,
                    ChildTransactionId::new(),
                    MilliSecondsSinceUnixEpoch::now(),
                    DependentQueuedRequestKind::RedactEvent,
                )
                .await?;

            return Ok(true);
        }

        let removed = guard
            .client()?
            .state_store()
            .remove_send_queue_request(&self.room_id, transaction_id)
            .await?;

        Ok(removed)
    }

    /// Replace an event that has been queued with [`Self::push`] with the given
    /// transaction id, before it's been actually sent.
    ///
    /// Returns whether the given transaction has been effectively edited. If
    /// false, this either means that the transaction id was unrelated to
    /// this queue, or that the request was sent before we edited it.
    async fn replace_event(
        &self,
        transaction_id: &TransactionId,
        serializable: SerializableEventContent,
    ) -> Result<bool, RoomSendQueueStorageError> {
        let guard = self.store.lock().await;

        if guard.being_sent.as_ref().map(|info| info.transaction_id.as_ref())
            == Some(transaction_id)
        {
            // Save the intent to edit the associated event.
            guard
                .client()?
                .state_store()
                .save_dependent_queued_request(
                    &self.room_id,
                    transaction_id,
                    ChildTransactionId::new(),
                    MilliSecondsSinceUnixEpoch::now(),
                    DependentQueuedRequestKind::EditEvent { new_content: serializable },
                )
                .await?;

            return Ok(true);
        }

        let edited = guard
            .client()?
            .state_store()
            .update_send_queue_request(&self.room_id, transaction_id, serializable.into())
            .await?;

        Ok(edited)
    }

    /// Push requests (and dependents) to upload a media file.
    ///
    /// See the module-level description for details of the whole process.
    #[allow(clippy::too_many_arguments)]
    async fn push_media(
        &self,
        event: RoomMessageEventContent,
        content_type: Mime,
        send_event_txn: OwnedTransactionId,
        created_at: MilliSecondsSinceUnixEpoch,
        upload_file_txn: OwnedTransactionId,
        file_media_request: MediaRequestParameters,
        thumbnail: Option<(FinishUploadThumbnailInfo, MediaRequestParameters, Mime)>,
    ) -> Result<(), RoomSendQueueStorageError> {
        let guard = self.store.lock().await;
        let client = guard.client()?;
        let store = client.state_store();
        let thumbnail_info =
            if let Some((thumbnail_info, thumbnail_media_request, thumbnail_content_type)) =
                thumbnail
            {
                let upload_thumbnail_txn = thumbnail_info.txn.clone();

                // Save the thumbnail upload request.
                store
                    .save_send_queue_request(
                        &self.room_id,
                        upload_thumbnail_txn.clone(),
                        created_at,
                        QueuedRequestKind::MediaUpload {
                            content_type: thumbnail_content_type.to_string(),
                            cache_key: thumbnail_media_request,
                            thumbnail_source: None, // the thumbnail has no thumbnails :)
                            related_to: send_event_txn.clone(),
                        },
                        Self::LOW_PRIORITY,
                    )
                    .await?;

                // Save the file upload request as a dependent request of the thumbnail upload.
                store
                    .save_dependent_queued_request(
                        &self.room_id,
                        &upload_thumbnail_txn,
                        upload_file_txn.clone().into(),
                        created_at,
                        DependentQueuedRequestKind::UploadFileWithThumbnail {
                            content_type: content_type.to_string(),
                            cache_key: file_media_request,
                            related_to: send_event_txn.clone(),
                        },
                    )
                    .await?;

                Some(thumbnail_info)
            } else {
                // Save the file upload as its own request, not a dependent one.
                store
                    .save_send_queue_request(
                        &self.room_id,
                        upload_file_txn.clone(),
                        created_at,
                        QueuedRequestKind::MediaUpload {
                            content_type: content_type.to_string(),
                            cache_key: file_media_request,
                            thumbnail_source: None,
                            related_to: send_event_txn.clone(),
                        },
                        Self::LOW_PRIORITY,
                    )
                    .await?;

                None
            };

        // Push the dependent request for the event itself.
        store
            .save_dependent_queued_request(
                &self.room_id,
                &upload_file_txn,
                send_event_txn.into(),
                created_at,
                DependentQueuedRequestKind::FinishUpload {
                    local_echo: event,
                    file_upload: upload_file_txn.clone(),
                    thumbnail_info,
                },
            )
            .await?;

        Ok(())
    }

    /// Reacts to the given local echo of an event.
    #[instrument(skip(self))]
    async fn react(
        &self,
        transaction_id: &TransactionId,
        key: String,
        created_at: MilliSecondsSinceUnixEpoch,
    ) -> Result<Option<ChildTransactionId>, RoomSendQueueStorageError> {
        let guard = self.store.lock().await;
        let client = guard.client()?;
        let store = client.state_store();

        let requests = store.load_send_queue_requests(&self.room_id).await?;

        // If the target event has already been sent, abort immediately.
        if !requests.iter().any(|item| item.transaction_id == transaction_id) {
            // We didn't find it as a queued request; try to find it as a dependent queued
            // request.
            let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
            if !dependent_requests
                .into_iter()
                .filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
                .any(|child_txn| *child_txn == *transaction_id)
            {
                // We didn't find it as either a request or a dependent request, abort.
                return Ok(None);
            }
        }

        // Record the dependent request.
        let reaction_txn_id = ChildTransactionId::new();
        store
            .save_dependent_queued_request(
                &self.room_id,
                transaction_id,
                reaction_txn_id.clone(),
                created_at,
                DependentQueuedRequestKind::ReactEvent { key },
            )
            .await?;

        Ok(Some(reaction_txn_id))
    }

    /// Returns a list of the local echoes, that is, all the requests that we're
    /// about to send but that haven't been sent yet (or are being sent).
    async fn local_echoes(
        &self,
        room: &RoomSendQueue,
    ) -> Result<Vec<LocalEcho>, RoomSendQueueStorageError> {
        let guard = self.store.lock().await;
        let client = guard.client()?;
        let store = client.state_store();

        let local_requests =
            store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| {
                Some(LocalEcho {
                    transaction_id: queued.transaction_id.clone(),
                    content: match queued.kind {
                        QueuedRequestKind::Event { content } => LocalEchoContent::Event {
                            serialized_event: content,
                            send_handle: SendHandle {
                                room: room.clone(),
                                transaction_id: queued.transaction_id,
                                media_handles: None,
                                created_at: queued.created_at,
                            },
                            send_error: queued.error,
                        },

                        QueuedRequestKind::MediaUpload { .. } => {
                            // Don't return media uploads as their own things; the accompanying
                            // event represented as a dependent request should be sufficient.
                            return None;
                        }
                    },
                })
            });

        let reactions_and_medias = store
            .load_dependent_queued_requests(&self.room_id)
            .await?
            .into_iter()
            .filter_map(|dep| match dep.kind {
                DependentQueuedRequestKind::EditEvent { .. }
                | DependentQueuedRequestKind::RedactEvent => {
                    // TODO: reflect local edits/redacts too?
                    None
                }

                DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho {
                    transaction_id: dep.own_transaction_id.clone().into(),
                    content: LocalEchoContent::React {
                        key,
                        send_handle: SendReactionHandle {
                            room: room.clone(),
                            transaction_id: dep.own_transaction_id,
                        },
                        applies_to: dep.parent_transaction_id,
                    },
                }),

                DependentQueuedRequestKind::UploadFileWithThumbnail { .. } => {
                    // Don't reflect these: only the associated event is interesting to observers.
                    None
                }

                DependentQueuedRequestKind::FinishUpload {
                    local_echo,
                    file_upload,
                    thumbnail_info,
                } => {
                    // Materialize as an event local echo.
                    Some(LocalEcho {
                        transaction_id: dep.own_transaction_id.clone().into(),
                        content: LocalEchoContent::Event {
                            serialized_event: SerializableEventContent::new(&local_echo.into())
                                .ok()?,
                            send_handle: SendHandle {
                                room: room.clone(),
                                transaction_id: dep.own_transaction_id.into(),
                                media_handles: Some(MediaHandles {
                                    upload_thumbnail_txn: thumbnail_info.map(|info| info.txn),
                                    upload_file_txn: file_upload,
                                }),
                                created_at: dep.created_at,
                            },
                            send_error: None,
                        },
                    })
                }
            });

        Ok(local_requests.chain(reactions_and_medias).collect())
    }
1410
1411    /// Try to apply a single dependent request, whether it's local or remote.
1412    ///
1413    /// This swallows errors that would retrigger every time if we retried
1414    /// applying the dependent request: invalid edit content, etc.
1415    ///
1416    /// Returns true if the dependent request has been sent (or should not be
1417    /// retried later).
1418    #[instrument(skip_all)]
1419    async fn try_apply_single_dependent_request(
1420        &self,
1421        client: &Client,
1422        dependent_request: DependentQueuedRequest,
1423        new_updates: &mut Vec<RoomSendQueueUpdate>,
1424    ) -> Result<bool, RoomSendQueueError> {
1425        let store = client.state_store();
1426
1427        let parent_key = dependent_request.parent_key;
1428
1429        match dependent_request.kind {
1430            DependentQueuedRequestKind::EditEvent { new_content } => {
1431                if let Some(parent_key) = parent_key {
1432                    let Some(event_id) = parent_key.into_event_id() else {
1433                        return Err(RoomSendQueueError::StorageError(
1434                            RoomSendQueueStorageError::InvalidParentKey,
1435                        ));
1436                    };
1437
1438                    // The parent event has been sent, so send an edit event.
1439                    let room = client
1440                        .get_room(&self.room_id)
1441                        .ok_or(RoomSendQueueError::RoomDisappeared)?;
1442
1443                    // Check the event is one we know how to edit with an edit event.
1444
1445                    // It must be deserializable…
1446                    let edited_content = match new_content.deserialize() {
1447                        Ok(AnyMessageLikeEventContent::RoomMessage(c)) => {
1448                            // Assume no relationships.
1449                            EditedContent::RoomMessage(c.into())
1450                        }
1451
1452                        Ok(AnyMessageLikeEventContent::UnstablePollStart(c)) => {
1453                            let poll_start = c.poll_start().clone();
1454                            EditedContent::PollStart {
1455                                fallback_text: poll_start.question.text.clone(),
1456                                new_content: poll_start,
1457                            }
1458                        }
1459
1460                        Ok(c) => {
1461                            warn!("Unsupported edit content type: {:?}", c.event_type());
1462                            return Ok(true);
1463                        }
1464
1465                        Err(err) => {
1466                            warn!("Unable to deserialize: {err}");
1467                            return Ok(true);
1468                        }
1469                    };
1470
1471                    let edit_event = match room.make_edit_event(&event_id, edited_content).await {
1472                        Ok(e) => e,
1473                        Err(err) => {
1474                            warn!("couldn't create edited event: {err}");
1475                            return Ok(true);
1476                        }
1477                    };
1478
1479                    // Queue the edit event in the send queue 🧠.
1480                    let serializable = SerializableEventContent::from_raw(
1481                        Raw::new(&edit_event)
1482                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
1483                        edit_event.event_type().to_string(),
1484                    );
1485
1486                    store
1487                        .save_send_queue_request(
1488                            &self.room_id,
1489                            dependent_request.own_transaction_id.into(),
1490                            dependent_request.created_at,
1491                            serializable.into(),
1492                            Self::HIGH_PRIORITY,
1493                        )
1494                        .await
1495                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
1496                } else {
1497                    // The parent event is still local; update the local echo.
1498                    let edited = store
1499                        .update_send_queue_request(
1500                            &self.room_id,
1501                            &dependent_request.parent_transaction_id,
1502                            new_content.into(),
1503                        )
1504                        .await
1505                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
1506
1507                    if !edited {
1508                        warn!("missing local echo upon dependent edit");
1509                    }
1510                }
1511            }
1512
1513            DependentQueuedRequestKind::RedactEvent => {
1514                if let Some(parent_key) = parent_key {
1515                    let Some(event_id) = parent_key.into_event_id() else {
1516                        return Err(RoomSendQueueError::StorageError(
1517                            RoomSendQueueStorageError::InvalidParentKey,
1518                        ));
1519                    };
1520
1521                    // The parent event has been sent; send a redaction.
1522                    let room = client
1523                        .get_room(&self.room_id)
1524                        .ok_or(RoomSendQueueError::RoomDisappeared)?;
1525
1526                    // Ideally we'd use the send queue to send the redaction, but the protocol
1527                    // changed the shape of `m.room.redaction` events in room version 11, so
1528                    // keep it simple and try once here.
1529
1530                    // Note: no reason is provided because we materialize the intent of "cancel
1531                    // sending the parent event".
1532
1533                    if let Err(err) = room
1534                        .redact(&event_id, None, Some(dependent_request.own_transaction_id.into()))
1535                        .await
1536                    {
1537                        warn!("error when sending a redact for {event_id}: {err}");
1538                        return Ok(false);
1539                    }
1540                } else {
1541                    // The parent event is still local (sending must have failed); redact the local
1542                    // echo.
1543                    let removed = store
1544                        .remove_send_queue_request(
1545                            &self.room_id,
1546                            &dependent_request.parent_transaction_id,
1547                        )
1548                        .await
1549                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
1550
1551                    if !removed {
1552                        warn!("missing local echo upon dependent redact");
1553                    }
1554                }
1555            }
1556
1557            DependentQueuedRequestKind::ReactEvent { key } => {
1558                if let Some(parent_key) = parent_key {
1559                    let Some(parent_event_id) = parent_key.into_event_id() else {
1560                        return Err(RoomSendQueueError::StorageError(
1561                            RoomSendQueueStorageError::InvalidParentKey,
1562                        ));
1563                    };
1564
1565                    // Queue the reaction event in the send queue 🧠.
1566                    let react_event =
1567                        ReactionEventContent::new(Annotation::new(parent_event_id, key)).into();
1568                    let serializable = SerializableEventContent::from_raw(
1569                        Raw::new(&react_event)
1570                            .map_err(RoomSendQueueStorageError::JsonSerialization)?,
1571                        react_event.event_type().to_string(),
1572                    );
1573
1574                    store
1575                        .save_send_queue_request(
1576                            &self.room_id,
1577                            dependent_request.own_transaction_id.into(),
1578                            dependent_request.created_at,
1579                            serializable.into(),
1580                            Self::HIGH_PRIORITY,
1581                        )
1582                        .await
1583                        .map_err(RoomSendQueueStorageError::StateStoreError)?;
1584                } else {
1585                    // Not applied yet, we should retry later => false.
1586                    return Ok(false);
1587                }
1588            }
1589
1590            DependentQueuedRequestKind::UploadFileWithThumbnail {
1591                content_type,
1592                cache_key,
1593                related_to,
1594            } => {
1595                let Some(parent_key) = parent_key else {
1596                    // Not finished yet, we should retry later => false.
1597                    return Ok(false);
1598                };
1599                self.handle_dependent_file_upload_with_thumbnail(
1600                    client,
1601                    dependent_request.own_transaction_id.into(),
1602                    parent_key,
1603                    content_type,
1604                    cache_key,
1605                    related_to,
1606                )
1607                .await?;
1608            }
1609
1610            DependentQueuedRequestKind::FinishUpload {
1611                local_echo,
1612                file_upload,
1613                thumbnail_info,
1614            } => {
1615                let Some(parent_key) = parent_key else {
1616                    // Not finished yet, we should retry later => false.
1617                    return Ok(false);
1618                };
1619                self.handle_dependent_finish_upload(
1620                    client,
1621                    dependent_request.own_transaction_id.into(),
1622                    parent_key,
1623                    local_echo,
1624                    file_upload,
1625                    thumbnail_info,
1626                    new_updates,
1627                )
1628                .await?;
1629            }
1630        }
1631
1632        Ok(true)
1633    }
1634
1635    #[instrument(skip(self))]
1636    async fn apply_dependent_requests(
1637        &self,
1638        new_updates: &mut Vec<RoomSendQueueUpdate>,
1639    ) -> Result<(), RoomSendQueueError> {
1640        let guard = self.store.lock().await;
1641
1642        let client = guard.client()?;
1643        let store = client.state_store();
1644
1645        let dependent_requests = store
1646            .load_dependent_queued_requests(&self.room_id)
1647            .await
1648            .map_err(RoomSendQueueStorageError::StateStoreError)?;
1649
1650        let num_initial_dependent_requests = dependent_requests.len();
1651        if num_initial_dependent_requests == 0 {
1652            // Returning early here avoids a bit of useless logging.
1653            return Ok(());
1654        }
1655
1656        let canonicalized_dependent_requests = canonicalize_dependent_requests(&dependent_requests);
1657
1658        // Get rid of all the non-canonical dependent events.
1659        for original in &dependent_requests {
1660            if !canonicalized_dependent_requests
1661                .iter()
1662                .any(|canonical| canonical.own_transaction_id == original.own_transaction_id)
1663            {
1664                store
1665                    .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id)
1666                    .await
1667                    .map_err(RoomSendQueueStorageError::StateStoreError)?;
1668            }
1669        }
1670
1671        let mut num_dependent_requests = canonicalized_dependent_requests.len();
1672
1673        debug!(
1674            num_dependent_requests,
1675            num_initial_dependent_requests, "starting handling of dependent requests"
1676        );
1677
1678        for dependent in canonicalized_dependent_requests {
1679            let dependent_id = dependent.own_transaction_id.clone();
1680
1681            match self.try_apply_single_dependent_request(&client, dependent, new_updates).await {
1682                Ok(should_remove) => {
1683                    if should_remove {
1684                        // The dependent request has been successfully applied, forget about it.
1685                        store
1686                            .remove_dependent_queued_request(&self.room_id, &dependent_id)
1687                            .await
1688                            .map_err(RoomSendQueueStorageError::StateStoreError)?;
1689
1690                        num_dependent_requests -= 1;
1691                    }
1692                }
1693
1694                Err(err) => {
1695                    warn!("error when applying single dependent request: {err}");
1696                }
1697            }
1698        }
1699
1700        debug!(
1701            leftover_dependent_requests = num_dependent_requests,
1702            "stopped handling dependent requests"
1703        );
1704
1705        Ok(())
1706    }
1707
1708    /// Remove a single dependent request from storage.
1709    async fn remove_dependent_send_queue_request(
1710        &self,
1711        dependent_event_id: &ChildTransactionId,
1712    ) -> Result<bool, RoomSendQueueStorageError> {
1713        Ok(self
1714            .store
1715            .lock()
1716            .await
1717            .client()?
1718            .state_store()
1719            .remove_dependent_queued_request(&self.room_id, dependent_event_id)
1720            .await?)
1721    }
1722}
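/*
A minimal, self-contained sketch of the return-value convention that
`apply_dependent_requests` relies on: `Ok(true)` means the dependent request
was applied and can be forgotten, `Ok(false)` means the parent isn't ready yet
and the request must be retried later, and an error is logged without stopping
the loop (the request is kept too). The `apply_all` helper and the string-based
requests below are hypothetical and only serve the illustration; they are not
part of this module, and no storage is involved.

```rust
/// Applies each pending request once and returns those that must be kept.
fn apply_all<T, E: std::fmt::Display>(
    pending: Vec<T>,
    mut try_apply: impl FnMut(&T) -> Result<bool, E>,
) -> Vec<T> {
    let mut leftover = Vec::new();

    for request in pending {
        match try_apply(&request) {
            // Applied: the request can be forgotten.
            Ok(true) => {}
            // Parent not sent yet: keep the request for the next pass.
            Ok(false) => leftover.push(request),
            // Error: log it and keep the request; a failure doesn't stop the loop.
            Err(err) => {
                eprintln!("error when applying single dependent request: {err}");
                leftover.push(request);
            }
        }
    }

    leftover
}

fn main() {
    let leftover = apply_all(vec!["edit", "react", "redact"], |kind| match *kind {
        "edit" => Ok(true),
        "react" => Ok(false),
        other => Err(format!("cannot apply {other}")),
    });

    // Only the applied request ("edit") has been dropped.
    println!("{leftover:?}");
}
```

In this sketch, a request that errors stays pending exactly like one whose
parent isn't ready; the only difference is the warning that gets logged.
*/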
1723
1724/// The content of a local echo.
1725#[derive(Clone, Debug)]
1726pub enum LocalEchoContent {
1727    /// The local echo contains an actual event ready to display.
1728    Event {
1729        /// Content of the event itself (along with its type) that we are about
1730        /// to send.
1731        serialized_event: SerializableEventContent,
1732        /// A handle to manipulate the sending of the associated event.
1733        send_handle: SendHandle,
1734        /// Whether trying to send this local echo failed in the past with an
1735        /// unrecoverable error (see [`SendQueueRoomError::is_recoverable`]).
1736        send_error: Option<QueueWedgeError>,
1737    },
1738
1739    /// A local echo has been reacted to.
1740    React {
1741        /// The key with which the local echo has been reacted to.
1742        key: String,
1743        /// A handle to manipulate the sending of the reaction.
1744        send_handle: SendReactionHandle,
1745        /// The local echo which has been reacted to.
1746        applies_to: OwnedTransactionId,
1747    },
1748}
1749
1750/// A local representation for a request that hasn't been sent yet to the user's
1751/// homeserver.
1752#[derive(Clone, Debug)]
1753pub struct LocalEcho {
1754    /// Transaction id used to identify the associated request.
1755    pub transaction_id: OwnedTransactionId,
1756    /// The content for the local echo.
1757    pub content: LocalEchoContent,
1758}
1759
1760/// An update to a room send queue, observable with
1761/// [`RoomSendQueue::subscribe`].
1762#[derive(Clone, Debug)]
1763pub enum RoomSendQueueUpdate {
1764    /// A new local event is being sent.
1765    ///
1766    /// There's been a user query to create this event. It is being sent to the
1767    /// server.
1768    NewLocalEvent(LocalEcho),
1769
1770    /// A local event that hadn't been sent to the server yet has been cancelled
1771    /// before sending.
1772    CancelledLocalEvent {
1773        /// Transaction id used to identify this event.
1774        transaction_id: OwnedTransactionId,
1775    },
1776
1777    /// A local event's content has been replaced with something else.
1778    ReplacedLocalEvent {
1779        /// Transaction id used to identify this event.
1780        transaction_id: OwnedTransactionId,
1781
1782        /// The new content replacing the previous one.
1783        new_content: SerializableEventContent,
1784    },
1785
1786    /// An error happened when an event was being sent.
1787    ///
1788    /// The event has not been removed from the queue. For a recoverable error,
1789    /// this room's send queue is disabled, and must be manually re-enabled.
1790    SendError {
1791        /// Transaction id used to identify this event.
1792        transaction_id: OwnedTransactionId,
1793        /// Error received while sending the event.
1794        error: Arc<crate::Error>,
1795        /// Whether the error is considered recoverable or not.
1796        ///
1797        /// An error that's recoverable will disable the room's send queue,
1798        /// while an unrecoverable error will be parked, until the user
1799        /// decides to cancel sending it.
1800        is_recoverable: bool,
1801    },
1802
1803    /// The event has been unwedged and sending is now being retried.
1804    RetryEvent {
1805        /// Transaction id used to identify this event.
1806        transaction_id: OwnedTransactionId,
1807    },
1808
1809    /// The event has been sent to the server, and the query returned
1810    /// successfully.
1811    SentEvent {
1812        /// Transaction id used to identify this event.
1813        transaction_id: OwnedTransactionId,
1814        /// Received event id from the send response.
1815        event_id: OwnedEventId,
1816    },
1817
1818    /// A media has been successfully uploaded.
1819    UploadedMedia {
1820        /// The media event this uploaded media relates to.
1821        related_to: OwnedTransactionId,
1822
1823        /// The final media source for the file that was just uploaded.
1824        file: MediaSource,
1825    },
1826}
1827
1828/// An error triggered by the send queue module.
1829#[derive(Debug, thiserror::Error)]
1830pub enum RoomSendQueueError {
1831    /// The room isn't in the joined state.
1832    #[error("the room isn't in the joined state")]
1833    RoomNotJoined,
1834
1835    /// The room is missing from the client.
1836    ///
1837    /// This only happens when the client is shutting down.
1838    #[error("the room is now missing from the client")]
1839    RoomDisappeared,
1840
1841    /// Error coming from storage.
1842    #[error(transparent)]
1843    StorageError(#[from] RoomSendQueueStorageError),
1844}
1845
1846/// An error triggered by the send queue storage.
1847#[derive(Debug, thiserror::Error)]
1848pub enum RoomSendQueueStorageError {
1849    /// Error caused by the state store.
1850    #[error(transparent)]
1851    StateStoreError(#[from] StoreError),
1852
1853    /// Error caused by the event cache store.
1854    #[error(transparent)]
1855    EventCacheStoreError(#[from] EventCacheStoreError),
1856
1857    /// Error caused when attempting to get a handle on the event cache store.
1858    #[error(transparent)]
1859    LockError(#[from] LockStoreError),
1860
1861    /// Error caused when (de)serializing into/from JSON.
1862    #[error(transparent)]
1863    JsonSerialization(#[from] serde_json::Error),
1864
1865    /// A parent key was expected to be of a certain type, and it was another
1866    /// type instead.
1867    #[error("a dependent event had an invalid parent key type")]
1868    InvalidParentKey,
1869
1870    /// The client is shutting down.
1871    #[error("The client is shutting down.")]
1872    ClientShuttingDown,
1873
1874    /// An operation not implemented on a send handle.
1875    #[error("This operation is not implemented for media uploads")]
1876    OperationNotImplementedYet,
1877
1878    /// Trying to edit a media caption for something that's not a media.
1879    #[error("Can't edit a media caption when the underlying event isn't a media")]
1880    InvalidMediaCaptionEdit,
1881}
1882
1883/// Extra transaction IDs useful during an upload.
1884#[derive(Clone, Debug)]
1885struct MediaHandles {
1886    /// Transaction id used when uploading the thumbnail.
1887    ///
1888    /// Optional because a media can be uploaded without a thumbnail.
1889    upload_thumbnail_txn: Option<OwnedTransactionId>,
1890
1891    /// Transaction id used when uploading the media itself.
1892    upload_file_txn: OwnedTransactionId,
1893}
1894
1895/// A handle to manipulate an event that was scheduled to be sent to a room.
1896#[derive(Clone, Debug)]
1897pub struct SendHandle {
1898    /// Link to the send queue used to send this request.
1899    room: RoomSendQueue,
1900
1901    /// Transaction id used for the sent request.
1902    ///
1903    /// If this is a media upload, this is the "main" transaction id, i.e. the
1904    /// one used to send the event, and that will be seen by observers.
1905    transaction_id: OwnedTransactionId,
1906
1907    /// Additional handles for a media upload.
1908    media_handles: Option<MediaHandles>,
1909
1910    /// The time at which the event to be sent has been created.
1911    pub created_at: MilliSecondsSinceUnixEpoch,
1912}
1913
1914impl SendHandle {
1915    fn nyi_for_uploads(&self) -> Result<(), RoomSendQueueStorageError> {
1916        if self.media_handles.is_some() {
1917            Err(RoomSendQueueStorageError::OperationNotImplementedYet)
1918        } else {
1919            Ok(())
1920        }
1921    }
1922
1923    /// Aborts the sending of the event, if it wasn't sent yet.
1924    ///
1925    /// Returns true if the sending could be aborted, false if not (i.e. the
1926    /// event had already been sent).
1927    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
1928    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
1929        trace!("received an abort request");
1930
1931        let queue = &self.room.inner.queue;
1932
1933        if let Some(handles) = &self.media_handles {
1934            if queue.abort_upload(&self.transaction_id, handles).await? {
1935                // Propagate a cancelled update.
1936                let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
1937                    transaction_id: self.transaction_id.clone(),
1938                });
1939
1940                return Ok(true);
1941            }
1942
1943            // If that returned false, the sending of the event is not a
1944            // dependent request anymore. Fall back to the regular
1945            // code path below, that handles aborting sending of an event.
1946        }
1947
1948        if queue.cancel_event(&self.transaction_id).await? {
1949            trace!("successful abort");
1950
1951            // Propagate a cancelled update too.
1952            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
1953                transaction_id: self.transaction_id.clone(),
1954            });
1955
1956            Ok(true)
1957        } else {
1958            debug!("local echo didn't exist anymore, can't abort");
1959            Ok(false)
1960        }
1961    }
1962
1963    /// Edits the content of a local echo with a raw event content.
1964    ///
1965    /// Returns true if the event to be sent was replaced, false if not (i.e.
1966    /// the event had already been sent).
1967    #[instrument(skip(self, new_content), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
1968    pub async fn edit_raw(
1969        &self,
1970        new_content: Raw<AnyMessageLikeEventContent>,
1971        event_type: String,
1972    ) -> Result<bool, RoomSendQueueStorageError> {
1973        trace!("received an edit request");
1974        self.nyi_for_uploads()?;
1975
1976        let serializable = SerializableEventContent::from_raw(new_content, event_type);
1977
1978        if self.room.inner.queue.replace_event(&self.transaction_id, serializable.clone()).await? {
1979            trace!("successful edit");
1980
1981            // Wake up the queue, in case the room was asleep before the edit.
1982            self.room.inner.notifier.notify_one();
1983
1984            // Propagate a replaced update too.
1985            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
1986                transaction_id: self.transaction_id.clone(),
1987                new_content: serializable,
1988            });
1989
1990            Ok(true)
1991        } else {
1992            debug!("local echo doesn't exist anymore, can't edit");
1993            Ok(false)
1994        }
1995    }
1996
1997    /// Edits the content of a local echo with an event content.
1998    ///
1999    /// Returns true if the event to be sent was replaced, false if not (i.e.
2000    /// the event had already been sent).
2001    pub async fn edit(
2002        &self,
2003        new_content: AnyMessageLikeEventContent,
2004    ) -> Result<bool, RoomSendQueueStorageError> {
2005        self.edit_raw(
2006            Raw::new(&new_content).map_err(RoomSendQueueStorageError::JsonSerialization)?,
2007            new_content.event_type().to_string(),
2008        )
2009        .await
2010    }
2011
2012    /// Edits the content of a local echo with a media caption.
2013    ///
2014    /// Will fail if the event to be sent, represented by this send handle,
2015    /// wasn't a media.
2016    pub async fn edit_media_caption(
2017        &self,
2018        caption: Option<String>,
2019        formatted_caption: Option<FormattedBody>,
2020        mentions: Option<Mentions>,
2021    ) -> Result<bool, RoomSendQueueStorageError> {
2022        if let Some(new_content) = self
2023            .room
2024            .inner
2025            .queue
2026            .edit_media_caption(&self.transaction_id, caption, formatted_caption, mentions)
2027            .await?
2028        {
2029            trace!("successful edit of media caption");
2030
2031            // Wake up the queue, in case the room was asleep before the edit.
2032            self.room.inner.notifier.notify_one();
2033
2034            let new_content = SerializableEventContent::new(&new_content)
2035                .map_err(RoomSendQueueStorageError::JsonSerialization)?;
2036
2037            // Propagate a replaced update too.
2038            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent {
2039                transaction_id: self.transaction_id.clone(),
2040                new_content,
2041            });
2042
2043            Ok(true)
2044        } else {
2045            debug!("local echo doesn't exist anymore, can't edit media caption");
2046            Ok(false)
2047        }
2048    }
2049
2050    /// Unwedge a local echo identified by its transaction identifier and try to
2051    /// resend it.
2052    pub async fn unwedge(&self) -> Result<(), RoomSendQueueError> {
2053        let room = &self.room.inner;
2054        room.queue
2055            .mark_as_unwedged(&self.transaction_id)
2056            .await
2057            .map_err(RoomSendQueueError::StorageError)?;
2058
2059        // If we have media handles, also try to unwedge them.
2060        //
2061        // It's fine to always do it to *all* the transaction IDs at once, because only
2062        // one of the three requests will be active at the same time, i.e. only
2063        // one entry will be updated in the store. The other two are either
2064        // done, or dependent requests.
2065
2066        if let Some(handles) = &self.media_handles {
2067            room.queue
2068                .mark_as_unwedged(&handles.upload_file_txn)
2069                .await
2070                .map_err(RoomSendQueueError::StorageError)?;
2071
2072            if let Some(txn) = &handles.upload_thumbnail_txn {
2073                room.queue.mark_as_unwedged(txn).await.map_err(RoomSendQueueError::StorageError)?;
2074            }
2075        }
2076
2077        // Wake up the queue, in case the room was asleep before unwedging the request.
2078        room.notifier.notify_one();
2079
2080        let _ = room
2081            .updates
2082            .send(RoomSendQueueUpdate::RetryEvent { transaction_id: self.transaction_id.clone() });
2083
2084        Ok(())
2085    }
2086
2087    /// Send a reaction to the event as soon as it's sent.
2088    ///
2089    /// Returns `Ok(None)` if the reaction couldn't be sent because the event
2090    /// is already a remote one.
2091    #[instrument(skip(self), fields(room_id = %self.room.inner.room.room_id(), txn_id = %self.transaction_id))]
2092    pub async fn react(
2093        &self,
2094        key: String,
2095    ) -> Result<Option<SendReactionHandle>, RoomSendQueueStorageError> {
2096        trace!("received an intent to react");
2097
2098        let created_at = MilliSecondsSinceUnixEpoch::now();
2099        if let Some(reaction_txn_id) =
2100            self.room.inner.queue.react(&self.transaction_id, key.clone(), created_at).await?
2101        {
2102            trace!("successfully queued react");
2103
2104            // Wake up the queue, in case the room was asleep before the sending.
2105            self.room.inner.notifier.notify_one();
2106
2107            // Propagate a new local event.
2108            let send_handle = SendReactionHandle {
2109                room: self.room.clone(),
2110                transaction_id: reaction_txn_id.clone(),
2111            };
2112
2113            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
2114                // Note: we do want to use the txn_id we're going to use for the reaction, not the
2115                // one for the event we're reacting to.
2116                transaction_id: reaction_txn_id.into(),
2117                content: LocalEchoContent::React {
2118                    key,
2119                    send_handle: send_handle.clone(),
2120                    applies_to: self.transaction_id.clone(),
2121                },
2122            }));
2123
2124            Ok(Some(send_handle))
2125        } else {
2126            debug!("local echo doesn't exist anymore, can't react");
2127            Ok(None)
2128        }
2129    }
2130}
2131
2132/// A handle to execute actions on the sending of a reaction.
2133#[derive(Clone, Debug)]
2134pub struct SendReactionHandle {
2135    /// Reference to the send queue for the room where this reaction was sent.
2136    room: RoomSendQueue,
2137    /// The own transaction id for the reaction.
2138    transaction_id: ChildTransactionId,
2139}
2140
2141impl SendReactionHandle {
2142    /// Abort the sending of the reaction.
2143    ///
2144    /// Will return true if the reaction could be aborted, false if it's been
2145    /// sent (and there's no matching local echo anymore).
2146    pub async fn abort(&self) -> Result<bool, RoomSendQueueStorageError> {
2147        if self.room.inner.queue.remove_dependent_send_queue_request(&self.transaction_id).await? {
2148            // Simple case: the reaction was found in the dependent event list.
2149
2150            // Propagate a cancelled update too.
2151            let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent {
2152                transaction_id: self.transaction_id.clone().into(),
2153            });
2154
2155            return Ok(true);
2156        }
2157
2158        // The reaction has already been queued for sending, try to abort it using a
2159        // regular abort.
2160        let handle = SendHandle {
2161            room: self.room.clone(),
2162            transaction_id: self.transaction_id.clone().into(),
2163            media_handles: None,
2164            created_at: MilliSecondsSinceUnixEpoch::now(),
2165        };
2166
2167        handle.abort().await
2168    }
2169
2170    /// The transaction id that will be used to send this reaction later.
2171    pub fn transaction_id(&self) -> &TransactionId {
2172        &self.transaction_id
2173    }
2174}
2175
2176/// From a given source of [`DependentQueuedRequest`], return only the most
2177/// meaningful, i.e. the ones that wouldn't be overridden after applying the
2178/// others.
2179fn canonicalize_dependent_requests(
2180    dependent: &[DependentQueuedRequest],
2181) -> Vec<DependentQueuedRequest> {
2182    let mut by_txn = HashMap::<OwnedTransactionId, Vec<&DependentQueuedRequest>>::new();
2183
2184    for d in dependent {
2185        let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default();
2186
2187        if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) {
2188            // The parent event has already been flagged for redaction, don't consider the
2189            // other dependent events.
2190            continue;
2191        }
2192
2193        match &d.kind {
2194            DependentQueuedRequestKind::EditEvent { .. } => {
2195                // Replace any previous edit with this one.
2196                if let Some(prev_edit) = prevs
2197                    .iter_mut()
2198                    .find(|prev| matches!(prev.kind, DependentQueuedRequestKind::EditEvent { .. }))
2199                {
2200                    *prev_edit = d;
2201                } else {
2202                    prevs.insert(0, d);
2203                }
2204            }
2205
2206            DependentQueuedRequestKind::UploadFileWithThumbnail { .. }
2207            | DependentQueuedRequestKind::FinishUpload { .. }
2208            | DependentQueuedRequestKind::ReactEvent { .. } => {
2209                // These requests can't be canonicalized, push them as is.
2210                prevs.push(d);
2211            }
2212
2213            DependentQueuedRequestKind::RedactEvent => {
2214                // Remove every other dependent action.
2215                prevs.clear();
2216                prevs.push(d);
2217            }
2218        }
2219    }
2220
2221    by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect()
2222}
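/*
A minimal, self-contained sketch of the canonicalization rules implemented by
`canonicalize_dependent_requests` above: for each parent, only the latest edit
is kept, reactions are kept as is, and a redaction supersedes everything else.
The `Kind` and `Dependent` types are hypothetical, simplified stand-ins for
`DependentQueuedRequestKind` and `DependentQueuedRequest` (upload-related
kinds are left out); they only exist to make the example runnable on its own.

```rust
use std::collections::HashMap;

// Hypothetical, simplified stand-ins for the real request types.
#[derive(Clone, Debug, PartialEq)]
enum Kind {
    Edit(String),
    React(String),
    Redact,
}

#[derive(Clone, Debug, PartialEq)]
struct Dependent {
    parent: String,
    kind: Kind,
}

fn canonicalize(dependent: &[Dependent]) -> Vec<Dependent> {
    let mut by_parent: HashMap<&str, Vec<&Dependent>> = HashMap::new();

    for d in dependent {
        let prevs = by_parent.entry(d.parent.as_str()).or_default();

        // Once a redaction is recorded, later requests for that parent are dropped.
        if prevs.iter().any(|prev| prev.kind == Kind::Redact) {
            continue;
        }

        match &d.kind {
            // Only the most recent edit is kept.
            Kind::Edit(_) => {
                if let Some(prev) = prevs.iter_mut().find(|p| matches!(p.kind, Kind::Edit(_))) {
                    *prev = d;
                } else {
                    prevs.insert(0, d);
                }
            }
            // Reactions can't be merged, so they are kept as is.
            Kind::React(_) => prevs.push(d),
            // A redaction supersedes everything recorded so far.
            Kind::Redact => {
                prevs.clear();
                prevs.push(d);
            }
        }
    }

    by_parent.into_values().flatten().cloned().collect()
}

fn main() {
    let dep = |parent: &str, kind: Kind| Dependent { parent: parent.to_owned(), kind };

    let input = vec![
        dep("a", Kind::Edit("first".to_owned())),
        dep("a", Kind::React("+1".to_owned())),
        dep("a", Kind::Edit("second".to_owned())),
        dep("b", Kind::Edit("unused".to_owned())),
        dep("b", Kind::Redact),
    ];

    let mut out = canonicalize(&input);
    // The map's iteration order is unspecified; a stable sort by parent makes
    // the output deterministic while keeping the per-parent order.
    out.sort_by(|l, r| l.parent.cmp(&r.parent));
    println!("{out:?}");
}
```

With this input, the sketch keeps the second edit and the reaction for parent
`a`, and only the redaction for parent `b`.
*/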
2223
2224#[cfg(all(test, not(target_arch = "wasm32")))]
2225mod tests {
2226    use std::{sync::Arc, time::Duration};
2227
2228    use assert_matches2::{assert_let, assert_matches};
2229    use matrix_sdk_base::store::{
2230        ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind,
2231        SerializableEventContent,
2232    };
2233    use matrix_sdk_test::{async_test, JoinedRoomBuilder, SyncResponseBuilder};
2234    use ruma::{
2235        events::{room::message::RoomMessageEventContent, AnyMessageLikeEventContent},
2236        room_id, MilliSecondsSinceUnixEpoch, TransactionId,
2237    };
2238
2239    use super::canonicalize_dependent_requests;
2240    use crate::{client::WeakClient, test_utils::logged_in_client};
2241
2242    #[test]
2243    fn test_canonicalize_dependent_events_created_at() {
2244        // Test to ensure the created_at field is being serialized and retrieved
2245        // correctly.
2246        let txn = TransactionId::new();
2247        let created_at = MilliSecondsSinceUnixEpoch::now();
2248
2249        let edit = DependentQueuedRequest {
2250            own_transaction_id: ChildTransactionId::new(),
2251            parent_transaction_id: txn.clone(),
2252            kind: DependentQueuedRequestKind::EditEvent {
2253                new_content: SerializableEventContent::new(
2254                    &RoomMessageEventContent::text_plain("edit").into(),
2255                )
2256                .unwrap(),
2257            },
2258            parent_key: None,
2259            created_at,
2260        };
2261
2262        let res = canonicalize_dependent_requests(&[edit]);
2263
2264        assert_eq!(res.len(), 1);
2265        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
2266        assert_let!(
2267            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
2268        );
2269        assert_eq!(msg.body(), "edit");
2270        assert_eq!(res[0].parent_transaction_id, txn);
2271        assert_eq!(res[0].created_at, created_at);
2272    }

    #[async_test]
    async fn test_client_no_cycle_with_send_queue() {
        for enabled in [true, false] {
            let client = logged_in_client(None).await;
            let weak_client = WeakClient::from_client(&client);

            {
                let mut sync_response_builder = SyncResponseBuilder::new();

                let room_id = room_id!("!a:b.c");

                // Make sure the client knows about the room.
                client
                    .base_client()
                    .receive_sync_response(
                        sync_response_builder
                            .add_joined_room(JoinedRoomBuilder::new(room_id))
                            .build_sync_response(),
                    )
                    .await
                    .unwrap();

                let room = client.get_room(room_id).unwrap();
                let q = room.send_queue();

                let _watcher = q.subscribe().await;

                client.send_queue().set_enabled(enabled).await;
            }

            drop(client);

            // Give a bit of time for background tasks to die.
            tokio::time::sleep(Duration::from_millis(500)).await;

            // The weak client must be the last reference to the client now.
            let client = weak_client.get();
            assert!(
                client.is_none(),
                "too many strong references to the client: {}",
                Arc::strong_count(&client.unwrap().inner)
            );
        }
    }

    #[test]
    fn test_canonicalize_dependent_events_smoke_test() {
        // Smoke test: canonicalizing a single dependent event returns it.
        let txn = TransactionId::new();

        let edit = DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: txn.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };
        let res = canonicalize_dependent_requests(&[edit]);

        assert_eq!(res.len(), 1);
        assert_matches!(&res[0].kind, DependentQueuedRequestKind::EditEvent { .. });
        assert_eq!(res[0].parent_transaction_id, txn);
        assert!(res[0].parent_key.is_none());
    }

    #[test]
    fn test_canonicalize_dependent_events_redaction_preferred() {
        // A redaction is preferred over any other kind of dependent event.
        let txn = TransactionId::new();

        let mut inputs = Vec::with_capacity(100);
        let redact = DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: txn.clone(),
            kind: DependentQueuedRequestKind::RedactEvent,
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        let edit = DependentQueuedRequest {
            own_transaction_id: ChildTransactionId::new(),
            parent_transaction_id: txn.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        inputs.push({
            let mut edit = edit.clone();
            edit.own_transaction_id = ChildTransactionId::new();
            edit
        });

        inputs.push(redact);

        for _ in 0..98 {
            let mut edit = edit.clone();
            edit.own_transaction_id = ChildTransactionId::new();
            inputs.push(edit);
        }

        let res = canonicalize_dependent_requests(&inputs);

        assert_eq!(res.len(), 1);
        assert_matches!(&res[0].kind, DependentQueuedRequestKind::RedactEvent);
        assert_eq!(res[0].parent_transaction_id, txn);
    }

    #[test]
    fn test_canonicalize_dependent_events_last_edit_preferred() {
        let parent_txn = TransactionId::new();

        // The latest edit of a list is always preferred.
        let inputs = (0..10)
            .map(|i| DependentQueuedRequest {
                own_transaction_id: ChildTransactionId::new(),
                parent_transaction_id: parent_txn.clone(),
                kind: DependentQueuedRequestKind::EditEvent {
                    new_content: SerializableEventContent::new(
                        &RoomMessageEventContent::text_plain(format!("edit{i}")).into(),
                    )
                    .unwrap(),
                },
                parent_key: None,
                created_at: MilliSecondsSinceUnixEpoch::now(),
            })
            .collect::<Vec<_>>();

        let txn = inputs[9].parent_transaction_id.clone();

        let res = canonicalize_dependent_requests(&inputs);

        assert_eq!(res.len(), 1);
        assert_let!(DependentQueuedRequestKind::EditEvent { new_content } = &res[0].kind);
        assert_let!(
            AnyMessageLikeEventContent::RoomMessage(msg) = new_content.deserialize().unwrap()
        );
        assert_eq!(msg.body(), "edit9");
        assert_eq!(res[0].parent_transaction_id, txn);
    }

    #[test]
    fn test_canonicalize_multiple_local_echoes() {
        let txn1 = TransactionId::new();
        let txn2 = TransactionId::new();

        let child1 = ChildTransactionId::new();
        let child2 = ChildTransactionId::new();

        let inputs = vec![
            // This one pertains to txn1.
            DependentQueuedRequest {
                own_transaction_id: child1.clone(),
                kind: DependentQueuedRequestKind::RedactEvent,
                parent_transaction_id: txn1.clone(),
                parent_key: None,
                created_at: MilliSecondsSinceUnixEpoch::now(),
            },
            // This one pertains to txn2.
            DependentQueuedRequest {
                own_transaction_id: child2,
                kind: DependentQueuedRequestKind::EditEvent {
                    new_content: SerializableEventContent::new(
                        &RoomMessageEventContent::text_plain("edit").into(),
                    )
                    .unwrap(),
                },
                parent_transaction_id: txn2.clone(),
                parent_key: None,
                created_at: MilliSecondsSinceUnixEpoch::now(),
            },
        ];

        let res = canonicalize_dependent_requests(&inputs);

        // Canonicalization is applied per parent transaction: requests that
        // target different parents must both be kept.
        assert_eq!(res.len(), 2);

        for dependent in res {
            if dependent.own_transaction_id == child1 {
                assert_eq!(dependent.parent_transaction_id, txn1);
                assert_matches!(dependent.kind, DependentQueuedRequestKind::RedactEvent);
            } else {
                assert_eq!(dependent.parent_transaction_id, txn2);
                assert_matches!(dependent.kind, DependentQueuedRequestKind::EditEvent { .. });
            }
        }
    }

    #[test]
    fn test_canonicalize_reactions_after_edits() {
        // Sending reactions should happen after edits to a given event.
        let txn = TransactionId::new();

        let react_id = ChildTransactionId::new();
        let react = DependentQueuedRequest {
            own_transaction_id: react_id.clone(),
            kind: DependentQueuedRequestKind::ReactEvent { key: "🧠".to_owned() },
            parent_transaction_id: txn.clone(),
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        let edit_id = ChildTransactionId::new();
        let edit = DependentQueuedRequest {
            own_transaction_id: edit_id.clone(),
            kind: DependentQueuedRequestKind::EditEvent {
                new_content: SerializableEventContent::new(
                    &RoomMessageEventContent::text_plain("edit").into(),
                )
                .unwrap(),
            },
            parent_transaction_id: txn,
            parent_key: None,
            created_at: MilliSecondsSinceUnixEpoch::now(),
        };

        let res = canonicalize_dependent_requests(&[react, edit]);

        assert_eq!(res.len(), 2);
        assert_eq!(res[0].own_transaction_id, edit_id);
        assert_eq!(res[1].own_transaction_id, react_id);
    }
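
    // Illustrative sketch only: a toy model of the canonicalization rules that
    // the tests above exercise (a redaction wins, the last edit wins, reactions
    // come after the edit, and each parent is handled separately). This is NOT
    // the real `canonicalize_dependent_requests`; `SketchKind`, `SketchRequest`
    // and `canonicalize_sketch` are hypothetical names invented for this
    // example, and emitting parents in ascending id order is an assumption made
    // to keep the output deterministic.
    mod canonicalization_sketch {
        /// Simplified stand-in for the kind of a dependent request.
        #[derive(Clone, Debug, PartialEq)]
        pub enum SketchKind {
            Edit(String),
            React(String),
            Redact,
        }

        /// Simplified stand-in for a dependent request; `parent` plays the role
        /// of the parent transaction id.
        #[derive(Clone, Debug, PartialEq)]
        pub struct SketchRequest {
            pub parent: u32,
            pub kind: SketchKind,
        }

        pub fn canonicalize_sketch(input: &[SketchRequest]) -> Vec<SketchRequest> {
            // Group requests by parent, preserving input order inside a group.
            let mut by_parent: std::collections::BTreeMap<u32, Vec<&SketchRequest>> =
                std::collections::BTreeMap::new();
            for req in input {
                by_parent.entry(req.parent).or_default().push(req);
            }

            let mut out = Vec::new();
            for group in by_parent.into_values() {
                // A redaction makes every other request for that parent moot.
                if let Some(redact) = group.iter().find(|r| r.kind == SketchKind::Redact) {
                    out.push((*redact).clone());
                    continue;
                }

                // Only the most recent edit is kept, and it is emitted first.
                if let Some(edit) =
                    group.iter().rev().find(|r| matches!(r.kind, SketchKind::Edit(_)))
                {
                    out.push((*edit).clone());
                }

                // Reactions follow the edit, in their original order.
                out.extend(
                    group
                        .iter()
                        .filter(|r| matches!(r.kind, SketchKind::React(_)))
                        .map(|r| (*r).clone()),
                );
            }
            out
        }

        #[test]
        fn test_sketch_reaction_after_last_edit() {
            let input = [
                SketchRequest { parent: 1, kind: SketchKind::React("+1".to_owned()) },
                SketchRequest { parent: 1, kind: SketchKind::Edit("a".to_owned()) },
                SketchRequest { parent: 1, kind: SketchKind::Edit("b".to_owned()) },
            ];

            let res = canonicalize_sketch(&input);

            assert_eq!(res.len(), 2);
            assert_eq!(res[0].kind, SketchKind::Edit("b".to_owned()));
            assert_eq!(res[1].kind, SketchKind::React("+1".to_owned()));
        }
    }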
}