matrix_sdk/room/
futures.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Named futures returned from methods on types in [the `room` module][super].
16
17#![deny(unreachable_pub)]
18
19#[cfg(feature = "experimental-encrypted-state-events")]
20use std::borrow::Borrow;
21use std::future::IntoFuture;
22
23use eyeball::SharedObservable;
24use matrix_sdk_base::deserialized_responses::EncryptionInfo;
25use matrix_sdk_common::boxed_into_future;
26use mime::Mime;
27#[cfg(doc)]
28use ruma::events::{MessageLikeUnsigned, SyncMessageLikeEvent};
29use ruma::{
30    OwnedTransactionId, TransactionId,
31    api::client::message::send_message_event,
32    assign,
33    events::{AnyMessageLikeEventContent, MessageLikeEventContent},
34    serde::Raw,
35};
36#[cfg(feature = "experimental-encrypted-state-events")]
37use ruma::{
38    api::client::state::send_state_event,
39    events::{AnyStateEventContent, StateEventContent},
40};
41use tracing::{Instrument, Span, info, trace};
42
43use super::Room;
44#[cfg(feature = "experimental-encrypted-state-events")]
45use crate::utils::IntoRawStateEventContent;
46use crate::{
47    Result, TransmissionProgress, attachment::AttachmentConfig, config::RequestConfig,
48    utils::IntoRawMessageLikeEventContent,
49};
50
51/// The result of the [`Room::send`] future
52#[derive(Debug)]
53pub struct SendMessageLikeEventResult {
54    /// The response
55    pub response: send_message_event::v3::Response,
56    /// The encryption info, if the event was encrypted
57    pub encryption_info: Option<EncryptionInfo>,
58}
59
60/// Future returned by [`Room::send`].
61#[allow(missing_debug_implementations)]
62pub struct SendMessageLikeEvent<'a> {
63    room: &'a Room,
64    event_type: String,
65    content: serde_json::Result<serde_json::Value>,
66    transaction_id: Option<OwnedTransactionId>,
67    request_config: Option<RequestConfig>,
68}
69
70impl<'a> SendMessageLikeEvent<'a> {
71    pub(crate) fn new(room: &'a Room, content: impl MessageLikeEventContent) -> Self {
72        let event_type = content.event_type().to_string();
73        let content = serde_json::to_value(&content);
74        Self { room, event_type, content, transaction_id: None, request_config: None }
75    }
76
77    /// Set a transaction ID for this event.
78    ///
79    /// Since sending message-like events always requires a transaction ID, one
80    /// is generated if this method is not called.
81    ///
82    /// The transaction ID is a locally-unique ID describing a message
83    /// transaction with the homeserver.
84    ///
85    /// * On the sending side, this field is used for re-trying earlier failed
86    ///   transactions. Subsequent messages *must never* re-use an earlier
87    ///   transaction ID.
88    /// * On the receiving side, the field is used for recognizing our own
89    ///   messages when they arrive down the sync: the server includes the ID in
90    ///   the [`MessageLikeUnsigned`] field `transaction_id` of the
91    ///   corresponding [`SyncMessageLikeEvent`], but only for the *sending*
92    ///   device. Other devices will not see it. This is then used to ignore
93    ///   events sent by our own device and/or to implement local echo.
94    pub fn with_transaction_id(mut self, txn_id: OwnedTransactionId) -> Self {
95        self.transaction_id = Some(txn_id);
96        self
97    }
98
99    /// Assign a given [`RequestConfig`] to configure how this request should
100    /// behave with respect to the network.
101    pub fn with_request_config(mut self, request_config: RequestConfig) -> Self {
102        self.request_config = Some(request_config);
103        self
104    }
105}
106
107impl<'a> IntoFuture for SendMessageLikeEvent<'a> {
108    type Output = Result<SendMessageLikeEventResult>;
109    boxed_into_future!(extra_bounds: 'a);
110
111    fn into_future(self) -> Self::IntoFuture {
112        let Self { room, event_type, content, transaction_id, request_config } = self;
113        Box::pin(async move {
114            let content = content?;
115            assign!(room.send_raw(&event_type, content), { transaction_id, request_config }).await
116        })
117    }
118}
119
120/// Future returned by [`Room::send_raw`].
121#[allow(missing_debug_implementations)]
122pub struct SendRawMessageLikeEvent<'a> {
123    room: &'a Room,
124    event_type: &'a str,
125    content: Raw<AnyMessageLikeEventContent>,
126    tracing_span: Span,
127    transaction_id: Option<OwnedTransactionId>,
128    request_config: Option<RequestConfig>,
129}
130
131impl<'a> SendRawMessageLikeEvent<'a> {
132    pub(crate) fn new(
133        room: &'a Room,
134        event_type: &'a str,
135        content: impl IntoRawMessageLikeEventContent,
136    ) -> Self {
137        let content = content.into_raw_message_like_event_content();
138        Self {
139            room,
140            event_type,
141            content,
142            tracing_span: Span::current(),
143            transaction_id: None,
144            request_config: None,
145        }
146    }
147
148    /// Set a transaction ID for this event.
149    ///
150    /// Since sending message-like events always requires a transaction ID, one
151    /// is generated if this method is not called.
152    ///
153    /// * On the sending side, this field is used for re-trying earlier failed
154    ///   transactions. Subsequent messages *must never* re-use an earlier
155    ///   transaction ID.
156    /// * On the receiving side, the field is used for recognizing our own
157    ///   messages when they arrive down the sync: the server includes the ID in
158    ///   the [`MessageLikeUnsigned`] field `transaction_id` of the
159    ///   corresponding [`SyncMessageLikeEvent`], but only for the *sending*
160    ///   device. Other devices will not see it. This is then used to ignore
161    ///   events sent by our own device and/or to implement local echo.
162    pub fn with_transaction_id(mut self, txn_id: &TransactionId) -> Self {
163        self.transaction_id = Some(txn_id.to_owned());
164        self
165    }
166
167    /// Assign a given [`RequestConfig`] to configure how this request should
168    /// behave with respect to the network.
169    pub fn with_request_config(mut self, request_config: RequestConfig) -> Self {
170        self.request_config = Some(request_config);
171        self
172    }
173}
174
175impl<'a> IntoFuture for SendRawMessageLikeEvent<'a> {
176    type Output = Result<SendMessageLikeEventResult>;
177    boxed_into_future!(extra_bounds: 'a);
178
179    fn into_future(self) -> Self::IntoFuture {
180        #[cfg_attr(not(feature = "e2e-encryption"), allow(unused_mut))]
181        let Self {
182            room,
183            mut event_type,
184            mut content,
185            tracing_span,
186            transaction_id,
187            request_config,
188        } = self;
189
190        let fut = async move {
191            room.ensure_room_joined()?;
192
193            let txn_id = transaction_id.unwrap_or_else(TransactionId::new);
194            Span::current().record("transaction_id", tracing::field::debug(&txn_id));
195
196            #[cfg(not(feature = "e2e-encryption"))]
197            trace!("Sending plaintext event to room because we don't have encryption support.");
198
199            #[cfg(feature = "e2e-encryption")]
200            let mut encryption_info: Option<EncryptionInfo> = None;
201            #[cfg(not(feature = "e2e-encryption"))]
202            let encryption_info: Option<EncryptionInfo> = None;
203
204            #[cfg(feature = "e2e-encryption")]
205            if room.latest_encryption_state().await?.is_encrypted() {
206                Span::current().record("is_room_encrypted", true);
207                // Reactions are currently famously not encrypted, skip encrypting
208                // them until they are.
209                if event_type == "m.reaction" {
210                    trace!("Sending plaintext event because of the event type.");
211                } else {
212                    trace!(
213                        room_id = ?room.room_id(),
214                        "Sending encrypted event because the room is encrypted.",
215                    );
216
217                    ensure_room_encryption_ready(room).await?;
218
219                    let olm = room.client.olm_machine().await;
220                    let olm = olm.as_ref().expect("Olm machine wasn't started");
221
222                    let result =
223                        olm.encrypt_room_event_raw(room.room_id(), event_type, &content).await?;
224                    content = result.content.cast();
225                    encryption_info = Some(result.encryption_info);
226                    event_type = "m.room.encrypted";
227                }
228            } else {
229                Span::current().record("is_room_encrypted", false);
230                trace!("Sending plaintext event because the room is NOT encrypted.");
231            }
232
233            let request = send_message_event::v3::Request::new_raw(
234                room.room_id().to_owned(),
235                txn_id,
236                event_type.into(),
237                content,
238            );
239
240            let response = room.client.send(request).with_request_config(request_config).await?;
241
242            Span::current().record("event_id", tracing::field::debug(&response.event_id));
243            info!("Sent event in room");
244
245            Ok(SendMessageLikeEventResult { response, encryption_info })
246        };
247
248        Box::pin(fut.instrument(tracing_span))
249    }
250}
251
252/// Future returned by [`Room::send_attachment`].
253#[allow(missing_debug_implementations)]
254pub struct SendAttachment<'a> {
255    room: &'a Room,
256    filename: String,
257    content_type: &'a Mime,
258    data: Vec<u8>,
259    config: AttachmentConfig,
260    tracing_span: Span,
261    send_progress: SharedObservable<TransmissionProgress>,
262    store_in_cache: bool,
263}
264
265impl<'a> SendAttachment<'a> {
266    pub(crate) fn new(
267        room: &'a Room,
268        filename: String,
269        content_type: &'a Mime,
270        data: Vec<u8>,
271        config: AttachmentConfig,
272    ) -> Self {
273        Self {
274            room,
275            filename,
276            content_type,
277            data,
278            config,
279            tracing_span: Span::current(),
280            send_progress: Default::default(),
281            store_in_cache: false,
282        }
283    }
284
285    /// Replace the default `SharedObservable` used for tracking upload
286    /// progress.
287    pub fn with_send_progress_observable(
288        mut self,
289        send_progress: SharedObservable<TransmissionProgress>,
290    ) -> Self {
291        self.send_progress = send_progress;
292        self
293    }
294
295    /// Whether the sent attachment should be stored in the cache or not.
296    ///
297    /// If set to true, then retrieving the data for the attachment will result
298    /// in a cache hit immediately after upload.
299    pub fn store_in_cache(mut self) -> Self {
300        self.store_in_cache = true;
301        self
302    }
303}
304
305impl<'a> IntoFuture for SendAttachment<'a> {
306    type Output = Result<send_message_event::v3::Response>;
307    boxed_into_future!(extra_bounds: 'a);
308
309    fn into_future(self) -> Self::IntoFuture {
310        let Self {
311            room,
312            filename,
313            content_type,
314            data,
315            config,
316            tracing_span,
317            send_progress,
318            store_in_cache,
319        } = self;
320        let fut = async move {
321            room.prepare_and_send_attachment(
322                filename,
323                content_type,
324                data,
325                config,
326                send_progress,
327                store_in_cache,
328            )
329            .await
330        };
331
332        Box::pin(fut.instrument(tracing_span))
333    }
334}
335
336/// Future returned by [`Room::send_state_event_raw`].
337#[cfg(feature = "experimental-encrypted-state-events")]
338#[allow(missing_debug_implementations)]
339pub struct SendRawStateEvent<'a> {
340    room: &'a Room,
341    event_type: &'a str,
342    state_key: &'a str,
343    content: Raw<AnyStateEventContent>,
344    tracing_span: Span,
345    request_config: Option<RequestConfig>,
346}
347
348#[cfg(feature = "experimental-encrypted-state-events")]
349impl<'a> SendRawStateEvent<'a> {
350    pub(crate) fn new(
351        room: &'a Room,
352        event_type: &'a str,
353        state_key: &'a str,
354        content: impl IntoRawStateEventContent,
355    ) -> Self {
356        let content = content.into_raw_state_event_content();
357        Self {
358            room,
359            event_type,
360            state_key,
361            content,
362            tracing_span: Span::current(),
363            request_config: None,
364        }
365    }
366
367    /// Assign a given [`RequestConfig`] to configure how this request should
368    /// behave with respect to the network.
369    pub fn with_request_config(mut self, request_config: RequestConfig) -> Self {
370        self.request_config = Some(request_config);
371        self
372    }
373
374    /// Determines whether the inner state event should be encrypted before
375    /// sending.
376    ///
377    /// This method checks two conditions:
378    /// 1. Whether the room supports encrypted state events, by inspecting the
379    ///    room's encryption state.
380    /// 2. Whether the event type is considered "critical" or excluded from
381    ///    encryption under MSC4362.
382    ///
383    /// # Returns
384    ///
385    /// Returns `true` if the event should be encrypted, otherwise returns
386    /// `false`.
387    fn should_encrypt(room: &Room, event_type: &str) -> bool {
388        if !room.encryption_state().is_state_encrypted() {
389            trace!("Sending plaintext event as the room does NOT support encrypted state events.");
390            return false;
391        }
392
393        // Check the event is not critical.
394        if matches!(
395            event_type,
396            "m.room.create"
397                | "m.room.member"
398                | "m.room.join_rules"
399                | "m.room.power_levels"
400                | "m.room.third_party_invite"
401                | "m.room.history_visibility"
402                | "m.room.guest_access"
403                | "m.room.encryption"
404                | "m.space.child"
405                | "m.space.parent"
406        ) {
407            trace!("Sending plaintext event as its type is excluded from encryption.");
408            return false;
409        }
410
411        true
412    }
413}
414
415#[cfg(feature = "experimental-encrypted-state-events")]
416impl<'a> IntoFuture for SendRawStateEvent<'a> {
417    type Output = Result<send_state_event::v3::Response>;
418    boxed_into_future!(extra_bounds: 'a);
419
420    fn into_future(self) -> Self::IntoFuture {
421        let Self { room, mut event_type, state_key, mut content, tracing_span, request_config } =
422            self;
423
424        let fut = async move {
425            room.ensure_room_joined()?;
426
427            let mut state_key = state_key.to_owned();
428
429            if Self::should_encrypt(room, event_type) {
430                use tracing::debug;
431
432                Span::current().record("should_encrypt", true);
433                debug!(
434                    room_id = ?room.room_id(),
435                    "Sending encrypted event because the room is encrypted.",
436                );
437
438                ensure_room_encryption_ready(room).await?;
439
440                let olm = room.client.olm_machine().await;
441                let olm = olm.as_ref().expect("Olm machine wasn't started");
442
443                content = olm
444                    .encrypt_state_event_raw(room.room_id(), event_type, &state_key, &content)
445                    .await?
446                    .cast_unchecked();
447
448                state_key = format!("{event_type}:{state_key}");
449                event_type = "m.room.encrypted";
450            } else {
451                Span::current().record("should_encrypt", false);
452            }
453
454            let request = send_state_event::v3::Request::new_raw(
455                room.room_id().to_owned(),
456                event_type.into(),
457                state_key.to_owned(),
458                content,
459            );
460
461            let response = room.client.send(request).with_request_config(request_config).await?;
462
463            Span::current().record("event_id", tracing::field::debug(&response.event_id));
464            info!("Sent event in room");
465
466            Ok(response)
467        };
468
469        Box::pin(fut.instrument(tracing_span))
470    }
471}
472
473/// Future returned by `Room::send_state_event`.
474#[allow(missing_debug_implementations)]
475#[cfg(feature = "experimental-encrypted-state-events")]
476pub struct SendStateEvent<'a> {
477    room: &'a Room,
478    event_type: String,
479    state_key: String,
480    content: serde_json::Result<serde_json::Value>,
481    request_config: Option<RequestConfig>,
482}
483
484#[cfg(feature = "experimental-encrypted-state-events")]
485impl<'a> SendStateEvent<'a> {
486    pub(crate) fn new<C, K>(room: &'a Room, state_key: &K, content: C) -> Self
487    where
488        C: StateEventContent,
489        C::StateKey: Borrow<K>,
490        K: AsRef<str> + ?Sized,
491    {
492        let event_type = content.event_type().to_string();
493        let state_key = state_key.as_ref().to_owned();
494        let content = serde_json::to_value(&content);
495        Self { room, event_type, state_key, content, request_config: None }
496    }
497
498    /// Assign a given [`RequestConfig`] to configure how this request should
499    /// behave with respect to the network.
500    pub fn with_request_config(mut self, request_config: RequestConfig) -> Self {
501        self.request_config = Some(request_config);
502        self
503    }
504}
505
506#[cfg(feature = "experimental-encrypted-state-events")]
507impl<'a> IntoFuture for SendStateEvent<'a> {
508    type Output = Result<send_state_event::v3::Response>;
509    boxed_into_future!(extra_bounds: 'a);
510
511    fn into_future(self) -> Self::IntoFuture {
512        let Self { room, state_key, event_type, content, request_config } = self;
513        Box::pin(async move {
514            let content = content?;
515            assign!(room.send_state_event_raw(&event_type, &state_key, content), { request_config })
516                .await
517        })
518    }
519}
520
521/// Ensures the room is ready for encrypted events to be sent.
522#[cfg(feature = "e2e-encryption")]
523async fn ensure_room_encryption_ready(room: &Room) -> Result<()> {
524    if !room.are_members_synced() {
525        room.sync_members().await?;
526    }
527
528    // Query keys in case we don't have them for newly synced members.
529    //
530    // Note we do it all the time, because we might have sync'd members before
531    // sending a message (so didn't enter the above branch), but
532    // could have not query their keys ever.
533    room.query_keys_for_untracked_or_dirty_users().await?;
534
535    room.preshare_room_key().await?;
536
537    Ok(())
538}