1#![deny(unreachable_pub)]
18
19use std::future::IntoFuture;
20
21use eyeball::SharedObservable;
22use matrix_sdk_common::boxed_into_future;
23use mime::Mime;
24#[cfg(doc)]
25use ruma::events::{MessageLikeUnsigned, SyncMessageLikeEvent};
26use ruma::{
27 api::client::message::send_message_event,
28 assign,
29 events::{AnyMessageLikeEventContent, MessageLikeEventContent},
30 serde::Raw,
31 OwnedTransactionId, TransactionId,
32};
33use tracing::{info, trace, Instrument, Span};
34
35use super::Room;
36use crate::{
37 attachment::AttachmentConfig, config::RequestConfig, utils::IntoRawMessageLikeEventContent,
38 Result, TransmissionProgress,
39};
40
41#[allow(missing_debug_implementations)]
43pub struct SendMessageLikeEvent<'a> {
44 room: &'a Room,
45 event_type: String,
46 content: serde_json::Result<serde_json::Value>,
47 transaction_id: Option<OwnedTransactionId>,
48 request_config: Option<RequestConfig>,
49}
50
51impl<'a> SendMessageLikeEvent<'a> {
52 pub(crate) fn new(room: &'a Room, content: impl MessageLikeEventContent) -> Self {
53 let event_type = content.event_type().to_string();
54 let content = serde_json::to_value(&content);
55 Self { room, event_type, content, transaction_id: None, request_config: None }
56 }
57
58 pub fn with_transaction_id(mut self, txn_id: OwnedTransactionId) -> Self {
76 self.transaction_id = Some(txn_id);
77 self
78 }
79
80 pub fn with_request_config(mut self, request_config: RequestConfig) -> Self {
83 self.request_config = Some(request_config);
84 self
85 }
86}
87
88impl<'a> IntoFuture for SendMessageLikeEvent<'a> {
89 type Output = Result<send_message_event::v3::Response>;
90 boxed_into_future!(extra_bounds: 'a);
91
92 fn into_future(self) -> Self::IntoFuture {
93 let Self { room, event_type, content, transaction_id, request_config } = self;
94 Box::pin(async move {
95 let content = content?;
96 assign!(room.send_raw(&event_type, content), { transaction_id, request_config }).await
97 })
98 }
99}
100
101#[allow(missing_debug_implementations)]
103pub struct SendRawMessageLikeEvent<'a> {
104 room: &'a Room,
105 event_type: &'a str,
106 content: Raw<AnyMessageLikeEventContent>,
107 tracing_span: Span,
108 transaction_id: Option<OwnedTransactionId>,
109 request_config: Option<RequestConfig>,
110}
111
112impl<'a> SendRawMessageLikeEvent<'a> {
113 pub(crate) fn new(
114 room: &'a Room,
115 event_type: &'a str,
116 content: impl IntoRawMessageLikeEventContent,
117 ) -> Self {
118 let content = content.into_raw_message_like_event_content();
119 Self {
120 room,
121 event_type,
122 content,
123 tracing_span: Span::current(),
124 transaction_id: None,
125 request_config: None,
126 }
127 }
128
129 pub fn with_transaction_id(mut self, txn_id: &TransactionId) -> Self {
144 self.transaction_id = Some(txn_id.to_owned());
145 self
146 }
147
148 pub fn with_request_config(mut self, request_config: RequestConfig) -> Self {
151 self.request_config = Some(request_config);
152 self
153 }
154}
155
156impl<'a> IntoFuture for SendRawMessageLikeEvent<'a> {
157 type Output = Result<send_message_event::v3::Response>;
158 boxed_into_future!(extra_bounds: 'a);
159
160 fn into_future(self) -> Self::IntoFuture {
161 #[cfg_attr(not(feature = "e2e-encryption"), allow(unused_mut))]
162 let Self {
163 room,
164 mut event_type,
165 mut content,
166 tracing_span,
167 transaction_id,
168 request_config,
169 } = self;
170
171 let fut = async move {
172 room.ensure_room_joined()?;
173
174 let txn_id = transaction_id.unwrap_or_else(TransactionId::new);
175 Span::current().record("transaction_id", tracing::field::debug(&txn_id));
176
177 #[cfg(not(feature = "e2e-encryption"))]
178 trace!("Sending plaintext event to room because we don't have encryption support.");
179
180 #[cfg(feature = "e2e-encryption")]
181 if room.is_encrypted().await? {
182 Span::current().record("is_room_encrypted", true);
183 if event_type == "m.reaction" {
186 trace!("Sending plaintext event because of the event type.");
187 } else {
188 trace!(
189 room_id = ?room.room_id(),
190 "Sending encrypted event because the room is encrypted.",
191 );
192
193 if !room.are_members_synced() {
194 room.sync_members().await?;
195 }
196
197 room.query_keys_for_untracked_users().await?;
203
204 room.preshare_room_key().await?;
205
206 let olm = room.client.olm_machine().await;
207 let olm = olm.as_ref().expect("Olm machine wasn't started");
208
209 content = olm
210 .encrypt_room_event_raw(room.room_id(), event_type, &content)
211 .await?
212 .cast();
213 event_type = "m.room.encrypted";
214 }
215 } else {
216 Span::current().record("is_room_encrypted", false);
217 trace!("Sending plaintext event because the room is NOT encrypted.",);
218 };
219
220 let request = send_message_event::v3::Request::new_raw(
221 room.room_id().to_owned(),
222 txn_id,
223 event_type.into(),
224 content,
225 );
226
227 let response = room.client.send(request).with_request_config(request_config).await?;
228
229 Span::current().record("event_id", tracing::field::debug(&response.event_id));
230 info!("Sent event in room");
231
232 Ok(response)
233 };
234
235 Box::pin(fut.instrument(tracing_span))
236 }
237}
238
239#[allow(missing_debug_implementations)]
241pub struct SendAttachment<'a> {
242 room: &'a Room,
243 filename: String,
244 content_type: &'a Mime,
245 data: Vec<u8>,
246 config: AttachmentConfig,
247 tracing_span: Span,
248 send_progress: SharedObservable<TransmissionProgress>,
249 store_in_cache: bool,
250}
251
252impl<'a> SendAttachment<'a> {
253 pub(crate) fn new(
254 room: &'a Room,
255 filename: String,
256 content_type: &'a Mime,
257 data: Vec<u8>,
258 config: AttachmentConfig,
259 ) -> Self {
260 Self {
261 room,
262 filename,
263 content_type,
264 data,
265 config,
266 tracing_span: Span::current(),
267 send_progress: Default::default(),
268 store_in_cache: false,
269 }
270 }
271
272 pub fn with_send_progress_observable(
275 mut self,
276 send_progress: SharedObservable<TransmissionProgress>,
277 ) -> Self {
278 self.send_progress = send_progress;
279 self
280 }
281
282 pub fn store_in_cache(mut self) -> Self {
287 self.store_in_cache = true;
288 self
289 }
290}
291
292impl<'a> IntoFuture for SendAttachment<'a> {
293 type Output = Result<send_message_event::v3::Response>;
294 boxed_into_future!(extra_bounds: 'a);
295
296 fn into_future(self) -> Self::IntoFuture {
297 let Self {
298 room,
299 filename,
300 content_type,
301 data,
302 config,
303 tracing_span,
304 send_progress,
305 store_in_cache,
306 } = self;
307 let fut = async move {
308 room.prepare_and_send_attachment(
309 filename,
310 content_type,
311 data,
312 config,
313 send_progress,
314 store_in_cache,
315 )
316 .await
317 };
318
319 Box::pin(fut.instrument(tracing_span))
320 }
321}