matrix_sdk_base/store/
send_queue.rs

1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All data types related to the send queue.
16
17use std::{collections::BTreeMap, fmt, ops::Deref};
18
19use as_variant::as_variant;
20use ruma::{
21    events::{
22        room::{message::RoomMessageEventContent, MediaSource},
23        AnyMessageLikeEventContent, EventContent as _, RawExt as _,
24    },
25    serde::Raw,
26    MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId,
27    TransactionId, UInt,
28};
29use serde::{Deserialize, Serialize};
30
31use crate::media::MediaRequestParameters;
32
33/// A thin wrapper to serialize a `AnyMessageLikeEventContent`.
34#[derive(Clone, Serialize, Deserialize)]
35pub struct SerializableEventContent {
36    event: Raw<AnyMessageLikeEventContent>,
37    event_type: String,
38}
39
40#[cfg(not(tarpaulin_include))]
41impl fmt::Debug for SerializableEventContent {
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        // Don't include the event in the debug display.
44        f.debug_struct("SerializedEventContent")
45            .field("event_type", &self.event_type)
46            .finish_non_exhaustive()
47    }
48}
49
50impl SerializableEventContent {
51    /// Create a [`SerializableEventContent`] from a raw
52    /// [`AnyMessageLikeEventContent`] along with its type.
53    pub fn from_raw(event: Raw<AnyMessageLikeEventContent>, event_type: String) -> Self {
54        Self { event_type, event }
55    }
56
57    /// Create a [`SerializableEventContent`] from an
58    /// [`AnyMessageLikeEventContent`].
59    pub fn new(event: &AnyMessageLikeEventContent) -> Result<Self, serde_json::Error> {
60        Ok(Self::from_raw(Raw::new(event)?, event.event_type().to_string()))
61    }
62
63    /// Convert a [`SerializableEventContent`] back into a
64    /// [`AnyMessageLikeEventContent`].
65    pub fn deserialize(&self) -> Result<AnyMessageLikeEventContent, serde_json::Error> {
66        self.event.deserialize_with_type(self.event_type.clone().into())
67    }
68
69    /// Returns the raw event content along with its type.
70    ///
71    /// Useful for callers manipulating custom events.
72    pub fn raw(&self) -> (&Raw<AnyMessageLikeEventContent>, &str) {
73        (&self.event, &self.event_type)
74    }
75}
76
77/// The kind of a send queue request.
78#[derive(Clone, Debug, Serialize, Deserialize)]
79pub enum QueuedRequestKind {
80    /// An event to be sent via the send queue.
81    Event {
82        /// The content of the message-like event we'd like to send.
83        content: SerializableEventContent,
84    },
85
86    /// Content to upload on the media server.
87    ///
88    /// The bytes must be stored in the media cache, and are identified by the
89    /// cache key.
90    MediaUpload {
91        /// Content type of the media to be uploaded.
92        ///
93        /// Stored as a `String` because `Mime` which we'd really want to use
94        /// here, is not serializable. Oh well.
95        content_type: String,
96
97        /// The cache key used to retrieve the media's bytes in the event cache
98        /// store.
99        cache_key: MediaRequestParameters,
100
101        /// An optional media source for a thumbnail already uploaded.
102        thumbnail_source: Option<MediaSource>,
103
104        /// To which media event transaction does this upload relate?
105        related_to: OwnedTransactionId,
106    },
107}
108
109impl From<SerializableEventContent> for QueuedRequestKind {
110    fn from(content: SerializableEventContent) -> Self {
111        Self::Event { content }
112    }
113}
114
115/// A request to be sent with a send queue.
116#[derive(Clone)]
117pub struct QueuedRequest {
118    /// The kind of queued request we're going to send.
119    pub kind: QueuedRequestKind,
120
121    /// Unique transaction id for the queued request, acting as a key.
122    pub transaction_id: OwnedTransactionId,
123
124    /// Error returned when the request couldn't be sent and is stuck in the
125    /// unrecoverable state.
126    ///
127    /// `None` if the request is in the queue, waiting to be sent.
128    pub error: Option<QueueWedgeError>,
129
130    /// At which priority should this be handled?
131    ///
132    /// The bigger the value, the higher the priority at which this request
133    /// should be handled.
134    pub priority: usize,
135
136    /// The time that the request was originally attempted.
137    pub created_at: MilliSecondsSinceUnixEpoch,
138}
139
140impl QueuedRequest {
141    /// Returns `Some` if the queued request is about sending an event.
142    pub fn as_event(&self) -> Option<&SerializableEventContent> {
143        as_variant!(&self.kind, QueuedRequestKind::Event { content } => content)
144    }
145
146    /// True if the request couldn't be sent because of an unrecoverable API
147    /// error. See [`Self::error`] for more details on the reason.
148    pub fn is_wedged(&self) -> bool {
149        self.error.is_some()
150    }
151}
152
153/// Represents a failed to send unrecoverable error of an event sent via the
154/// send queue.
155///
156/// It is a serializable representation of a client error, see
157/// `From` implementation for more details. These errors can not be
158/// automatically retried, but yet some manual action can be taken before retry
159/// sending. If not the only solution is to delete the local event.
160#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
161pub enum QueueWedgeError {
162    /// This error occurs when there are some insecure devices in the room, and
163    /// the current encryption setting prohibits sharing with them.
164    #[error("There are insecure devices in the room")]
165    InsecureDevices {
166        /// The insecure devices as a Map of userID to deviceID.
167        user_device_map: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
168    },
169
170    /// This error occurs when a previously verified user is not anymore, and
171    /// the current encryption setting prohibits sharing when it happens.
172    #[error("Some users that were previously verified are not anymore")]
173    IdentityViolations {
174        /// The users that are expected to be verified but are not.
175        users: Vec<OwnedUserId>,
176    },
177
178    /// It is required to set up cross-signing and properly verify the current
179    /// session before sending.
180    #[error("Own verification is required")]
181    CrossVerificationRequired,
182
183    /// Media content was cached in the media store, but has disappeared before
184    /// we could upload it.
185    #[error("Media content disappeared")]
186    MissingMediaContent,
187
188    /// We tried to upload some media content with an unknown mime type.
189    #[error("Invalid mime type '{mime_type}' for media")]
190    InvalidMimeType {
191        /// The observed mime type that's expected to be invalid.
192        mime_type: String,
193    },
194
195    /// Other errors.
196    #[error("Other unrecoverable error: {msg}")]
197    GenericApiError {
198        /// Description of the error.
199        msg: String,
200    },
201}
202
203/// The specific user intent that characterizes a
204/// [`DependentQueuedRequestKind`].
205#[derive(Clone, Debug, Serialize, Deserialize)]
206pub enum DependentQueuedRequestKind {
207    /// The event should be edited.
208    EditEvent {
209        /// The new event for the content.
210        new_content: SerializableEventContent,
211    },
212
213    /// The event should be redacted/aborted/removed.
214    RedactEvent,
215
216    /// The event should be reacted to, with the given key.
217    ReactEvent {
218        /// Key used for the reaction.
219        key: String,
220    },
221
222    /// Upload a file that had a thumbnail.
223    UploadFileWithThumbnail {
224        /// Content type for the file itself (not the thumbnail).
225        content_type: String,
226
227        /// Media request necessary to retrieve the file itself (not the
228        /// thumbnail).
229        cache_key: MediaRequestParameters,
230
231        /// To which media transaction id does this upload relate to?
232        related_to: OwnedTransactionId,
233    },
234
235    /// Finish an upload by updating references to the media cache and sending
236    /// the final media event with the remote MXC URIs.
237    FinishUpload {
238        /// Local echo for the event (containing the local MXC URIs).
239        local_echo: RoomMessageEventContent,
240
241        /// Transaction id for the file upload.
242        file_upload: OwnedTransactionId,
243
244        /// Information about the thumbnail, if present.
245        thumbnail_info: Option<FinishUploadThumbnailInfo>,
246    },
247}
248
249/// Detailed record about a thumbnail used when finishing a media upload.
250#[derive(Clone, Debug, Serialize, Deserialize)]
251pub struct FinishUploadThumbnailInfo {
252    /// Transaction id for the thumbnail upload.
253    pub txn: OwnedTransactionId,
254    /// Thumbnail's width.
255    ///
256    /// Used previously, kept for backwards compatibility.
257    #[serde(default, skip_serializing_if = "Option::is_none")]
258    pub width: Option<UInt>,
259    /// Thumbnail's height.
260    ///
261    /// Used previously, kept for backwards compatibility.
262    #[serde(default, skip_serializing_if = "Option::is_none")]
263    pub height: Option<UInt>,
264}
265
266/// A transaction id identifying a [`DependentQueuedRequest`] rather than its
267/// parent [`QueuedRequest`].
268///
269/// This thin wrapper adds some safety to some APIs, making it possible to
270/// distinguish between the parent's `TransactionId` and the dependent event's
271/// own `TransactionId`.
272#[repr(transparent)]
273#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
274#[serde(transparent)]
275pub struct ChildTransactionId(OwnedTransactionId);
276
277impl ChildTransactionId {
278    /// Returns a new [`ChildTransactionId`].
279    #[allow(clippy::new_without_default)]
280    pub fn new() -> Self {
281        Self(TransactionId::new())
282    }
283}
284
285impl Deref for ChildTransactionId {
286    type Target = TransactionId;
287
288    fn deref(&self) -> &Self::Target {
289        &self.0
290    }
291}
292
293impl From<String> for ChildTransactionId {
294    fn from(val: String) -> Self {
295        Self(val.into())
296    }
297}
298
299impl From<ChildTransactionId> for OwnedTransactionId {
300    fn from(val: ChildTransactionId) -> Self {
301        val.0
302    }
303}
304
305impl From<OwnedTransactionId> for ChildTransactionId {
306    fn from(val: OwnedTransactionId) -> Self {
307        Self(val)
308    }
309}
310
311/// Information about a media (and its thumbnail) that have been sent to an
312/// homeserver.
313#[derive(Clone, Debug, Serialize, Deserialize)]
314pub struct SentMediaInfo {
315    /// File that was uploaded by this request.
316    ///
317    /// If the request related to a thumbnail upload, this contains the
318    /// thumbnail media source.
319    pub file: MediaSource,
320
321    /// Optional thumbnail previously uploaded, when uploading a file.
322    ///
323    /// When uploading a thumbnail, this is set to `None`.
324    pub thumbnail: Option<MediaSource>,
325}
326
327/// A unique key (identifier) indicating that a transaction has been
328/// successfully sent to the server.
329///
330/// The owning child transactions can now be resolved.
331#[derive(Clone, Debug, Serialize, Deserialize)]
332pub enum SentRequestKey {
333    /// The parent transaction returned an event when it succeeded.
334    Event(OwnedEventId),
335
336    /// The parent transaction returned an uploaded resource URL.
337    Media(SentMediaInfo),
338}
339
340impl SentRequestKey {
341    /// Converts the current parent key into an event id, if possible.
342    pub fn into_event_id(self) -> Option<OwnedEventId> {
343        as_variant!(self, Self::Event)
344    }
345
346    /// Converts the current parent key into information about a sent media, if
347    /// possible.
348    pub fn into_media(self) -> Option<SentMediaInfo> {
349        as_variant!(self, Self::Media)
350    }
351}
352
353/// A request to be sent, depending on a [`QueuedRequest`] to be sent first.
354///
355/// Depending on whether the parent request has been sent or not, this will
356/// either update the local echo in the storage, or materialize an equivalent
357/// request implementing the user intent to the homeserver.
358#[derive(Clone, Debug, Serialize, Deserialize)]
359pub struct DependentQueuedRequest {
360    /// Unique identifier for this dependent queued request.
361    ///
362    /// Useful for deletion.
363    pub own_transaction_id: ChildTransactionId,
364
365    /// The kind of user intent.
366    pub kind: DependentQueuedRequestKind,
367
368    /// Transaction id for the parent's local echo / used in the server request.
369    ///
370    /// Note: this is the transaction id used for the depended-on request, i.e.
371    /// the one that was originally sent and that's being modified with this
372    /// dependent request.
373    pub parent_transaction_id: OwnedTransactionId,
374
375    /// If the parent request has been sent, the parent's request identifier
376    /// returned by the server once the local echo has been sent out.
377    pub parent_key: Option<SentRequestKey>,
378
379    /// The time that the request was originally attempted.
380    pub created_at: MilliSecondsSinceUnixEpoch,
381}
382
383impl DependentQueuedRequest {
384    /// Does the dependent request represent a new event that is *not*
385    /// aggregated, aka it is going to be its own item in a timeline?
386    pub fn is_own_event(&self) -> bool {
387        match self.kind {
388            DependentQueuedRequestKind::EditEvent { .. }
389            | DependentQueuedRequestKind::RedactEvent
390            | DependentQueuedRequestKind::ReactEvent { .. }
391            | DependentQueuedRequestKind::UploadFileWithThumbnail { .. } => {
392                // These are all aggregated events, or non-visible items (file upload producing
393                // a new MXC ID).
394                false
395            }
396            DependentQueuedRequestKind::FinishUpload { .. } => {
397                // This one graduates into a new media event.
398                true
399            }
400        }
401    }
402}
403
404#[cfg(not(tarpaulin_include))]
405impl fmt::Debug for QueuedRequest {
406    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
407        // Hide the content from the debug log.
408        f.debug_struct("QueuedRequest")
409            .field("transaction_id", &self.transaction_id)
410            .field("is_wedged", &self.is_wedged())
411            .finish_non_exhaustive()
412    }
413}