matrix_sdk/room/
futures.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Named futures returned from methods on types in [the `room` module][super].
16
17#![deny(unreachable_pub)]
18
19use std::future::IntoFuture;
20
21use eyeball::SharedObservable;
22use matrix_sdk_common::boxed_into_future;
23use mime::Mime;
24#[cfg(doc)]
25use ruma::events::{MessageLikeUnsigned, SyncMessageLikeEvent};
26use ruma::{
27    api::client::message::send_message_event,
28    assign,
29    events::{AnyMessageLikeEventContent, MessageLikeEventContent},
30    serde::Raw,
31    OwnedTransactionId, TransactionId,
32};
33use tracing::{info, trace, Instrument, Span};
34
35use super::Room;
36use crate::{
37    attachment::AttachmentConfig, config::RequestConfig, utils::IntoRawMessageLikeEventContent,
38    Result, TransmissionProgress,
39};
40
41/// Future returned by [`Room::send`].
42#[allow(missing_debug_implementations)]
43pub struct SendMessageLikeEvent<'a> {
44    room: &'a Room,
45    event_type: String,
46    content: serde_json::Result<serde_json::Value>,
47    transaction_id: Option<OwnedTransactionId>,
48    request_config: Option<RequestConfig>,
49}
50
51impl<'a> SendMessageLikeEvent<'a> {
52    pub(crate) fn new(room: &'a Room, content: impl MessageLikeEventContent) -> Self {
53        let event_type = content.event_type().to_string();
54        let content = serde_json::to_value(&content);
55        Self { room, event_type, content, transaction_id: None, request_config: None }
56    }
57
58    /// Set a transaction ID for this event.
59    ///
60    /// Since sending message-like events always requires a transaction ID, one
61    /// is generated if this method is not called.
62    ///
63    /// The transaction ID is a locally-unique ID describing a message
64    /// transaction with the homeserver.
65    ///
66    /// * On the sending side, this field is used for re-trying earlier failed
67    ///   transactions. Subsequent messages *must never* re-use an earlier
68    ///   transaction ID.
69    /// * On the receiving side, the field is used for recognizing our own
70    ///   messages when they arrive down the sync: the server includes the ID in
71    ///   the [`MessageLikeUnsigned`] field `transaction_id` of the
72    ///   corresponding [`SyncMessageLikeEvent`], but only for the *sending*
73    ///   device. Other devices will not see it. This is then used to ignore
74    ///   events sent by our own device and/or to implement local echo.
75    pub fn with_transaction_id(mut self, txn_id: OwnedTransactionId) -> Self {
76        self.transaction_id = Some(txn_id);
77        self
78    }
79
80    /// Assign a given [`RequestConfig`] to configure how this request should
81    /// behave with respect to the network.
82    pub fn with_request_config(mut self, request_config: RequestConfig) -> Self {
83        self.request_config = Some(request_config);
84        self
85    }
86}
87
88impl<'a> IntoFuture for SendMessageLikeEvent<'a> {
89    type Output = Result<send_message_event::v3::Response>;
90    boxed_into_future!(extra_bounds: 'a);
91
92    fn into_future(self) -> Self::IntoFuture {
93        let Self { room, event_type, content, transaction_id, request_config } = self;
94        Box::pin(async move {
95            let content = content?;
96            assign!(room.send_raw(&event_type, content), { transaction_id, request_config }).await
97        })
98    }
99}
100
101/// Future returned by [`Room::send_raw`].
102#[allow(missing_debug_implementations)]
103pub struct SendRawMessageLikeEvent<'a> {
104    room: &'a Room,
105    event_type: &'a str,
106    content: Raw<AnyMessageLikeEventContent>,
107    tracing_span: Span,
108    transaction_id: Option<OwnedTransactionId>,
109    request_config: Option<RequestConfig>,
110}
111
112impl<'a> SendRawMessageLikeEvent<'a> {
113    pub(crate) fn new(
114        room: &'a Room,
115        event_type: &'a str,
116        content: impl IntoRawMessageLikeEventContent,
117    ) -> Self {
118        let content = content.into_raw_message_like_event_content();
119        Self {
120            room,
121            event_type,
122            content,
123            tracing_span: Span::current(),
124            transaction_id: None,
125            request_config: None,
126        }
127    }
128
129    /// Set a transaction ID for this event.
130    ///
131    /// Since sending message-like events always requires a transaction ID, one
132    /// is generated if this method is not called.
133    ///
134    /// * On the sending side, this field is used for re-trying earlier failed
135    ///   transactions. Subsequent messages *must never* re-use an earlier
136    ///   transaction ID.
137    /// * On the receiving side, the field is used for recognizing our own
138    ///   messages when they arrive down the sync: the server includes the ID in
139    ///   the [`MessageLikeUnsigned`] field `transaction_id` of the
140    ///   corresponding [`SyncMessageLikeEvent`], but only for the *sending*
141    ///   device. Other devices will not see it. This is then used to ignore
142    ///   events sent by our own device and/or to implement local echo.
143    pub fn with_transaction_id(mut self, txn_id: &TransactionId) -> Self {
144        self.transaction_id = Some(txn_id.to_owned());
145        self
146    }
147
148    /// Assign a given [`RequestConfig`] to configure how this request should
149    /// behave with respect to the network.
150    pub fn with_request_config(mut self, request_config: RequestConfig) -> Self {
151        self.request_config = Some(request_config);
152        self
153    }
154}
155
156impl<'a> IntoFuture for SendRawMessageLikeEvent<'a> {
157    type Output = Result<send_message_event::v3::Response>;
158    boxed_into_future!(extra_bounds: 'a);
159
160    fn into_future(self) -> Self::IntoFuture {
161        #[cfg_attr(not(feature = "e2e-encryption"), allow(unused_mut))]
162        let Self {
163            room,
164            mut event_type,
165            mut content,
166            tracing_span,
167            transaction_id,
168            request_config,
169        } = self;
170
171        let fut = async move {
172            room.ensure_room_joined()?;
173
174            let txn_id = transaction_id.unwrap_or_else(TransactionId::new);
175            Span::current().record("transaction_id", tracing::field::debug(&txn_id));
176
177            #[cfg(not(feature = "e2e-encryption"))]
178            trace!("Sending plaintext event to room because we don't have encryption support.");
179
180            #[cfg(feature = "e2e-encryption")]
181            if room.is_encrypted().await? {
182                Span::current().record("is_room_encrypted", true);
183                // Reactions are currently famously not encrypted, skip encrypting
184                // them until they are.
185                if event_type == "m.reaction" {
186                    trace!("Sending plaintext event because of the event type.");
187                } else {
188                    trace!(
189                        room_id = ?room.room_id(),
190                        "Sending encrypted event because the room is encrypted.",
191                    );
192
193                    if !room.are_members_synced() {
194                        room.sync_members().await?;
195                    }
196
197                    // Query keys in case we don't have them for newly synced members.
198                    //
199                    // Note we do it all the time, because we might have sync'd members before
200                    // sending a message (so didn't enter the above branch), but
201                    // could have not query their keys ever.
202                    room.query_keys_for_untracked_users().await?;
203
204                    room.preshare_room_key().await?;
205
206                    let olm = room.client.olm_machine().await;
207                    let olm = olm.as_ref().expect("Olm machine wasn't started");
208
209                    content = olm
210                        .encrypt_room_event_raw(room.room_id(), event_type, &content)
211                        .await?
212                        .cast();
213                    event_type = "m.room.encrypted";
214                }
215            } else {
216                Span::current().record("is_room_encrypted", false);
217                trace!("Sending plaintext event because the room is NOT encrypted.",);
218            };
219
220            let request = send_message_event::v3::Request::new_raw(
221                room.room_id().to_owned(),
222                txn_id,
223                event_type.into(),
224                content,
225            );
226
227            let response = room.client.send(request).with_request_config(request_config).await?;
228
229            Span::current().record("event_id", tracing::field::debug(&response.event_id));
230            info!("Sent event in room");
231
232            Ok(response)
233        };
234
235        Box::pin(fut.instrument(tracing_span))
236    }
237}
238
239/// Future returned by [`Room::send_attachment`].
240#[allow(missing_debug_implementations)]
241pub struct SendAttachment<'a> {
242    room: &'a Room,
243    filename: String,
244    content_type: &'a Mime,
245    data: Vec<u8>,
246    config: AttachmentConfig,
247    tracing_span: Span,
248    send_progress: SharedObservable<TransmissionProgress>,
249    store_in_cache: bool,
250}
251
252impl<'a> SendAttachment<'a> {
253    pub(crate) fn new(
254        room: &'a Room,
255        filename: String,
256        content_type: &'a Mime,
257        data: Vec<u8>,
258        config: AttachmentConfig,
259    ) -> Self {
260        Self {
261            room,
262            filename,
263            content_type,
264            data,
265            config,
266            tracing_span: Span::current(),
267            send_progress: Default::default(),
268            store_in_cache: false,
269        }
270    }
271
272    /// Replace the default `SharedObservable` used for tracking upload
273    /// progress.
274    pub fn with_send_progress_observable(
275        mut self,
276        send_progress: SharedObservable<TransmissionProgress>,
277    ) -> Self {
278        self.send_progress = send_progress;
279        self
280    }
281
282    /// Whether the sent attachment should be stored in the cache or not.
283    ///
284    /// If set to true, then retrieving the data for the attachment will result
285    /// in a cache hit immediately after upload.
286    pub fn store_in_cache(mut self) -> Self {
287        self.store_in_cache = true;
288        self
289    }
290}
291
292impl<'a> IntoFuture for SendAttachment<'a> {
293    type Output = Result<send_message_event::v3::Response>;
294    boxed_into_future!(extra_bounds: 'a);
295
296    fn into_future(self) -> Self::IntoFuture {
297        let Self {
298            room,
299            filename,
300            content_type,
301            data,
302            config,
303            tracing_span,
304            send_progress,
305            store_in_cache,
306        } = self;
307        let fut = async move {
308            room.prepare_and_send_attachment(
309                filename,
310                content_type,
311                data,
312                config,
313                send_progress,
314                store_in_cache,
315            )
316            .await
317        };
318
319        Box::pin(fut.instrument(tracing_span))
320    }
321}