1use std::collections::{BTreeMap, BTreeSet};
19
20use as_variant::as_variant;
21use matrix_sdk_base::{
22 crypto::CollectStrategy,
23 deserialized_responses::{EncryptionInfo, RawAnySyncOrStrippedState},
24 media::{MediaFormat, MediaRequestParameters},
25 sync::State,
26};
27use ruma::{
28 EventId, OwnedDeviceId, OwnedMxcUri, OwnedUserId, RoomId, TransactionId,
29 api::client::{
30 account::request_openid_token::v3::{Request as OpenIdRequest, Response as OpenIdResponse},
31 delayed_events::{self, update_delayed_event::unstable::UpdateAction},
32 filter::RoomEventFilter,
33 to_device::send_event_to_device::v3::Request as RumaToDeviceRequest,
34 },
35 assign,
36 events::{
37 AnyMessageLikeEventContent, AnyStateEvent, AnyStateEventContent, AnySyncStateEvent,
38 AnySyncTimelineEvent, AnyTimelineEvent, AnyToDeviceEvent, AnyToDeviceEventContent,
39 MessageLikeEventType, StateEventType, TimelineEventType, ToDeviceEventType,
40 room::MediaSource,
41 },
42 serde::{Base64, Raw, from_raw_json_value},
43 to_device::DeviceIdOrAllDevices,
44};
45use serde::{Deserialize, Serialize};
46use serde_json::{Value, value::RawValue as RawJsonValue};
47use tokio::sync::{
48 broadcast::{Receiver, error::RecvError},
49 mpsc::{UnboundedReceiver, unbounded_channel},
50};
51use tracing::{error, trace, warn};
52
53use super::{StateKeySelector, machine::SendEventResponse};
54use crate::{
55 Client, Error, Result, Room, event_handler::EventHandlerDropGuard, room::MessagesOptions,
56 sync::RoomUpdate, widget::machine::SendToDeviceEventResponse,
57};
58
59pub(crate) struct MatrixDriver {
62 room: Room,
63}
64
65impl MatrixDriver {
66 pub(crate) fn new(room: Room) -> Self {
68 Self { room }
69 }
70
71 pub(crate) async fn get_open_id(&self) -> Result<OpenIdResponse> {
73 let user_id = self.room.own_user_id().to_owned();
74 self.room
75 .client
76 .send(OpenIdRequest::new(user_id))
77 .await
78 .map_err(|error| Error::Http(Box::new(error)))
79 }
80
81 pub(crate) async fn read_events(
84 &self,
85 event_type: TimelineEventType,
86 state_key: Option<StateKeySelector>,
87 limit: u32,
88 ) -> Result<Vec<Raw<AnyTimelineEvent>>> {
89 let options = assign!(MessagesOptions::backward(), {
90 limit: limit.into(),
91 filter: assign!(RoomEventFilter::default(), {
92 types: Some(vec![event_type.to_string()])
93 }),
94 });
95
96 let messages = self.room.messages(options).await?;
97
98 Ok(messages
99 .chunk
100 .into_iter()
101 .map(|ev| ev.into_raw().cast_unchecked())
102 .filter(|ev| match &state_key {
103 Some(state_key) => {
104 ev.get_field::<String>("state_key").is_ok_and(|key| match state_key {
105 StateKeySelector::Key(state_key) => {
106 key.is_some_and(|key| &key == state_key)
107 }
108 StateKeySelector::Any => key.is_some(),
109 })
110 }
111 None => true,
112 })
113 .collect())
114 }
115
116 pub(crate) async fn read_state(
119 &self,
120 event_type: StateEventType,
121 state_key: &StateKeySelector,
122 ) -> Result<Vec<Raw<AnyStateEvent>>> {
123 let room_id = self.room.room_id();
124 let convert = |sync_or_stripped_state| match sync_or_stripped_state {
125 RawAnySyncOrStrippedState::Sync(ev) => Some(attach_room_id_state(&ev, room_id)),
126 RawAnySyncOrStrippedState::Stripped(_) => {
127 error!("MatrixDriver can't operate in invited rooms");
128 None
129 }
130 };
131
132 let events = match state_key {
133 StateKeySelector::Key(state_key) => self
134 .room
135 .get_state_event(event_type, state_key)
136 .await?
137 .and_then(convert)
138 .into_iter()
139 .collect(),
140 StateKeySelector::Any => {
141 let events = self.room.get_state_events(event_type).await?;
142 events.into_iter().filter_map(convert).collect()
143 }
144 };
145
146 Ok(events)
147 }
148
149 pub(crate) async fn download_attachment(&self, mxc_uri: OwnedMxcUri) -> Result<Base64> {
150 let req = MediaRequestParameters {
151 source: MediaSource::Plain(mxc_uri.to_owned()),
152 format: MediaFormat::File,
153 };
154 let response = self.room.client().media().get_media_content(&req, true).await?;
155 Ok(Base64::new(response))
156 }
157
158 pub(crate) async fn send(
164 &self,
165 event_type: TimelineEventType,
166 state_key: Option<String>,
167 content: Box<RawJsonValue>,
168 delayed_event_parameters: Option<delayed_events::DelayParameters>,
169 ) -> Result<SendEventResponse> {
170 let type_str = event_type.to_string();
171
172 if let Some(redacts) = from_raw_json_value::<Value, serde_json::Error>(&content)
173 .ok()
174 .and_then(|b| b["redacts"].as_str().and_then(|s| EventId::parse(s).ok()))
175 {
176 return Ok(SendEventResponse::from_event_id(
177 self.room.redact(&redacts, None, None).await?.event_id,
178 ));
179 }
180
181 Ok(match (state_key, delayed_event_parameters) {
182 (None, None) => SendEventResponse::from_event_id(
183 self.room.send_raw(&type_str, content).await?.response.event_id,
184 ),
185
186 (Some(key), None) => SendEventResponse::from_event_id(
187 self.room.send_state_event_raw(&type_str, &key, content).await?.event_id,
188 ),
189
190 (None, Some(delayed_event_parameters)) => {
191 let r = delayed_events::delayed_message_event::unstable::Request::new_raw(
192 self.room.room_id().to_owned(),
193 TransactionId::new(),
194 MessageLikeEventType::from(type_str),
195 delayed_event_parameters,
196 Raw::<AnyMessageLikeEventContent>::from_json(content),
197 );
198 self.room.client.send(r).await.map(|r| r.into())?
199 }
200
201 (Some(key), Some(delayed_event_parameters)) => {
202 let r = delayed_events::delayed_state_event::unstable::Request::new_raw(
203 self.room.room_id().to_owned(),
204 key,
205 StateEventType::from(type_str),
206 delayed_event_parameters,
207 Raw::<AnyStateEventContent>::from_json(content),
208 );
209 self.room.client.send(r).await.map(|r| r.into())?
210 }
211 })
212 }
213
214 pub(crate) async fn update_delayed_event(
219 &self,
220 delay_id: String,
221 action: UpdateAction,
222 ) -> Result<delayed_events::update_delayed_event::unstable::Response> {
223 let r = delayed_events::update_delayed_event::unstable::Request::new(delay_id, action);
224 self.room.client.send(r).await.map_err(|error| Error::Http(Box::new(error)))
225 }
226
227 pub(crate) fn events(&self) -> EventReceiver<Raw<AnyTimelineEvent>> {
230 let (tx, rx) = unbounded_channel();
231 let room_id = self.room.room_id().to_owned();
232
233 let handle = self.room.add_event_handler(move |raw: Raw<AnySyncTimelineEvent>| {
234 let _ = tx.send(attach_room_id(&raw, &room_id));
235 async {}
236 });
237 let drop_guard = self.room.client().event_handler_drop_guard(handle);
238
239 EventReceiver { rx, _drop_guard: drop_guard }
243 }
244
245 pub(crate) fn state_updates(&self) -> StateUpdateReceiver {
247 StateUpdateReceiver { room_updates: self.room.subscribe_to_updates() }
248 }
249
250 pub(crate) fn to_device_events(&self) -> EventReceiver<Raw<AnyToDeviceEvent>> {
253 let (tx, rx) = unbounded_channel();
254
255 let room_id = self.room.room_id().to_owned();
256 let to_device_handle = self.room.client().add_event_handler(
257
258 async move |raw: Raw<AnyToDeviceEvent>, encryption_info: Option<EncryptionInfo>, client: Client| {
259
260 if Self::should_filter_message_to_widget(&raw) {
263 return;
264 }
265
266 let Some(room) = client.get_room(&room_id) else {
269 warn!("Room {room_id} not found in client.");
270 return;
271 };
272
273 let room_encrypted = room.latest_encryption_state().await
274 .map(|s| s.is_encrypted())
275 .unwrap_or(true);
277 if room_encrypted {
278 if encryption_info.is_none() {
280 warn!(
281 ?room_id,
282 "Received to-device event in clear for a widget in an e2e room, dropping."
283 );
284 return;
285 }
286
287 #[derive(Deserialize, Serialize)]
293 struct CleanEventHelper<'a> {
294 #[serde(rename = "type")]
295 event_type: String,
296 #[serde(borrow)]
297 content: &'a RawJsonValue,
298 sender: String,
299 }
300
301 let _ = serde_json::from_str::<CleanEventHelper<'_>>(raw.json().get())
302 .and_then(|clean_event_helper| {
303 serde_json::value::to_raw_value(&clean_event_helper)
304 })
305 .map_err(|err| warn!(?room_id, "Unable to process to-device message for widget: {err}"))
306 .map(|box_value | {
307 tx.send(Raw::from_json(box_value))
308 });
309
310 } else {
311 let _ = tx.send(raw);
314 }
315 },
316 );
317
318 let drop_guard = self.room.client().event_handler_drop_guard(to_device_handle);
319 EventReceiver { rx, _drop_guard: drop_guard }
320 }
321
322 fn should_filter_message_to_widget(raw_message: &Raw<AnyToDeviceEvent>) -> bool {
323 let Ok(Some(event_type)) = raw_message.get_field::<String>("type") else {
324 trace!("Invalid to-device message (no type) filtered out by widget driver.");
325 return true;
326 };
327
328 let filtered = Self::is_internal_type(event_type.as_str());
332
333 if filtered {
334 trace!("To-device message of type <{event_type}> filtered out by widget driver.",);
335 }
336 filtered
337 }
338
339 fn is_internal_type(event_type: &str) -> bool {
340 matches!(
341 event_type,
342 "m.dummy"
343 | "m.room_key"
344 | "m.room_key_request"
345 | "m.forwarded_room_key"
346 | "m.key.verification.request"
347 | "m.key.verification.ready"
348 | "m.key.verification.start"
349 | "m.key.verification.cancel"
350 | "m.key.verification.accept"
351 | "m.key.verification.key"
352 | "m.key.verification.mac"
353 | "m.key.verification.done"
354 | "m.secret.request"
355 | "m.secret.send"
356 | "m.room.encrypted"
358 )
359 }
360
361 pub(crate) async fn send_to_device(
365 &self,
366 event_type: ToDeviceEventType,
367 messages: BTreeMap<
368 OwnedUserId,
369 BTreeMap<DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>>,
370 >,
371 ) -> Result<SendToDeviceEventResponse> {
372 if Self::is_internal_type(&event_type.to_string()) {
375 warn!("Widget tried to send internal to-device message <{}>, ignoring", event_type);
376 return Ok(Default::default());
378 }
379
380 let client = self.room.client();
381
382 let mut failures: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>> = BTreeMap::new();
383
384 let room_encrypted = self
385 .room
386 .latest_encryption_state()
387 .await
388 .map(|s| s.is_encrypted())
389 .unwrap_or(true);
391
392 if room_encrypted {
393 trace!("Sending to-device message in encrypted room <{}>", self.room.room_id());
394
395 let mut content_to_recipients_map: BTreeMap<
400 &str,
401 BTreeMap<OwnedUserId, Vec<DeviceIdOrAllDevices>>,
402 > = BTreeMap::new();
403
404 for (user_id, device_map) in messages.iter() {
405 for (device_id, content) in device_map.iter() {
406 content_to_recipients_map
407 .entry(content.json().get())
408 .or_default()
409 .entry(user_id.clone())
410 .or_default()
411 .push(device_id.to_owned());
412 }
413 }
414
415 for (content, user_to_list_of_device_id_or_all) in content_to_recipients_map {
417 self.encrypt_and_send_content_to_devices_helper(
418 &event_type,
419 content,
420 user_to_list_of_device_id_or_all,
421 &mut failures,
422 )
423 .await?
424 }
425
426 let failures = failures
427 .into_iter()
428 .map(|(u, list_of_devices)| {
429 (u.into(), list_of_devices.into_iter().map(|d| d.into()).collect())
430 })
431 .collect();
432
433 let response = SendToDeviceEventResponse { failures };
434 Ok(response)
435 } else {
436 let request = RumaToDeviceRequest::new_raw(event_type, TransactionId::new(), messages);
438 client.send(request).await?;
439 Ok(Default::default())
440 }
441 }
442
443 async fn encrypt_and_send_content_to_devices_helper(
445 &self,
446 event_type: &ToDeviceEventType,
447 content: &str,
448 user_to_list_of_device_id_or_all: BTreeMap<OwnedUserId, Vec<DeviceIdOrAllDevices>>,
449 failures: &mut BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
450 ) -> Result<()> {
451 let client = self.room.client();
452 let mut recipient_devices = Vec::<_>::new();
453
454 for (user_id, recipient_device_ids) in user_to_list_of_device_id_or_all {
455 let user_devices = client.encryption().get_user_devices(&user_id).await?;
456
457 let user_devices = if recipient_device_ids.contains(&DeviceIdOrAllDevices::AllDevices) {
458 let devices: Vec<_> = user_devices.devices().collect();
461 if devices.is_empty() {
463 warn!(
464 "Recipient list contains `AllDevices` but no devices found for user {user_id}."
465 )
466 }
467 if recipient_device_ids.len() > 1 {
470 warn!(
471 "The recipient_device_ids list for {user_id} contains both `AllDevices` and explicit `DeviceId` entries. Only consider `AllDevices`",
472 );
473 }
474 devices
475 } else {
476 let filtered_devices = user_devices
479 .devices()
480 .map(|device| (device.device_id().to_owned(), device))
481 .filter(|(device_id, _)| {
482 recipient_device_ids
483 .contains(&DeviceIdOrAllDevices::DeviceId(device_id.clone()))
484 });
485
486 let (found_device_ids, devices): (BTreeSet<_>, Vec<_>) = filtered_devices.unzip();
487
488 let list_of_devices: BTreeSet<_> = recipient_device_ids
489 .into_iter()
490 .filter_map(|d| as_variant!(d, DeviceIdOrAllDevices::DeviceId))
491 .collect();
492
493 let missing_devices: Vec<_> =
496 list_of_devices.difference(&found_device_ids).map(|d| d.to_owned()).collect();
497 if !missing_devices.is_empty() {
498 failures.insert(user_id, missing_devices);
499 }
500 devices
501 };
502
503 recipient_devices.extend(user_devices);
504 }
505
506 if !recipient_devices.is_empty() {
507 let encrypt_and_send_failures = client
509 .encryption()
510 .encrypt_and_send_raw_to_device(
511 recipient_devices.iter().collect(),
512 &event_type.to_string(),
513 Raw::from_json_string(content.to_owned())?,
514 CollectStrategy::AllDevices,
515 )
516 .await?;
517
518 for (user_id, device_id) in encrypt_and_send_failures {
519 failures.entry(user_id).or_default().push(device_id)
520 }
521 }
522
523 Ok(())
524 }
525}
526
527pub(crate) struct EventReceiver<E> {
530 rx: UnboundedReceiver<E>,
531 _drop_guard: EventHandlerDropGuard,
532}
533
534impl<T> EventReceiver<T> {
535 pub(crate) async fn recv(&mut self) -> Option<T> {
536 self.rx.recv().await
537 }
538}
539
540pub(crate) struct StateUpdateReceiver {
543 room_updates: Receiver<RoomUpdate>,
544}
545
546impl StateUpdateReceiver {
547 pub(crate) async fn recv(&mut self) -> Result<Vec<Raw<AnyStateEvent>>, RecvError> {
548 loop {
549 match self.room_updates.recv().await? {
550 RoomUpdate::Joined { room, updates } => {
551 let state_events = match updates.state {
552 State::Before(events) => events,
553 State::After(events) => events,
554 };
555
556 if !state_events.is_empty() {
557 return Ok(state_events
558 .into_iter()
559 .map(|ev| attach_room_id_state(&ev, room.room_id()))
560 .collect());
561 }
562 }
563 _ => {
564 error!("MatrixDriver can only operate in joined rooms");
565 return Err(RecvError::Closed);
566 }
567 }
568 }
569 }
570}
571
572fn attach_room_id(raw_ev: &Raw<AnySyncTimelineEvent>, room_id: &RoomId) -> Raw<AnyTimelineEvent> {
573 let mut ev_obj =
574 raw_ev.deserialize_as_unchecked::<BTreeMap<String, Box<RawJsonValue>>>().unwrap();
575 ev_obj.insert("room_id".to_owned(), serde_json::value::to_raw_value(room_id).unwrap());
576 Raw::new(&ev_obj).unwrap().cast_unchecked()
577}
578
579fn attach_room_id_state(raw_ev: &Raw<AnySyncStateEvent>, room_id: &RoomId) -> Raw<AnyStateEvent> {
580 attach_room_id(raw_ev.cast_ref(), room_id).cast_unchecked()
581}
582
583#[cfg(test)]
584mod tests {
585 use insta;
586 use ruma::{events::AnyTimelineEvent, room_id, serde::Raw};
587 use serde_json::{Value, json};
588
589 use super::attach_room_id;
590
591 #[test]
592 fn test_add_room_id_to_raw() {
593 let raw = Raw::new(&json!({
594 "type": "m.room.message",
595 "event_id": "$1676512345:example.org",
596 "sender": "@user:example.org",
597 "origin_server_ts": 1676512345,
598 "content": {
599 "msgtype": "m.text",
600 "body": "Hello world"
601 }
602 }))
603 .unwrap()
604 .cast_unchecked();
605 let room_id = room_id!("!my_id:example.org");
606 let new = attach_room_id(&raw, room_id);
607
608 insta::with_settings!({prepend_module_to_snapshot => false}, {
609 insta::assert_json_snapshot!(new.deserialize_as::<Value>().unwrap())
610 });
611
612 let attached: AnyTimelineEvent = new.deserialize().unwrap();
613 assert_eq!(attached.room_id(), room_id);
614 }
615
616 #[test]
617 fn test_add_room_id_to_raw_override() {
618 let raw = Raw::new(&json!({
621 "type": "m.room.message",
622 "event_id": "$1676512345:example.org",
623 "room_id": "!override_me:example.org",
624 "sender": "@user:example.org",
625 "origin_server_ts": 1676512345,
626 "content": {
627 "msgtype": "m.text",
628 "body": "Hello world"
629 }
630 }))
631 .unwrap()
632 .cast_unchecked();
633 let room_id = room_id!("!my_id:example.org");
634 let new = attach_room_id(&raw, room_id);
635
636 insta::with_settings!({prepend_module_to_snapshot => false}, {
637 insta::assert_json_snapshot!(new.deserialize_as::<Value>().unwrap())
638 });
639
640 let attached: AnyTimelineEvent = new.deserialize().unwrap();
641 assert_eq!(attached.room_id(), room_id);
642 }
643}