Skip to main content

matrix_sdk_base/store/
send_queue.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All data types related to the send queue.
16
17use std::{collections::BTreeMap, fmt, ops::Deref};
18
19use as_variant::as_variant;
20use ruma::{
21    MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId,
22    TransactionId, UInt,
23    events::{
24        AnyMessageLikeEventContent, MessageLikeEventContent as _, RawExt as _,
25        room::{MediaSource, message::RoomMessageEventContent},
26    },
27    serde::Raw,
28};
29use serde::{Deserialize, Serialize};
30
31use crate::media::MediaRequestParameters;
32
33/// A thin wrapper to serialize a `AnyMessageLikeEventContent`.
34#[derive(Clone, Serialize, Deserialize)]
35pub struct SerializableEventContent {
36    event: Raw<AnyMessageLikeEventContent>,
37    event_type: String,
38}
39
40#[cfg(not(tarpaulin_include))]
41impl fmt::Debug for SerializableEventContent {
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        // Don't include the event in the debug display.
44        f.debug_struct("SerializedEventContent")
45            .field("event_type", &self.event_type)
46            .finish_non_exhaustive()
47    }
48}
49
50impl SerializableEventContent {
51    /// Create a [`SerializableEventContent`] from a raw
52    /// [`AnyMessageLikeEventContent`] along with its type.
53    pub fn from_raw(event: Raw<AnyMessageLikeEventContent>, event_type: String) -> Self {
54        Self { event_type, event }
55    }
56
57    /// Create a [`SerializableEventContent`] from an
58    /// [`AnyMessageLikeEventContent`].
59    pub fn new(event: &AnyMessageLikeEventContent) -> Result<Self, serde_json::Error> {
60        Ok(Self::from_raw(Raw::new(event)?, event.event_type().to_string()))
61    }
62
63    /// Convert a [`SerializableEventContent`] back into a
64    /// [`AnyMessageLikeEventContent`].
65    pub fn deserialize(&self) -> Result<AnyMessageLikeEventContent, serde_json::Error> {
66        self.event.deserialize_with_type(&self.event_type)
67    }
68
69    /// Returns the raw event content along with its type, borrowed variant.
70    ///
71    /// Useful for callers manipulating custom events.
72    pub fn raw(&self) -> (&Raw<AnyMessageLikeEventContent>, &str) {
73        (&self.event, &self.event_type)
74    }
75
76    /// Returns the raw event content along with its type, owned variant.
77    ///
78    /// Useful for callers manipulating custom events.
79    pub fn into_raw(self) -> (Raw<AnyMessageLikeEventContent>, String) {
80        (self.event, self.event_type)
81    }
82}
83
84/// The kind of a send queue request.
85#[derive(Clone, Debug, Serialize, Deserialize)]
86pub enum QueuedRequestKind {
87    /// An event to be sent via the send queue.
88    Event {
89        /// The content of the message-like event we'd like to send.
90        content: SerializableEventContent,
91    },
92
93    /// Content to upload on the media server.
94    ///
95    /// The bytes must be stored in the media cache, and are identified by the
96    /// cache key.
97    MediaUpload {
98        /// Content type of the media to be uploaded.
99        ///
100        /// Stored as a `String` because `Mime` which we'd really want to use
101        /// here, is not serializable. Oh well.
102        content_type: String,
103
104        /// The cache key used to retrieve the media's bytes in the event cache
105        /// store.
106        cache_key: MediaRequestParameters,
107
108        /// An optional media source for a thumbnail already uploaded.
109        thumbnail_source: Option<MediaSource>,
110
111        /// To which media event transaction does this upload relate?
112        related_to: OwnedTransactionId,
113
114        /// Accumulated list of infos for previously uploaded files and
115        /// thumbnails if used during a gallery transaction. Otherwise empty.
116        #[cfg(feature = "unstable-msc4274")]
117        #[serde(default)]
118        accumulated: Vec<AccumulatedSentMediaInfo>,
119    },
120
121    /// A redaction of another event to send.
122    Redaction {
123        /// The ID of the event to redact.
124        redacts: OwnedEventId,
125        /// The reason for the event being redacted.
126        reason: Option<String>,
127    },
128}
129
130impl From<SerializableEventContent> for QueuedRequestKind {
131    fn from(content: SerializableEventContent) -> Self {
132        Self::Event { content }
133    }
134}
135
136/// A request to be sent with a send queue.
137#[derive(Clone)]
138pub struct QueuedRequest {
139    /// The kind of queued request we're going to send.
140    pub kind: QueuedRequestKind,
141
142    /// Unique transaction id for the queued request, acting as a key.
143    pub transaction_id: OwnedTransactionId,
144
145    /// Error returned when the request couldn't be sent and is stuck in the
146    /// unrecoverable state.
147    ///
148    /// `None` if the request is in the queue, waiting to be sent.
149    pub error: Option<QueueWedgeError>,
150
151    /// At which priority should this be handled?
152    ///
153    /// The bigger the value, the higher the priority at which this request
154    /// should be handled.
155    pub priority: usize,
156
157    /// The time that the request was originally attempted.
158    pub created_at: MilliSecondsSinceUnixEpoch,
159}
160
161impl QueuedRequest {
162    /// Returns `Some` if the queued request is about sending an event.
163    pub fn as_event(&self) -> Option<&SerializableEventContent> {
164        as_variant!(&self.kind, QueuedRequestKind::Event { content } => content)
165    }
166
167    /// True if the request couldn't be sent because of an unrecoverable API
168    /// error. See [`Self::error`] for more details on the reason.
169    pub fn is_wedged(&self) -> bool {
170        self.error.is_some()
171    }
172}
173
174/// Represents a failed to send unrecoverable error of an event sent via the
175/// send queue.
176///
177/// It is a serializable representation of a client error, see
178/// `From` implementation for more details. These errors can not be
179/// automatically retried, but yet some manual action can be taken before retry
180/// sending. If not the only solution is to delete the local event.
181#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
182pub enum QueueWedgeError {
183    /// This error occurs when there are some insecure devices in the room, and
184    /// the current encryption setting prohibits sharing with them.
185    #[error("There are insecure devices in the room")]
186    InsecureDevices {
187        /// The insecure devices as a Map of userID to deviceID.
188        user_device_map: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
189    },
190
191    /// This error occurs when a previously verified user is not anymore, and
192    /// the current encryption setting prohibits sharing when it happens.
193    #[error("Some users that were previously verified are not anymore")]
194    IdentityViolations {
195        /// The users that are expected to be verified but are not.
196        users: Vec<OwnedUserId>,
197    },
198
199    /// It is required to set up cross-signing and properly verify the current
200    /// session before sending.
201    #[error("Own verification is required")]
202    CrossVerificationRequired,
203
204    /// Media content was cached in the media store, but has disappeared before
205    /// we could upload it.
206    #[error("Media content disappeared")]
207    MissingMediaContent,
208
209    /// We tried to upload some media content with an unknown mime type.
210    #[error("Invalid mime type '{mime_type}' for media")]
211    InvalidMimeType {
212        /// The observed mime type that's expected to be invalid.
213        mime_type: String,
214    },
215
216    /// Other errors.
217    #[error("Other unrecoverable error: {msg}")]
218    GenericApiError {
219        /// Description of the error.
220        msg: String,
221    },
222}
223
224/// The specific user intent that characterizes a [`DependentQueuedRequest`].
225#[derive(Clone, Debug, Serialize, Deserialize)]
226pub enum DependentQueuedRequestKind {
227    /// The event should be edited.
228    EditEvent {
229        /// The new event for the content.
230        new_content: SerializableEventContent,
231    },
232
233    /// The event should be redacted/aborted/removed.
234    RedactEvent,
235
236    /// The event should be reacted to, with the given key.
237    ReactEvent {
238        /// Key used for the reaction.
239        key: String,
240    },
241
242    /// Upload a file or thumbnail depending on another file or thumbnail
243    /// upload.
244    #[serde(alias = "UploadFileWithThumbnail")]
245    UploadFileOrThumbnail {
246        /// Content type for the file or thumbnail.
247        content_type: String,
248
249        /// Media request necessary to retrieve the file or thumbnail itself.
250        cache_key: MediaRequestParameters,
251
252        /// To which media transaction id does this upload relate to?
253        related_to: OwnedTransactionId,
254
255        /// Whether the depended upon request was a thumbnail or a file upload.
256        #[serde(default = "default_parent_is_thumbnail_upload")]
257        parent_is_thumbnail_upload: bool,
258    },
259
260    /// Finish an upload by updating references to the media cache and sending
261    /// the final media event with the remote MXC URIs.
262    FinishUpload {
263        /// Local echo for the event (containing the local MXC URIs).
264        ///
265        /// `Box` the local echo so that it reduces the size of the whole enum.
266        local_echo: Box<RoomMessageEventContent>,
267
268        /// Transaction id for the file upload.
269        file_upload: OwnedTransactionId,
270
271        /// Information about the thumbnail, if present.
272        thumbnail_info: Option<FinishUploadThumbnailInfo>,
273    },
274
275    /// Finish a gallery upload by updating references to the media cache and
276    /// sending the final gallery event with the remote MXC URIs.
277    #[cfg(feature = "unstable-msc4274")]
278    FinishGallery {
279        /// Local echo for the event (containing the local MXC URIs).
280        ///
281        /// `Box` the local echo so that it reduces the size of the whole enum.
282        local_echo: Box<RoomMessageEventContent>,
283
284        /// Metadata about the gallery items.
285        item_infos: Vec<FinishGalleryItemInfo>,
286    },
287}
288
289/// If parent_is_thumbnail_upload is missing, we assume the request is for a
290/// file upload following a thumbnail upload. This was the only possible case
291/// before parent_is_thumbnail_upload was introduced.
292fn default_parent_is_thumbnail_upload() -> bool {
293    true
294}
295
296/// Detailed record about a thumbnail used when finishing a media upload.
297#[derive(Clone, Debug, Serialize, Deserialize)]
298pub struct FinishUploadThumbnailInfo {
299    /// Transaction id for the thumbnail upload.
300    pub txn: OwnedTransactionId,
301    /// Thumbnail's width.
302    #[serde(default, skip_serializing_if = "Option::is_none")]
303    pub width: Option<UInt>,
304    /// Thumbnail's height.
305    #[serde(default, skip_serializing_if = "Option::is_none")]
306    pub height: Option<UInt>,
307}
308
309/// Detailed record about a file and thumbnail. When finishing a gallery
310/// upload, one [`FinishGalleryItemInfo`] will be used for each media in the
311/// gallery.
312#[cfg(feature = "unstable-msc4274")]
313#[derive(Clone, Debug, Serialize, Deserialize)]
314pub struct FinishGalleryItemInfo {
315    /// Transaction id for the file upload.
316    pub file_upload: OwnedTransactionId,
317    /// Information about the thumbnail, if present.
318    pub thumbnail_info: Option<FinishUploadThumbnailInfo>,
319}
320
321/// A transaction id identifying a [`DependentQueuedRequest`] rather than its
322/// parent [`QueuedRequest`].
323///
324/// This thin wrapper adds some safety to some APIs, making it possible to
325/// distinguish between the parent's `TransactionId` and the dependent event's
326/// own `TransactionId`.
327#[repr(transparent)]
328#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
329#[serde(transparent)]
330pub struct ChildTransactionId(OwnedTransactionId);
331
332impl ChildTransactionId {
333    /// Returns a new [`ChildTransactionId`].
334    #[allow(clippy::new_without_default)]
335    pub fn new() -> Self {
336        Self(TransactionId::new())
337    }
338}
339
340impl Deref for ChildTransactionId {
341    type Target = TransactionId;
342
343    fn deref(&self) -> &Self::Target {
344        &self.0
345    }
346}
347
348impl From<String> for ChildTransactionId {
349    fn from(val: String) -> Self {
350        Self(val.into())
351    }
352}
353
354impl From<ChildTransactionId> for OwnedTransactionId {
355    fn from(val: ChildTransactionId) -> Self {
356        val.0
357    }
358}
359
360impl From<OwnedTransactionId> for ChildTransactionId {
361    fn from(val: OwnedTransactionId) -> Self {
362        Self(val)
363    }
364}
365
366/// Information about a media (and its thumbnail) that have been sent to a
367/// homeserver.
368#[derive(Clone, Debug, Serialize, Deserialize)]
369pub struct SentMediaInfo {
370    /// File that was uploaded by this request.
371    ///
372    /// If the request related to a thumbnail upload, this contains the
373    /// thumbnail media source.
374    pub file: MediaSource,
375
376    /// Optional thumbnail previously uploaded, when uploading a file.
377    ///
378    /// When uploading a thumbnail, this is set to `None`.
379    pub thumbnail: Option<MediaSource>,
380
381    /// Accumulated list of infos for previously uploaded files and thumbnails
382    /// if used during a gallery transaction. Otherwise empty.
383    #[cfg(feature = "unstable-msc4274")]
384    #[serde(default)]
385    pub accumulated: Vec<AccumulatedSentMediaInfo>,
386}
387
388/// Accumulated information about a media (and its thumbnail) that have been
389/// sent to a homeserver.
390#[cfg(feature = "unstable-msc4274")]
391#[derive(Clone, Debug, Serialize, Deserialize)]
392pub struct AccumulatedSentMediaInfo {
393    /// File that was uploaded by this request.
394    ///
395    /// If the request related to a thumbnail upload, this contains the
396    /// thumbnail media source.
397    pub file: MediaSource,
398
399    /// Optional thumbnail previously uploaded, when uploading a file.
400    ///
401    /// When uploading a thumbnail, this is set to `None`.
402    pub thumbnail: Option<MediaSource>,
403}
404
405#[cfg(feature = "unstable-msc4274")]
406impl From<AccumulatedSentMediaInfo> for SentMediaInfo {
407    fn from(value: AccumulatedSentMediaInfo) -> Self {
408        Self { file: value.file, thumbnail: value.thumbnail, accumulated: vec![] }
409    }
410}
411
412/// A unique key (identifier) indicating that a transaction has been
413/// successfully sent to the server.
414///
415/// The owning child transactions can now be resolved.
416#[derive(Clone, Debug, Serialize, Deserialize)]
417pub enum SentRequestKey {
418    /// The parent transaction returned an event when it succeeded.
419    Event {
420        /// The event ID returned by the server.
421        event_id: OwnedEventId,
422
423        /// The sent event.
424        event: Raw<AnyMessageLikeEventContent>,
425
426        /// The type of the sent event.
427        event_type: String,
428    },
429
430    /// The parent transaction returned an uploaded resource URL.
431    Media(SentMediaInfo),
432
433    /// The parent transaction returned a redaction event when it succeeded.
434    Redaction {
435        /// The event ID returned by the server.
436        event_id: OwnedEventId,
437
438        /// The ID of the redacted event.
439        redacts: OwnedEventId,
440
441        /// The reason for the event being redacted.
442        reason: Option<String>,
443    },
444}
445
446impl SentRequestKey {
447    /// Converts the current parent key into an event id, if possible.
448    pub fn into_event_id(self) -> Option<OwnedEventId> {
449        match self {
450            Self::Event { event_id, .. } | Self::Redaction { event_id, .. } => Some(event_id),
451            _ => None,
452        }
453    }
454
455    /// Converts the current parent key into information about a sent media, if
456    /// possible.
457    pub fn into_media(self) -> Option<SentMediaInfo> {
458        as_variant!(self, Self::Media)
459    }
460}
461
462/// A request to be sent, depending on a [`QueuedRequest`] to be sent first.
463///
464/// Depending on whether the parent request has been sent or not, this will
465/// either update the local echo in the storage, or materialize an equivalent
466/// request implementing the user intent to the homeserver.
467#[derive(Clone, Debug, Serialize, Deserialize)]
468pub struct DependentQueuedRequest {
469    /// Unique identifier for this dependent queued request.
470    ///
471    /// Useful for deletion.
472    pub own_transaction_id: ChildTransactionId,
473
474    /// The kind of user intent.
475    pub kind: DependentQueuedRequestKind,
476
477    /// Transaction id for the parent's local echo / used in the server request.
478    ///
479    /// Note: this is the transaction id used for the depended-on request, i.e.
480    /// the one that was originally sent and that's being modified with this
481    /// dependent request.
482    pub parent_transaction_id: OwnedTransactionId,
483
484    /// If the parent request has been sent, the parent's request identifier
485    /// returned by the server once the local echo has been sent out.
486    pub parent_key: Option<SentRequestKey>,
487
488    /// The time that the request was originally attempted.
489    pub created_at: MilliSecondsSinceUnixEpoch,
490}
491
492impl DependentQueuedRequest {
493    /// Does the dependent request represent a new event that is *not*
494    /// aggregated, aka it is going to be its own item in a timeline?
495    pub fn is_own_event(&self) -> bool {
496        match self.kind {
497            DependentQueuedRequestKind::EditEvent { .. }
498            | DependentQueuedRequestKind::RedactEvent
499            | DependentQueuedRequestKind::ReactEvent { .. }
500            | DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
501                // These are all aggregated events, or non-visible items (file upload producing
502                // a new MXC ID).
503                false
504            }
505            DependentQueuedRequestKind::FinishUpload { .. } => {
506                // This one graduates into a new media event.
507                true
508            }
509            #[cfg(feature = "unstable-msc4274")]
510            DependentQueuedRequestKind::FinishGallery { .. } => {
511                // This one graduates into a new gallery event.
512                true
513            }
514        }
515    }
516}
517
518#[cfg(not(tarpaulin_include))]
519impl fmt::Debug for QueuedRequest {
520    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
521        // Hide the content from the debug log.
522        f.debug_struct("QueuedRequest")
523            .field("transaction_id", &self.transaction_id)
524            .field("is_wedged", &self.is_wedged())
525            .finish_non_exhaustive()
526    }
527}