Skip to main content

matrix_sdk/widget/
matrix.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Matrix driver implementation that exposes Matrix functionality
16//! that is relevant for the widget API.
17
18use std::collections::{BTreeMap, BTreeSet};
19
20use as_variant::as_variant;
21use matrix_sdk_base::{
22    crypto::CollectStrategy,
23    deserialized_responses::{EncryptionInfo, RawAnySyncOrStrippedState},
24    media::{MediaFormat, MediaRequestParameters},
25    sync::State,
26};
27use ruma::{
28    EventId, OwnedDeviceId, OwnedMxcUri, OwnedUserId, RoomId, TransactionId,
29    api::client::{
30        account::request_openid_token::v3::{Request as OpenIdRequest, Response as OpenIdResponse},
31        delayed_events::{self, update_delayed_event::unstable::UpdateAction},
32        filter::RoomEventFilter,
33        to_device::send_event_to_device::v3::Request as RumaToDeviceRequest,
34    },
35    assign,
36    events::{
37        AnyMessageLikeEventContent, AnyStateEvent, AnyStateEventContent, AnySyncStateEvent,
38        AnySyncTimelineEvent, AnyTimelineEvent, AnyToDeviceEvent, AnyToDeviceEventContent,
39        MessageLikeEventType, StateEventType, TimelineEventType, ToDeviceEventType,
40        room::MediaSource,
41    },
42    serde::{Base64, Raw, from_raw_json_value},
43    to_device::DeviceIdOrAllDevices,
44};
45use serde::{Deserialize, Serialize};
46use serde_json::{Value, value::RawValue as RawJsonValue};
47use tokio::sync::{
48    broadcast::{Receiver, error::RecvError},
49    mpsc::{UnboundedReceiver, unbounded_channel},
50};
51use tracing::{error, trace, warn};
52
53use super::{StateKeySelector, machine::SendEventResponse};
54use crate::{
55    Client, Error, Result, Room, event_handler::EventHandlerDropGuard, room::MessagesOptions,
56    sync::RoomUpdate, widget::machine::SendToDeviceEventResponse,
57};
58
59/// Thin wrapper around a [`Room`] that provides functionality relevant for
60/// widgets.
61pub(crate) struct MatrixDriver {
62    room: Room,
63}
64
65impl MatrixDriver {
66    /// Creates a new `MatrixDriver` for a given `room`.
67    pub(crate) fn new(room: Room) -> Self {
68        Self { room }
69    }
70
71    /// Requests an OpenID token for the current user.
72    pub(crate) async fn get_open_id(&self) -> Result<OpenIdResponse> {
73        let user_id = self.room.own_user_id().to_owned();
74        self.room
75            .client
76            .send(OpenIdRequest::new(user_id))
77            .await
78            .map_err(|error| Error::Http(Box::new(error)))
79    }
80
81    /// Reads the latest `limit` events of a given `event_type` from the room's
82    /// timeline.
83    pub(crate) async fn read_events(
84        &self,
85        event_type: TimelineEventType,
86        state_key: Option<StateKeySelector>,
87        limit: u32,
88    ) -> Result<Vec<Raw<AnyTimelineEvent>>> {
89        let options = assign!(MessagesOptions::backward(), {
90            limit: limit.into(),
91            filter: assign!(RoomEventFilter::default(), {
92                types: Some(vec![event_type.to_string()])
93            }),
94        });
95
96        let messages = self.room.messages(options).await?;
97
98        Ok(messages
99            .chunk
100            .into_iter()
101            .map(|ev| ev.into_raw().cast_unchecked())
102            .filter(|ev| match &state_key {
103                Some(state_key) => {
104                    ev.get_field::<String>("state_key").is_ok_and(|key| match state_key {
105                        StateKeySelector::Key(state_key) => {
106                            key.is_some_and(|key| &key == state_key)
107                        }
108                        StateKeySelector::Any => key.is_some(),
109                    })
110                }
111                None => true,
112            })
113            .collect())
114    }
115
116    /// Reads the current values of the room state entries matching the given
117    /// `event_type` and `state_key` selections.
118    pub(crate) async fn read_state(
119        &self,
120        event_type: StateEventType,
121        state_key: &StateKeySelector,
122    ) -> Result<Vec<Raw<AnyStateEvent>>> {
123        let room_id = self.room.room_id();
124        let convert = |sync_or_stripped_state| match sync_or_stripped_state {
125            RawAnySyncOrStrippedState::Sync(ev) => Some(attach_room_id_state(&ev, room_id)),
126            RawAnySyncOrStrippedState::Stripped(_) => {
127                error!("MatrixDriver can't operate in invited rooms");
128                None
129            }
130        };
131
132        let events = match state_key {
133            StateKeySelector::Key(state_key) => self
134                .room
135                .get_state_event(event_type, state_key)
136                .await?
137                .and_then(convert)
138                .into_iter()
139                .collect(),
140            StateKeySelector::Any => {
141                let events = self.room.get_state_events(event_type).await?;
142                events.into_iter().filter_map(convert).collect()
143            }
144        };
145
146        Ok(events)
147    }
148
149    pub(crate) async fn download_attachment(&self, mxc_uri: OwnedMxcUri) -> Result<Base64> {
150        let req = MediaRequestParameters {
151            source: MediaSource::Plain(mxc_uri.to_owned()),
152            format: MediaFormat::File,
153        };
154        let response = self.room.client().media().get_media_content(&req, true).await?;
155        Ok(Base64::new(response))
156    }
157
158    /// Sends the given `event` to the room.
159    ///
160    /// This method allows the widget machine to handle widget requests by
161    /// providing a unified, high-level widget-specific API for sending events
162    /// to the room.
163    pub(crate) async fn send(
164        &self,
165        event_type: TimelineEventType,
166        state_key: Option<String>,
167        content: Box<RawJsonValue>,
168        delayed_event_parameters: Option<delayed_events::DelayParameters>,
169    ) -> Result<SendEventResponse> {
170        let type_str = event_type.to_string();
171
172        if let Some(redacts) = from_raw_json_value::<Value, serde_json::Error>(&content)
173            .ok()
174            .and_then(|b| b["redacts"].as_str().and_then(|s| EventId::parse(s).ok()))
175        {
176            return Ok(SendEventResponse::from_event_id(
177                self.room.redact(&redacts, None, None).await?.event_id,
178            ));
179        }
180
181        Ok(match (state_key, delayed_event_parameters) {
182            (None, None) => SendEventResponse::from_event_id(
183                self.room.send_raw(&type_str, content).await?.response.event_id,
184            ),
185
186            (Some(key), None) => SendEventResponse::from_event_id(
187                self.room.send_state_event_raw(&type_str, &key, content).await?.event_id,
188            ),
189
190            (None, Some(delayed_event_parameters)) => {
191                let r = delayed_events::delayed_message_event::unstable::Request::new_raw(
192                    self.room.room_id().to_owned(),
193                    TransactionId::new(),
194                    MessageLikeEventType::from(type_str),
195                    delayed_event_parameters,
196                    Raw::<AnyMessageLikeEventContent>::from_json(content),
197                );
198                self.room.client.send(r).await.map(|r| r.into())?
199            }
200
201            (Some(key), Some(delayed_event_parameters)) => {
202                let r = delayed_events::delayed_state_event::unstable::Request::new_raw(
203                    self.room.room_id().to_owned(),
204                    key,
205                    StateEventType::from(type_str),
206                    delayed_event_parameters,
207                    Raw::<AnyStateEventContent>::from_json(content),
208                );
209                self.room.client.send(r).await.map(|r| r.into())?
210            }
211        })
212    }
213
214    /// Send a request to the `/delayed_events`` endpoint ([MSC4140](https://github.com/matrix-org/matrix-spec-proposals/pull/4140))
215    /// This can be used to refresh cancel or send a Delayed Event (An Event
216    /// that is send ahead of time to the homeserver and gets distributed
217    /// once it times out.)
218    pub(crate) async fn update_delayed_event(
219        &self,
220        delay_id: String,
221        action: UpdateAction,
222    ) -> Result<delayed_events::update_delayed_event::unstable::Response> {
223        let r = delayed_events::update_delayed_event::unstable::Request::new(delay_id, action);
224        self.room.client.send(r).await.map_err(|error| Error::Http(Box::new(error)))
225    }
226
227    /// Starts forwarding new room events. Once the returned `EventReceiver`
228    /// is dropped, forwarding will be stopped.
229    pub(crate) fn events(&self) -> EventReceiver<Raw<AnyTimelineEvent>> {
230        let (tx, rx) = unbounded_channel();
231        let room_id = self.room.room_id().to_owned();
232
233        let handle = self.room.add_event_handler(move |raw: Raw<AnySyncTimelineEvent>| {
234            let _ = tx.send(attach_room_id(&raw, &room_id));
235            async {}
236        });
237        let drop_guard = self.room.client().event_handler_drop_guard(handle);
238
239        // The receiver will get a combination of state and message like events.
240        // These always come from the timeline (rather than the state section of the
241        // sync).
242        EventReceiver { rx, _drop_guard: drop_guard }
243    }
244
245    /// Starts forwarding new updates to room state.
246    pub(crate) fn state_updates(&self) -> StateUpdateReceiver {
247        StateUpdateReceiver { room_updates: self.room.subscribe_to_updates() }
248    }
249
250    /// Starts forwarding new room events. Once the returned `EventReceiver`
251    /// is dropped, forwarding will be stopped.
252    pub(crate) fn to_device_events(&self) -> EventReceiver<Raw<AnyToDeviceEvent>> {
253        let (tx, rx) = unbounded_channel();
254
255        let room_id = self.room.room_id().to_owned();
256        let to_device_handle = self.room.client().add_event_handler(
257
258            async move |raw: Raw<AnyToDeviceEvent>, encryption_info: Option<EncryptionInfo>, client: Client| {
259
260                // Some to-device traffic is used by the SDK for internal machinery.
261                // They should not be exposed to widgets.
262                if Self::should_filter_message_to_widget(&raw) {
263                    return;
264                }
265
266                // Encryption can be enabled after the widget has been instantiated,
267                // we want to keep track of the latest status
268                let Some(room) = client.get_room(&room_id) else {
269                    warn!("Room {room_id} not found in client.");
270                    return;
271                };
272
273                let room_encrypted = room.latest_encryption_state().await
274                    .map(|s| s.is_encrypted())
275                    // Default consider encrypted
276                    .unwrap_or(true);
277                if room_encrypted {
278                    // The room is encrypted so the to-device traffic should be too.
279                    if encryption_info.is_none() {
280                        warn!(
281                            ?room_id,
282                            "Received to-device event in clear for a widget in an e2e room, dropping."
283                        );
284                        return;
285                    }
286
287                    // There are no per-room specific decryption settings (trust requirements), so we can just send it to the
288                    // widget.
289
290                    // The raw to-device event contains more fields than the widget needs, so we need to clean it up
291                    // to only type/content/sender.
292                    #[derive(Deserialize, Serialize)]
293                    struct CleanEventHelper<'a> {
294                        #[serde(rename = "type")]
295                        event_type: String,
296                        #[serde(borrow)]
297                        content: &'a RawJsonValue,
298                        sender: String,
299                    }
300
301                    let _ = serde_json::from_str::<CleanEventHelper<'_>>(raw.json().get())
302                        .and_then(|clean_event_helper| {
303                            serde_json::value::to_raw_value(&clean_event_helper)
304                        })
305                        .map_err(|err| warn!(?room_id, "Unable to process to-device message for widget: {err}"))
306                        .map(|box_value | {
307                            tx.send(Raw::from_json(box_value))
308                        });
309
310                } else {
311                    // forward to the widget
312                    // It is ok to send an encrypted to-device message even if the room is clear.
313                    let _ = tx.send(raw);
314                }
315            },
316        );
317
318        let drop_guard = self.room.client().event_handler_drop_guard(to_device_handle);
319        EventReceiver { rx, _drop_guard: drop_guard }
320    }
321
322    fn should_filter_message_to_widget(raw_message: &Raw<AnyToDeviceEvent>) -> bool {
323        let Ok(Some(event_type)) = raw_message.get_field::<String>("type") else {
324            trace!("Invalid to-device message (no type) filtered out by widget driver.");
325            return true;
326        };
327
328        // Filter out all the internal crypto related traffic.
329        // The SDK has already zeroized the critical data, but let's not leak any
330        // information
331        let filtered = Self::is_internal_type(event_type.as_str());
332
333        if filtered {
334            trace!("To-device message of type <{event_type}> filtered out by widget driver.",);
335        }
336        filtered
337    }
338
339    fn is_internal_type(event_type: &str) -> bool {
340        matches!(
341            event_type,
342            "m.dummy"
343                | "m.room_key"
344                | "m.room_key_request"
345                | "m.forwarded_room_key"
346                | "m.key.verification.request"
347                | "m.key.verification.ready"
348                | "m.key.verification.start"
349                | "m.key.verification.cancel"
350                | "m.key.verification.accept"
351                | "m.key.verification.key"
352                | "m.key.verification.mac"
353                | "m.key.verification.done"
354                | "m.secret.request"
355                | "m.secret.send"
356                // drop utd traffic
357                | "m.room.encrypted"
358        )
359    }
360
361    /// If the room the widget is in is encrypted, then the to-device message
362    /// will be encrypted. If one of the named devices does not exist, then
363    /// the call will fail with an error.
364    pub(crate) async fn send_to_device(
365        &self,
366        event_type: ToDeviceEventType,
367        messages: BTreeMap<
368            OwnedUserId,
369            BTreeMap<DeviceIdOrAllDevices, Raw<AnyToDeviceEventContent>>,
370        >,
371    ) -> Result<SendToDeviceEventResponse> {
372        // TODO: block this at the negotiation stage, no reason to let widget believe
373        // they can do that
374        if Self::is_internal_type(&event_type.to_string()) {
375            warn!("Widget tried to send internal to-device message <{}>, ignoring", event_type);
376            // Silently return a success response, the widget will not receive the message
377            return Ok(Default::default());
378        }
379
380        let client = self.room.client();
381
382        let mut failures: BTreeMap<OwnedUserId, Vec<OwnedDeviceId>> = BTreeMap::new();
383
384        let room_encrypted = self
385            .room
386            .latest_encryption_state()
387            .await
388            .map(|s| s.is_encrypted())
389            // Default consider encrypted
390            .unwrap_or(true);
391
392        if room_encrypted {
393            trace!("Sending to-device message in encrypted room <{}>", self.room.room_id());
394
395            // The widget-api uses a [user -> device -> content] map, but the
396            // crypto-sdk API allow to encrypt a given content for multiple recipients.
397            // Lets convert the [user -> device -> content] to a [content -> user -> device
398            // map].
399            let mut content_to_recipients_map: BTreeMap<
400                &str,
401                BTreeMap<OwnedUserId, Vec<DeviceIdOrAllDevices>>,
402            > = BTreeMap::new();
403
404            for (user_id, device_map) in messages.iter() {
405                for (device_id, content) in device_map.iter() {
406                    content_to_recipients_map
407                        .entry(content.json().get())
408                        .or_default()
409                        .entry(user_id.clone())
410                        .or_default()
411                        .push(device_id.to_owned());
412                }
413            }
414
415            // Encrypt and send this content
416            for (content, user_to_list_of_device_id_or_all) in content_to_recipients_map {
417                self.encrypt_and_send_content_to_devices_helper(
418                    &event_type,
419                    content,
420                    user_to_list_of_device_id_or_all,
421                    &mut failures,
422                )
423                .await?
424            }
425
426            let failures = failures
427                .into_iter()
428                .map(|(u, list_of_devices)| {
429                    (u.into(), list_of_devices.into_iter().map(|d| d.into()).collect())
430                })
431                .collect();
432
433            let response = SendToDeviceEventResponse { failures };
434            Ok(response)
435        } else {
436            // send in clear
437            let request = RumaToDeviceRequest::new_raw(event_type, TransactionId::new(), messages);
438            client.send(request).await?;
439            Ok(Default::default())
440        }
441    }
442
443    // Helper to encrypt a single content to several devices
444    async fn encrypt_and_send_content_to_devices_helper(
445        &self,
446        event_type: &ToDeviceEventType,
447        content: &str,
448        user_to_list_of_device_id_or_all: BTreeMap<OwnedUserId, Vec<DeviceIdOrAllDevices>>,
449        failures: &mut BTreeMap<OwnedUserId, Vec<OwnedDeviceId>>,
450    ) -> Result<()> {
451        let client = self.room.client();
452        let mut recipient_devices = Vec::<_>::new();
453
454        for (user_id, recipient_device_ids) in user_to_list_of_device_id_or_all {
455            let user_devices = client.encryption().get_user_devices(&user_id).await?;
456
457            let user_devices = if recipient_device_ids.contains(&DeviceIdOrAllDevices::AllDevices) {
458                // If the user wants to send to all devices, there's nothing to filter and no
459                // need to inspect other entries in the user's device list.
460                let devices: Vec<_> = user_devices.devices().collect();
461                // TODO: What to do if the user has no devices?
462                if devices.is_empty() {
463                    warn!(
464                        "Recipient list contains `AllDevices` but no devices found for user {user_id}."
465                    )
466                }
467                // TODO: What if the `recipient_device_ids` has both
468                // `AllDevices` and other devices but one of the  other devices is not found.
469                if recipient_device_ids.len() > 1 {
470                    warn!(
471                        "The recipient_device_ids list for {user_id} contains both `AllDevices` and explicit `DeviceId` entries. Only consider `AllDevices`",
472                    );
473                }
474                devices
475            } else {
476                // If the user wants to send to only some devices, filter out any devices that
477                // aren't part of the recipient_device_ids list.
478                let filtered_devices = user_devices
479                    .devices()
480                    .map(|device| (device.device_id().to_owned(), device))
481                    .filter(|(device_id, _)| {
482                        recipient_device_ids
483                            .contains(&DeviceIdOrAllDevices::DeviceId(device_id.clone()))
484                    });
485
486                let (found_device_ids, devices): (BTreeSet<_>, Vec<_>) = filtered_devices.unzip();
487
488                let list_of_devices: BTreeSet<_> = recipient_device_ids
489                    .into_iter()
490                    .filter_map(|d| as_variant!(d, DeviceIdOrAllDevices::DeviceId))
491                    .collect();
492
493                // Let's now find any devices that are part of the recipient_device_ids list but
494                // were not found in our store.
495                let missing_devices: Vec<_> =
496                    list_of_devices.difference(&found_device_ids).map(|d| d.to_owned()).collect();
497                if !missing_devices.is_empty() {
498                    failures.insert(user_id, missing_devices);
499                }
500                devices
501            };
502
503            recipient_devices.extend(user_devices);
504        }
505
506        if !recipient_devices.is_empty() {
507            // need to group by content
508            let encrypt_and_send_failures = client
509                .encryption()
510                .encrypt_and_send_raw_to_device(
511                    recipient_devices.iter().collect(),
512                    &event_type.to_string(),
513                    Raw::from_json_string(content.to_owned())?,
514                    CollectStrategy::AllDevices,
515                )
516                .await?;
517
518            for (user_id, device_id) in encrypt_and_send_failures {
519                failures.entry(user_id).or_default().push(device_id)
520            }
521        }
522
523        Ok(())
524    }
525}
526
527/// A simple entity that wraps an `UnboundedReceiver`
528/// along with the drop guard for the room event handler.
529pub(crate) struct EventReceiver<E> {
530    rx: UnboundedReceiver<E>,
531    _drop_guard: EventHandlerDropGuard,
532}
533
534impl<T> EventReceiver<T> {
535    pub(crate) async fn recv(&mut self) -> Option<T> {
536        self.rx.recv().await
537    }
538}
539
540/// A simple entity that wraps an `UnboundedReceiver` for the room state update
541/// handler.
542pub(crate) struct StateUpdateReceiver {
543    room_updates: Receiver<RoomUpdate>,
544}
545
546impl StateUpdateReceiver {
547    pub(crate) async fn recv(&mut self) -> Result<Vec<Raw<AnyStateEvent>>, RecvError> {
548        loop {
549            match self.room_updates.recv().await? {
550                RoomUpdate::Joined { room, updates } => {
551                    let state_events = match updates.state {
552                        State::Before(events) => events,
553                        State::After(events) => events,
554                    };
555
556                    if !state_events.is_empty() {
557                        return Ok(state_events
558                            .into_iter()
559                            .map(|ev| attach_room_id_state(&ev, room.room_id()))
560                            .collect());
561                    }
562                }
563                _ => {
564                    error!("MatrixDriver can only operate in joined rooms");
565                    return Err(RecvError::Closed);
566                }
567            }
568        }
569    }
570}
571
572fn attach_room_id(raw_ev: &Raw<AnySyncTimelineEvent>, room_id: &RoomId) -> Raw<AnyTimelineEvent> {
573    let mut ev_obj =
574        raw_ev.deserialize_as_unchecked::<BTreeMap<String, Box<RawJsonValue>>>().unwrap();
575    ev_obj.insert("room_id".to_owned(), serde_json::value::to_raw_value(room_id).unwrap());
576    Raw::new(&ev_obj).unwrap().cast_unchecked()
577}
578
579fn attach_room_id_state(raw_ev: &Raw<AnySyncStateEvent>, room_id: &RoomId) -> Raw<AnyStateEvent> {
580    attach_room_id(raw_ev.cast_ref(), room_id).cast_unchecked()
581}
582
583#[cfg(test)]
584mod tests {
585    use insta;
586    use ruma::{events::AnyTimelineEvent, room_id, serde::Raw};
587    use serde_json::{Value, json};
588
589    use super::attach_room_id;
590
591    #[test]
592    fn test_add_room_id_to_raw() {
593        let raw = Raw::new(&json!({
594            "type": "m.room.message",
595            "event_id": "$1676512345:example.org",
596            "sender": "@user:example.org",
597            "origin_server_ts": 1676512345,
598            "content": {
599                "msgtype": "m.text",
600                "body": "Hello world"
601            }
602        }))
603        .unwrap()
604        .cast_unchecked();
605        let room_id = room_id!("!my_id:example.org");
606        let new = attach_room_id(&raw, room_id);
607
608        insta::with_settings!({prepend_module_to_snapshot => false}, {
609            insta::assert_json_snapshot!(new.deserialize_as::<Value>().unwrap())
610        });
611
612        let attached: AnyTimelineEvent = new.deserialize().unwrap();
613        assert_eq!(attached.room_id(), room_id);
614    }
615
616    #[test]
617    fn test_add_room_id_to_raw_override() {
618        // What would happen if there is already a room_id in the raw content?
619        // Ensure it is overridden with the given value
620        let raw = Raw::new(&json!({
621            "type": "m.room.message",
622            "event_id": "$1676512345:example.org",
623            "room_id": "!override_me:example.org",
624            "sender": "@user:example.org",
625            "origin_server_ts": 1676512345,
626            "content": {
627                "msgtype": "m.text",
628                "body": "Hello world"
629            }
630        }))
631        .unwrap()
632        .cast_unchecked();
633        let room_id = room_id!("!my_id:example.org");
634        let new = attach_room_id(&raw, room_id);
635
636        insta::with_settings!({prepend_module_to_snapshot => false}, {
637            insta::assert_json_snapshot!(new.deserialize_as::<Value>().unwrap())
638        });
639
640        let attached: AnyTimelineEvent = new.deserialize().unwrap();
641        assert_eq!(attached.room_id(), room_id);
642    }
643}