matrix_sdk_base/store/send_queue.rs
1// Copyright 2024 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! All data types related to the send queue.
16
17use std::{collections::BTreeMap, fmt, ops::Deref};
18
19use as_variant::as_variant;
20use ruma::{
21 MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId,
22 TransactionId, UInt,
23 events::{
24 AnyMessageLikeEventContent, MessageLikeEventContent as _, RawExt as _,
25 room::{MediaSource, message::RoomMessageEventContent},
26 },
27 serde::Raw,
28};
29use serde::{Deserialize, Serialize};
30
31use crate::media::MediaRequestParameters;
32
33/// A thin wrapper to serialize a `AnyMessageLikeEventContent`.
34#[derive(Clone, Serialize, Deserialize)]
35pub struct SerializableEventContent {
36 event: Raw<AnyMessageLikeEventContent>,
37 event_type: String,
38}
39
40#[cfg(not(tarpaulin_include))]
41impl fmt::Debug for SerializableEventContent {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 // Don't include the event in the debug display.
44 f.debug_struct("SerializedEventContent")
45 .field("event_type", &self.event_type)
46 .finish_non_exhaustive()
47 }
48}
49
50impl SerializableEventContent {
51 /// Create a [`SerializableEventContent`] from a raw
52 /// [`AnyMessageLikeEventContent`] along with its type.
53 pub fn from_raw(event: Raw<AnyMessageLikeEventContent>, event_type: String) -> Self {
54 Self { event_type, event }
55 }
56
57 /// Create a [`SerializableEventContent`] from an
58 /// [`AnyMessageLikeEventContent`].
59 pub fn new(event: &AnyMessageLikeEventContent) -> Result<Self, serde_json::Error> {
60 Ok(Self::from_raw(Raw::new(event)?, event.event_type().to_string()))
61 }
62
63 /// Convert a [`SerializableEventContent`] back into a
64 /// [`AnyMessageLikeEventContent`].
65 pub fn deserialize(&self) -> Result<AnyMessageLikeEventContent, serde_json::Error> {
66 self.event.deserialize_with_type(&self.event_type)
67 }
68
69 /// Returns the raw event content along with its type, borrowed variant.
70 ///
71 /// Useful for callers manipulating custom events.
72 pub fn raw(&self) -> (&Raw<AnyMessageLikeEventContent>, &str) {
73 (&self.event, &self.event_type)
74 }
75
76 /// Returns the raw event content along with its type, owned variant.
77 ///
78 /// Useful for callers manipulating custom events.
79 pub fn into_raw(self) -> (Raw<AnyMessageLikeEventContent>, String) {
80 (self.event, self.event_type)
81 }
82}
83
84/// The kind of a send queue request.
85#[derive(Clone, Debug, Serialize, Deserialize)]
86pub enum QueuedRequestKind {
87 /// An event to be sent via the send queue.
88 Event {
89 /// The content of the message-like event we'd like to send.
90 content: SerializableEventContent,
91 },
92
93 /// Content to upload on the media server.
94 ///
95 /// The bytes must be stored in the media cache, and are identified by the
96 /// cache key.
97 MediaUpload {
98 /// Content type of the media to be uploaded.
99 ///
100 /// Stored as a `String` because `Mime` which we'd really want to use
101 /// here, is not serializable. Oh well.
102 content_type: String,
103
104 /// The cache key used to retrieve the media's bytes in the event cache
105 /// store.
106 cache_key: MediaRequestParameters,
107
108 /// An optional media source for a thumbnail already uploaded.
109 thumbnail_source: Option<MediaSource>,
110
111 /// To which media event transaction does this upload relate?
112 related_to: OwnedTransactionId,
113
114 /// Accumulated list of infos for previously uploaded files and
115 /// thumbnails if used during a gallery transaction. Otherwise empty.
116 #[cfg(feature = "unstable-msc4274")]
117 #[serde(default)]
118 accumulated: Vec<AccumulatedSentMediaInfo>,
119 },
120}
121
122impl From<SerializableEventContent> for QueuedRequestKind {
123 fn from(content: SerializableEventContent) -> Self {
124 Self::Event { content }
125 }
126}
127
128/// A request to be sent with a send queue.
129#[derive(Clone)]
130pub struct QueuedRequest {
131 /// The kind of queued request we're going to send.
132 pub kind: QueuedRequestKind,
133
134 /// Unique transaction id for the queued request, acting as a key.
135 pub transaction_id: OwnedTransactionId,
136
137 /// Error returned when the request couldn't be sent and is stuck in the
138 /// unrecoverable state.
139 ///
140 /// `None` if the request is in the queue, waiting to be sent.
141 pub error: Option<QueueWedgeError>,
142
143 /// At which priority should this be handled?
144 ///
145 /// The bigger the value, the higher the priority at which this request
146 /// should be handled.
147 pub priority: usize,
148
149 /// The time that the request was originally attempted.
150 pub created_at: MilliSecondsSinceUnixEpoch,
151}
152
153impl QueuedRequest {
154 /// Returns `Some` if the queued request is about sending an event.
155 pub fn as_event(&self) -> Option<&SerializableEventContent> {
156 as_variant!(&self.kind, QueuedRequestKind::Event { content } => content)
157 }
158
159 /// True if the request couldn't be sent because of an unrecoverable API
160 /// error. See [`Self::error`] for more details on the reason.
161 pub fn is_wedged(&self) -> bool {
162 self.error.is_some()
163 }
164}
165
166/// Represents a failed to send unrecoverable error of an event sent via the
167/// send queue.
168///
169/// It is a serializable representation of a client error, see
170/// `From` implementation for more details. These errors can not be
171/// automatically retried, but yet some manual action can be taken before retry
172/// sending. If not the only solution is to delete the local event.
173#[derive(Clone, Debug, Serialize, Deserialize, thiserror::Error)]
174pub enum QueueWedgeError {
175 /// This error occurs when there are some insecure devices in the room, and
176 /// the current encryption setting prohibits sharing with them.
177 #[error("There are insecure devices in the room")]
178 InsecureDevices {
179 /// The insecure devices as a Map of userID to deviceID.
180 user_device_map: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
181 },
182
183 /// This error occurs when a previously verified user is not anymore, and
184 /// the current encryption setting prohibits sharing when it happens.
185 #[error("Some users that were previously verified are not anymore")]
186 IdentityViolations {
187 /// The users that are expected to be verified but are not.
188 users: Vec<OwnedUserId>,
189 },
190
191 /// It is required to set up cross-signing and properly verify the current
192 /// session before sending.
193 #[error("Own verification is required")]
194 CrossVerificationRequired,
195
196 /// Media content was cached in the media store, but has disappeared before
197 /// we could upload it.
198 #[error("Media content disappeared")]
199 MissingMediaContent,
200
201 /// We tried to upload some media content with an unknown mime type.
202 #[error("Invalid mime type '{mime_type}' for media")]
203 InvalidMimeType {
204 /// The observed mime type that's expected to be invalid.
205 mime_type: String,
206 },
207
208 /// Other errors.
209 #[error("Other unrecoverable error: {msg}")]
210 GenericApiError {
211 /// Description of the error.
212 msg: String,
213 },
214}
215
216/// The specific user intent that characterizes a [`DependentQueuedRequest`].
217#[derive(Clone, Debug, Serialize, Deserialize)]
218pub enum DependentQueuedRequestKind {
219 /// The event should be edited.
220 EditEvent {
221 /// The new event for the content.
222 new_content: SerializableEventContent,
223 },
224
225 /// The event should be redacted/aborted/removed.
226 RedactEvent,
227
228 /// The event should be reacted to, with the given key.
229 ReactEvent {
230 /// Key used for the reaction.
231 key: String,
232 },
233
234 /// Upload a file or thumbnail depending on another file or thumbnail
235 /// upload.
236 #[serde(alias = "UploadFileWithThumbnail")]
237 UploadFileOrThumbnail {
238 /// Content type for the file or thumbnail.
239 content_type: String,
240
241 /// Media request necessary to retrieve the file or thumbnail itself.
242 cache_key: MediaRequestParameters,
243
244 /// To which media transaction id does this upload relate to?
245 related_to: OwnedTransactionId,
246
247 /// Whether the depended upon request was a thumbnail or a file upload.
248 #[serde(default = "default_parent_is_thumbnail_upload")]
249 parent_is_thumbnail_upload: bool,
250 },
251
252 /// Finish an upload by updating references to the media cache and sending
253 /// the final media event with the remote MXC URIs.
254 FinishUpload {
255 /// Local echo for the event (containing the local MXC URIs).
256 ///
257 /// `Box` the local echo so that it reduces the size of the whole enum.
258 local_echo: Box<RoomMessageEventContent>,
259
260 /// Transaction id for the file upload.
261 file_upload: OwnedTransactionId,
262
263 /// Information about the thumbnail, if present.
264 thumbnail_info: Option<FinishUploadThumbnailInfo>,
265 },
266
267 /// Finish a gallery upload by updating references to the media cache and
268 /// sending the final gallery event with the remote MXC URIs.
269 #[cfg(feature = "unstable-msc4274")]
270 FinishGallery {
271 /// Local echo for the event (containing the local MXC URIs).
272 ///
273 /// `Box` the local echo so that it reduces the size of the whole enum.
274 local_echo: Box<RoomMessageEventContent>,
275
276 /// Metadata about the gallery items.
277 item_infos: Vec<FinishGalleryItemInfo>,
278 },
279}
280
281/// If parent_is_thumbnail_upload is missing, we assume the request is for a
282/// file upload following a thumbnail upload. This was the only possible case
283/// before parent_is_thumbnail_upload was introduced.
284fn default_parent_is_thumbnail_upload() -> bool {
285 true
286}
287
288/// Detailed record about a thumbnail used when finishing a media upload.
289#[derive(Clone, Debug, Serialize, Deserialize)]
290pub struct FinishUploadThumbnailInfo {
291 /// Transaction id for the thumbnail upload.
292 pub txn: OwnedTransactionId,
293 /// Thumbnail's width.
294 #[serde(default, skip_serializing_if = "Option::is_none")]
295 pub width: Option<UInt>,
296 /// Thumbnail's height.
297 #[serde(default, skip_serializing_if = "Option::is_none")]
298 pub height: Option<UInt>,
299}
300
301/// Detailed record about a file and thumbnail. When finishing a gallery
302/// upload, one [`FinishGalleryItemInfo`] will be used for each media in the
303/// gallery.
304#[cfg(feature = "unstable-msc4274")]
305#[derive(Clone, Debug, Serialize, Deserialize)]
306pub struct FinishGalleryItemInfo {
307 /// Transaction id for the file upload.
308 pub file_upload: OwnedTransactionId,
309 /// Information about the thumbnail, if present.
310 pub thumbnail_info: Option<FinishUploadThumbnailInfo>,
311}
312
313/// A transaction id identifying a [`DependentQueuedRequest`] rather than its
314/// parent [`QueuedRequest`].
315///
316/// This thin wrapper adds some safety to some APIs, making it possible to
317/// distinguish between the parent's `TransactionId` and the dependent event's
318/// own `TransactionId`.
319#[repr(transparent)]
320#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
321#[serde(transparent)]
322pub struct ChildTransactionId(OwnedTransactionId);
323
324impl ChildTransactionId {
325 /// Returns a new [`ChildTransactionId`].
326 #[allow(clippy::new_without_default)]
327 pub fn new() -> Self {
328 Self(TransactionId::new())
329 }
330}
331
332impl Deref for ChildTransactionId {
333 type Target = TransactionId;
334
335 fn deref(&self) -> &Self::Target {
336 &self.0
337 }
338}
339
340impl From<String> for ChildTransactionId {
341 fn from(val: String) -> Self {
342 Self(val.into())
343 }
344}
345
346impl From<ChildTransactionId> for OwnedTransactionId {
347 fn from(val: ChildTransactionId) -> Self {
348 val.0
349 }
350}
351
352impl From<OwnedTransactionId> for ChildTransactionId {
353 fn from(val: OwnedTransactionId) -> Self {
354 Self(val)
355 }
356}
357
358/// Information about a media (and its thumbnail) that have been sent to a
359/// homeserver.
360#[derive(Clone, Debug, Serialize, Deserialize)]
361pub struct SentMediaInfo {
362 /// File that was uploaded by this request.
363 ///
364 /// If the request related to a thumbnail upload, this contains the
365 /// thumbnail media source.
366 pub file: MediaSource,
367
368 /// Optional thumbnail previously uploaded, when uploading a file.
369 ///
370 /// When uploading a thumbnail, this is set to `None`.
371 pub thumbnail: Option<MediaSource>,
372
373 /// Accumulated list of infos for previously uploaded files and thumbnails
374 /// if used during a gallery transaction. Otherwise empty.
375 #[cfg(feature = "unstable-msc4274")]
376 #[serde(default)]
377 pub accumulated: Vec<AccumulatedSentMediaInfo>,
378}
379
380/// Accumulated information about a media (and its thumbnail) that have been
381/// sent to a homeserver.
382#[cfg(feature = "unstable-msc4274")]
383#[derive(Clone, Debug, Serialize, Deserialize)]
384pub struct AccumulatedSentMediaInfo {
385 /// File that was uploaded by this request.
386 ///
387 /// If the request related to a thumbnail upload, this contains the
388 /// thumbnail media source.
389 pub file: MediaSource,
390
391 /// Optional thumbnail previously uploaded, when uploading a file.
392 ///
393 /// When uploading a thumbnail, this is set to `None`.
394 pub thumbnail: Option<MediaSource>,
395}
396
397#[cfg(feature = "unstable-msc4274")]
398impl From<AccumulatedSentMediaInfo> for SentMediaInfo {
399 fn from(value: AccumulatedSentMediaInfo) -> Self {
400 Self { file: value.file, thumbnail: value.thumbnail, accumulated: vec![] }
401 }
402}
403
404/// A unique key (identifier) indicating that a transaction has been
405/// successfully sent to the server.
406///
407/// The owning child transactions can now be resolved.
408#[derive(Clone, Debug, Serialize, Deserialize)]
409pub enum SentRequestKey {
410 /// The parent transaction returned an event when it succeeded.
411 Event {
412 /// The event ID returned by the server.
413 event_id: OwnedEventId,
414
415 /// The sent event.
416 event: Raw<AnyMessageLikeEventContent>,
417
418 /// The type of the sent event.
419 event_type: String,
420 },
421
422 /// The parent transaction returned an uploaded resource URL.
423 Media(SentMediaInfo),
424}
425
426impl SentRequestKey {
427 /// Converts the current parent key into an event id, if possible.
428 pub fn into_event_id(self) -> Option<OwnedEventId> {
429 as_variant!(self, Self::Event { event_id, .. } => event_id)
430 }
431
432 /// Converts the current parent key into information about a sent media, if
433 /// possible.
434 pub fn into_media(self) -> Option<SentMediaInfo> {
435 as_variant!(self, Self::Media)
436 }
437}
438
439/// A request to be sent, depending on a [`QueuedRequest`] to be sent first.
440///
441/// Depending on whether the parent request has been sent or not, this will
442/// either update the local echo in the storage, or materialize an equivalent
443/// request implementing the user intent to the homeserver.
444#[derive(Clone, Debug, Serialize, Deserialize)]
445pub struct DependentQueuedRequest {
446 /// Unique identifier for this dependent queued request.
447 ///
448 /// Useful for deletion.
449 pub own_transaction_id: ChildTransactionId,
450
451 /// The kind of user intent.
452 pub kind: DependentQueuedRequestKind,
453
454 /// Transaction id for the parent's local echo / used in the server request.
455 ///
456 /// Note: this is the transaction id used for the depended-on request, i.e.
457 /// the one that was originally sent and that's being modified with this
458 /// dependent request.
459 pub parent_transaction_id: OwnedTransactionId,
460
461 /// If the parent request has been sent, the parent's request identifier
462 /// returned by the server once the local echo has been sent out.
463 pub parent_key: Option<SentRequestKey>,
464
465 /// The time that the request was originally attempted.
466 pub created_at: MilliSecondsSinceUnixEpoch,
467}
468
469impl DependentQueuedRequest {
470 /// Does the dependent request represent a new event that is *not*
471 /// aggregated, aka it is going to be its own item in a timeline?
472 pub fn is_own_event(&self) -> bool {
473 match self.kind {
474 DependentQueuedRequestKind::EditEvent { .. }
475 | DependentQueuedRequestKind::RedactEvent
476 | DependentQueuedRequestKind::ReactEvent { .. }
477 | DependentQueuedRequestKind::UploadFileOrThumbnail { .. } => {
478 // These are all aggregated events, or non-visible items (file upload producing
479 // a new MXC ID).
480 false
481 }
482 DependentQueuedRequestKind::FinishUpload { .. } => {
483 // This one graduates into a new media event.
484 true
485 }
486 #[cfg(feature = "unstable-msc4274")]
487 DependentQueuedRequestKind::FinishGallery { .. } => {
488 // This one graduates into a new gallery event.
489 true
490 }
491 }
492 }
493}
494
495#[cfg(not(tarpaulin_include))]
496impl fmt::Debug for QueuedRequest {
497 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
498 // Hide the content from the debug log.
499 f.debug_struct("QueuedRequest")
500 .field("transaction_id", &self.transaction_id)
501 .field("is_wedged", &self.is_wedged())
502 .finish_non_exhaustive()
503 }
504}