matrix_sdk/widget/machine/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! No I/O logic of the [`WidgetDriver`].
16
17use std::{iter, time::Duration};
18
19use driver_req::UpdateDelayedEventRequest;
20use from_widget::UpdateDelayedEventResponse;
21use indexmap::IndexMap;
22use ruma::{
23    serde::{JsonObject, Raw},
24    OwnedRoomId,
25};
26use serde::Serialize;
27use serde_json::value::RawValue as RawJsonValue;
28use tracing::{debug, error, info, instrument, warn};
29use uuid::Uuid;
30
31use self::{
32    driver_req::{
33        AcquireCapabilities, MatrixDriverRequest, MatrixDriverRequestHandle,
34        ReadMessageLikeEventRequest, RequestOpenId,
35    },
36    from_widget::{
37        FromWidgetErrorResponse, FromWidgetRequest, ReadEventRequest, ReadEventResponse,
38        SupportedApiVersionsResponse,
39    },
40    incoming::{IncomingWidgetMessage, IncomingWidgetMessageKind},
41    openid::{OpenIdResponse, OpenIdState},
42    pending::{PendingRequests, RequestLimits},
43    to_widget::{
44        NotifyCapabilitiesChanged, NotifyNewMatrixEvent, NotifyOpenIdChanged, RequestCapabilities,
45        ToWidgetRequest, ToWidgetRequestHandle, ToWidgetResponse,
46    },
47};
48#[cfg(doc)]
49use super::WidgetDriver;
50use super::{
51    capabilities::{SEND_DELAYED_EVENT, UPDATE_DELAYED_EVENT},
52    filter::{MatrixEventContent, MatrixEventFilterInput},
53    Capabilities, StateKeySelector,
54};
55use crate::Result;
56
57mod driver_req;
58mod from_widget;
59mod incoming;
60mod openid;
61mod pending;
62#[cfg(test)]
63mod tests;
64mod to_widget;
65
66pub(crate) use self::{
67    driver_req::{MatrixDriverRequestData, ReadStateEventRequest, SendEventRequest},
68    from_widget::SendEventResponse,
69    incoming::{IncomingMessage, MatrixDriverResponse},
70};
71
/// A command to perform in reaction to an [`IncomingMessage`].
///
/// There are also initial actions that may be performed at the creation of a
/// [`WidgetMachine`].
#[derive(Debug)]
pub(crate) enum Action {
    /// Send a raw message to the widget.
    ///
    /// The payload is the already serialized JSON message.
    SendToWidget(String),

    /// Command that is sent from the client widget API state machine to the
    /// client (driver) that must be performed. Once the command is executed,
    /// the result is expected to be fed back into the machine as an
    /// [`IncomingMessage::MatrixDriverResponse`] carrying the same
    /// `request_id`.
    MatrixDriverRequest {
        /// Certain commands are typically answered with certain event once the
        /// command is performed. The api state machine will "tag" each command
        /// with some "cookie" (in this case just an ID), so that once the
        /// result of the execution of this command is received, it could be
        /// matched.
        request_id: Uuid,

        /// Data associated with this command.
        data: MatrixDriverRequestData,
    },

    /// Subscribe to the events in the *current* room, i.e. a room which this
    /// widget is instantiated with. The client is aware of the room.
    Subscribe,

    /// Unsubscribe from the events in the *current* room. Symmetrical to
    /// `Subscribe`.
    Unsubscribe,
}
104
/// No I/O state machine.
///
/// Handles interactions with the widget as well as the
/// [`crate::widget::MatrixDriver`].
///
/// The machine never performs I/O itself: it consumes [`IncomingMessage`]s
/// through [`WidgetMachine::process`] and hands back the [`Action`]s that the
/// caller has to carry out.
pub(crate) struct WidgetMachine {
    /// Unique identifier for the widget.
    ///
    /// Allows distinguishing different widgets. Incoming widget messages
    /// carrying a different id are ignored.
    widget_id: String,

    /// The room to which this widget machine is attached.
    room_id: OwnedRoomId,

    /// Outstanding requests sent to the widget (mapped by uuid).
    pending_to_widget_requests: PendingRequests<ToWidgetRequestMeta>,

    /// Outstanding requests sent to the matrix driver (mapped by uuid).
    pending_matrix_driver_requests: PendingRequests<MatrixDriverRequestMeta>,

    /// Current negotiation state for capabilities.
    capabilities: CapabilitiesState,
}
127
128impl WidgetMachine {
129    /// Creates a new instance of a client widget API state machine.
130    /// Returns the client api handler as well as the channel to receive
131    /// actions (commands) from the client.
132    pub(crate) fn new(
133        widget_id: String,
134        room_id: OwnedRoomId,
135        init_on_content_load: bool,
136    ) -> (Self, Vec<Action>) {
137        let limits =
138            RequestLimits { max_pending_requests: 15, response_timeout: Duration::from_secs(10) };
139
140        let mut machine = Self {
141            widget_id,
142            room_id,
143            pending_to_widget_requests: PendingRequests::new(limits.clone()),
144            pending_matrix_driver_requests: PendingRequests::new(limits),
145            capabilities: CapabilitiesState::Unset,
146        };
147
148        let initial_actions =
149            if init_on_content_load { Vec::new() } else { machine.negotiate_capabilities() };
150
151        (machine, initial_actions)
152    }
153
154    /// Main entry point to drive the state machine.
155    pub(crate) fn process(&mut self, event: IncomingMessage) -> Vec<Action> {
156        // Clean up stale requests.
157        self.pending_to_widget_requests.remove_expired();
158        self.pending_matrix_driver_requests.remove_expired();
159
160        match event {
161            IncomingMessage::WidgetMessage(raw) => self.process_widget_message(&raw),
162
163            IncomingMessage::MatrixDriverResponse { request_id, response } => {
164                self.process_matrix_driver_response(request_id, response)
165            }
166
167            IncomingMessage::MatrixEventReceived(event) => {
168                let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
169                    error!("Received matrix event before capabilities negotiation");
170                    return Vec::new();
171                };
172
173                capabilities
174                    .raw_event_matches_read_filter(&event)
175                    .then(|| {
176                        let action = self.send_to_widget_request(NotifyNewMatrixEvent(event)).1;
177                        action.map(|a| vec![a]).unwrap_or_default()
178                    })
179                    .unwrap_or_default()
180            }
181        }
182    }
183
184    fn process_widget_message(&mut self, raw: &str) -> Vec<Action> {
185        let message = match serde_json::from_str::<IncomingWidgetMessage>(raw) {
186            Ok(msg) => msg,
187            Err(error) => {
188                error!("couldn't deserialize incoming widget message: {error}");
189                return Vec::new();
190            }
191        };
192
193        if message.widget_id != self.widget_id {
194            error!("Received a message from a wrong widget, ignoring");
195            return Vec::new();
196        }
197
198        match message.kind {
199            IncomingWidgetMessageKind::Request(request) => {
200                self.process_from_widget_request(message.request_id, request)
201            }
202            IncomingWidgetMessageKind::Response(response) => {
203                self.process_to_widget_response(message.request_id, response)
204            }
205        }
206    }
207
    /// Handles a request sent by the widget.
    ///
    /// `raw_request` is the request as received; every response is built by
    /// attaching a `response` field to it (see `send_from_widget_response`).
    /// Returns the actions to perform immediately. Responses that depend on
    /// the matrix driver are produced later, by the continuation registered on
    /// the driver request handle.
    #[instrument(skip_all, fields(?request_id))]
    fn process_from_widget_request(
        &mut self,
        request_id: String,
        raw_request: Raw<FromWidgetRequest>,
    ) -> Vec<Action> {
        // A request we cannot parse is answered with an error response right
        // away.
        let request = match raw_request.deserialize() {
            Ok(r) => r,
            Err(e) => {
                return vec![Self::send_from_widget_err_response(
                    raw_request,
                    FromWidgetErrorResponse::from_error(crate::Error::SerdeJson(e)),
                )]
            }
        };

        match request {
            FromWidgetRequest::SupportedApiVersions {} => {
                let response = SupportedApiVersionsResponse::new();
                vec![Self::send_from_widget_response(raw_request, Ok(response))]
            }

            FromWidgetRequest::ContentLoaded {} => {
                // Acknowledge with an empty object, then start the capability
                // negotiation unless it has already been started.
                let mut response =
                    vec![Self::send_from_widget_response(raw_request, Ok(JsonObject::new()))];
                if self.capabilities.is_unset() {
                    response.append(&mut self.negotiate_capabilities());
                }
                response
            }

            FromWidgetRequest::ReadEvent(req) => self
                .process_read_event_request(req, raw_request)
                .map(|a| vec![a])
                .unwrap_or_default(),

            FromWidgetRequest::SendEvent(req) => self
                .process_send_event_request(req, raw_request)
                .map(|a| vec![a])
                .unwrap_or_default(),

            FromWidgetRequest::GetOpenId {} => {
                // The request is acknowledged as pending immediately; the
                // actual outcome is delivered to the widget afterwards through
                // a separate `NotifyOpenIdChanged` request, once the driver
                // has answered.
                let (request, request_action) = self.send_matrix_driver_request(RequestOpenId);
                request.then(|res, machine| {
                    let response = match res {
                        Ok(res) => OpenIdResponse::Allowed(OpenIdState::new(request_id, res)),
                        Err(msg) => {
                            info!("OpenID request failed: {msg}");
                            OpenIdResponse::Blocked { original_request_id: request_id }
                        }
                    };

                    let action = machine.send_to_widget_request(NotifyOpenIdChanged(response)).1;
                    action.map(|a| vec![a]).unwrap_or_default()
                });

                let response =
                    Self::send_from_widget_response(raw_request, Ok(OpenIdResponse::Pending));
                iter::once(response).chain(request_action).collect()
            }

            FromWidgetRequest::DelayedEventUpdate(req) => {
                // Updating delayed events requires negotiated capabilities
                // that include the dedicated capability.
                let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
                    return vec![Self::send_from_widget_error_string_response(
                        raw_request,
                        "Received send update delayed event request before capabilities were negotiated"
                    )];
                };

                if !capabilities.update_delayed_event {
                    return vec![Self::send_from_widget_error_string_response(
                        raw_request,
                        format!("Not allowed: missing the {UPDATE_DELAYED_EVENT} capability."),
                    )];
                }

                let (request, request_action) =
                    self.send_matrix_driver_request(UpdateDelayedEventRequest {
                        action: req.action,
                        delay_id: req.delay_id,
                    });
                request.then(|result, _machine| {
                    vec![Self::send_from_widget_response(
                        raw_request,
                        // This is mapped to another type because the update_delay_event::Response
                        // does not impl Serialize
                        result
                            .map(Into::<UpdateDelayedEventResponse>::into)
                            .map_err(FromWidgetErrorResponse::from_error),
                    )]
                });

                request_action.map(|a| vec![a]).unwrap_or_default()
            }
        }
    }
304
    /// Handles a widget request to read events from the room.
    ///
    /// The request is rejected with an error response when capabilities have
    /// not been negotiated yet, or when no negotiated read filter allows the
    /// requested event type (and state key). Otherwise a request is issued to
    /// the matrix driver and the widget is answered once the driver responds.
    ///
    /// Returns the action to perform immediately, if any.
    fn process_read_event_request(
        &mut self,
        request: ReadEventRequest,
        raw_request: Raw<FromWidgetRequest>,
    ) -> Option<Action> {
        let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
            return Some(Self::send_from_widget_error_string_response(
                raw_request,
                "Received read event request before capabilities were negotiated",
            ));
        };

        match request {
            ReadEventRequest::ReadMessageLikeEvent { event_type, limit } => {
                if !capabilities.read.iter().any(|f| f.matches_message_like_event_type(&event_type))
                {
                    return Some(Self::send_from_widget_error_string_response(
                        raw_request,
                        "Not allowed to read message like event",
                    ));
                }

                // Used when the widget did not specify a limit.
                const DEFAULT_EVENT_LIMIT: u32 = 50;
                let limit = limit.unwrap_or(DEFAULT_EVENT_LIMIT);
                let request = ReadMessageLikeEventRequest { event_type, limit };

                let (request, action) = self.send_matrix_driver_request(request);

                request.then(|result, machine| {
                    // The capabilities are looked up again when the driver
                    // response arrives, and the returned events are filtered
                    // against the read filters in effect at that point.
                    let response = match &machine.capabilities {
                        CapabilitiesState::Unset => Err(FromWidgetErrorResponse::from_string(
                            "Received read event request before capabilities negotiation",
                        )),
                        CapabilitiesState::Negotiating => {
                            Err(FromWidgetErrorResponse::from_string(
                                "Received read event request while capabilities were negotiating",
                            ))
                        }
                        CapabilitiesState::Negotiated(capabilities) => result
                            .map(|mut events| {
                                events.retain(|e| capabilities.raw_event_matches_read_filter(e));
                                ReadEventResponse { events }
                            })
                            .map_err(FromWidgetErrorResponse::from_error),
                    };

                    vec![Self::send_from_widget_response(raw_request, response)]
                });

                action
            }

            ReadEventRequest::ReadStateEvent { event_type, state_key } => {
                // Check the read filters against the requested type and state
                // key selector before asking the driver.
                let allowed = match &state_key {
                    StateKeySelector::Any => capabilities
                        .read
                        .iter()
                        .any(|filter| filter.matches_state_event_with_any_state_key(&event_type)),

                    StateKeySelector::Key(state_key) => {
                        let filter_in = MatrixEventFilterInput {
                            event_type: event_type.to_string().into(),
                            state_key: Some(state_key.clone()),
                            // content doesn't matter for state events
                            content: MatrixEventContent::default(),
                        };

                        capabilities.read.iter().any(|filter| filter.matches(&filter_in))
                    }
                };

                if allowed {
                    let request = ReadStateEventRequest { event_type, state_key };
                    let (request, action) = self.send_matrix_driver_request(request);
                    request.then(|result, _machine| {
                        // The driver result is forwarded as is; no further
                        // filtering is applied here.
                        let response = result
                            .map(|events| ReadEventResponse { events })
                            .map_err(FromWidgetErrorResponse::from_error);
                        vec![Self::send_from_widget_response(raw_request, response)]
                    });
                    action
                } else {
                    Some(Self::send_from_widget_error_string_response(
                        raw_request,
                        "Not allowed to read state event",
                    ))
                }
            }
        }
    }
395
396    fn process_send_event_request(
397        &mut self,
398        request: SendEventRequest,
399        raw_request: Raw<FromWidgetRequest>,
400    ) -> Option<Action> {
401        let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
402            error!("Received send event request before capabilities negotiation");
403            return None;
404        };
405
406        let filter_in = MatrixEventFilterInput {
407            event_type: request.event_type.clone(),
408            state_key: request.state_key.clone(),
409            content: serde_json::from_str(request.content.get()).unwrap_or_else(|e| {
410                debug!("Failed to deserialize event content for filter: {e}");
411                // Fallback to empty content is safe because there is no filter
412                // that matches with it when it otherwise wouldn't.
413                Default::default()
414            }),
415        };
416
417        if !capabilities.send_delayed_event && request.delay.is_some() {
418            return Some(Self::send_from_widget_error_string_response(
419                raw_request,
420                format!("Not allowed: missing the {SEND_DELAYED_EVENT} capability."),
421            ));
422        }
423
424        if !capabilities.send.iter().any(|filter| filter.matches(&filter_in)) {
425            return Some(Self::send_from_widget_error_string_response(
426                raw_request,
427                "Not allowed to send event",
428            ));
429        }
430
431        let (request, action) = self.send_matrix_driver_request(request);
432
433        request.then(|mut result, machine| {
434            if let Ok(r) = result.as_mut() {
435                r.set_room_id(machine.room_id.clone());
436            }
437            vec![Self::send_from_widget_response(
438                raw_request,
439                result.map_err(FromWidgetErrorResponse::from_error),
440            )]
441        });
442
443        action
444    }
445
446    #[instrument(skip_all, fields(?request_id))]
447    fn process_to_widget_response(
448        &mut self,
449        request_id: String,
450        response: ToWidgetResponse,
451    ) -> Vec<Action> {
452        let Ok(request_id) = Uuid::parse_str(&request_id) else {
453            error!("Response's request_id is not a valid UUID");
454            return Vec::new();
455        };
456
457        let request = match self.pending_to_widget_requests.extract(&request_id) {
458            Ok(r) => r,
459            Err(e) => {
460                warn!("{e}");
461                return Vec::new();
462            }
463        };
464
465        if response.action != request.action {
466            error!(
467                ?request.action, ?response.action,
468                "Received response with different `action` than request"
469            );
470            return Vec::new();
471        }
472
473        request
474            .response_fn
475            .map(|response_fn| response_fn(response.response_data, self))
476            .unwrap_or_default()
477    }
478
479    #[instrument(skip_all, fields(?request_id))]
480    fn process_matrix_driver_response(
481        &mut self,
482        request_id: Uuid,
483        response: Result<MatrixDriverResponse>,
484    ) -> Vec<Action> {
485        match self.pending_matrix_driver_requests.extract(&request_id) {
486            Ok(request) => request
487                .response_fn
488                .map(|response_fn| response_fn(response, self))
489                .unwrap_or_default(),
490            Err(e) => {
491                warn!("Could not find matching pending response: {e}");
492                Vec::new()
493            }
494        }
495    }
496
497    fn send_from_widget_error_string_response(
498        raw_request: Raw<FromWidgetRequest>,
499        error: impl Into<String>,
500    ) -> Action {
501        Self::send_from_widget_err_response(
502            raw_request,
503            FromWidgetErrorResponse::from_string(error),
504        )
505    }
506
507    fn send_from_widget_err_response(
508        raw_request: Raw<FromWidgetRequest>,
509        error: FromWidgetErrorResponse,
510    ) -> Action {
511        Self::send_from_widget_response(
512            raw_request,
513            Err::<serde_json::Value, FromWidgetErrorResponse>(error),
514        )
515    }
516
517    fn send_from_widget_response(
518        raw_request: Raw<FromWidgetRequest>,
519        result: Result<impl Serialize, FromWidgetErrorResponse>,
520    ) -> Action {
521        // we do not want tho expose this to never allow sending arbitrary errors.
522        // Errors always need to be `FromWidgetErrorResponse`.
523        #[instrument(skip_all)]
524        fn send_response_data(
525            raw_request: Raw<FromWidgetRequest>,
526            response_data: impl Serialize,
527        ) -> Action {
528            let f = || {
529                let mut object =
530                    raw_request.deserialize_as::<IndexMap<String, Box<RawJsonValue>>>()?;
531                let response_data = serde_json::value::to_raw_value(&response_data)?;
532                object.insert("response".to_owned(), response_data);
533                serde_json::to_string(&object)
534            };
535
536            // SAFETY: we expect the raw request to be a valid JSON map, to which we add a
537            // new field.
538            let serialized = f().expect("error when attaching response to incoming request");
539
540            Action::SendToWidget(serialized)
541        }
542
543        match result {
544            Ok(res) => send_response_data(raw_request, res),
545            Err(error_response) => send_response_data(raw_request, error_response),
546        }
547    }
548
549    #[instrument(skip_all, fields(action = T::ACTION))]
550    fn send_to_widget_request<T: ToWidgetRequest>(
551        &mut self,
552        to_widget_request: T,
553    ) -> (ToWidgetRequestHandle<'_, T::ResponseData>, Option<Action>) {
554        #[derive(Serialize)]
555        #[serde(tag = "api", rename = "toWidget", rename_all = "camelCase")]
556        struct ToWidgetRequestSerdeHelper<'a, T> {
557            widget_id: &'a str,
558            request_id: Uuid,
559            action: &'static str,
560            data: T,
561        }
562
563        let request_id = Uuid::new_v4();
564        let full_request = ToWidgetRequestSerdeHelper {
565            widget_id: &self.widget_id,
566            request_id,
567            action: T::ACTION,
568            data: to_widget_request,
569        };
570
571        let request_meta = ToWidgetRequestMeta::new(T::ACTION);
572        let Some(meta) = self.pending_to_widget_requests.insert(request_id, request_meta) else {
573            warn!("Reached limits of pending requests for toWidget requests");
574            return (ToWidgetRequestHandle::null(), None);
575        };
576
577        let serialized = serde_json::to_string(&full_request).expect("Failed to serialize request");
578        (ToWidgetRequestHandle::new(meta), Some(Action::SendToWidget(serialized)))
579    }
580
581    #[instrument(skip_all)]
582    fn send_matrix_driver_request<T: MatrixDriverRequest>(
583        &mut self,
584        request: T,
585    ) -> (MatrixDriverRequestHandle<'_, T::Response>, Option<Action>) {
586        let request_id = Uuid::new_v4();
587        let request_meta = MatrixDriverRequestMeta::new();
588        let Some(meta) = self.pending_matrix_driver_requests.insert(request_id, request_meta)
589        else {
590            warn!("Reached limits of pending requests for matrix driver requests");
591            return (MatrixDriverRequestHandle::null(), None);
592        };
593
594        let action = Action::MatrixDriverRequest { request_id, data: request.into() };
595        (MatrixDriverRequestHandle::new(meta), Some(action))
596    }
597
598    fn negotiate_capabilities(&mut self) -> Vec<Action> {
599        let unsubscribe_required =
600            matches!(&self.capabilities, CapabilitiesState::Negotiated(c) if !c.read.is_empty());
601        self.capabilities = CapabilitiesState::Negotiating;
602
603        let (request, action) = self.send_to_widget_request(RequestCapabilities {});
604        request.then(|response, machine| {
605            let requested = response.capabilities;
606            let (request, action) = machine.send_matrix_driver_request(AcquireCapabilities {
607                desired_capabilities: requested.clone(),
608            });
609
610            request.then(|result, machine| {
611                let approved = result.unwrap_or_else(|e| {
612                    error!("Acquiring capabilities failed: {e}");
613                    Capabilities::default()
614                });
615
616                let subscribe_required = !approved.read.is_empty();
617                machine.capabilities = CapabilitiesState::Negotiated(approved.clone());
618
619                let update = NotifyCapabilitiesChanged { approved, requested };
620                let (_request, action) = machine.send_to_widget_request(update);
621
622                subscribe_required.then_some(Action::Subscribe).into_iter().chain(action).collect()
623            });
624
625            action.map(|a| vec![a]).unwrap_or_default()
626        });
627
628        unsubscribe_required.then_some(Action::Unsubscribe).into_iter().chain(action).collect()
629    }
630}
631
/// Continuation run when the widget answers one of our requests.
///
/// It receives the raw response data and the machine, and returns the actions
/// to perform next.
type ToWidgetResponseFn =
    Box<dyn FnOnce(Box<RawJsonValue>, &mut WidgetMachine) -> Vec<Action> + Send>;
634
/// Bookkeeping for a request sent to the widget that is awaiting a response.
pub(crate) struct ToWidgetRequestMeta {
    /// The `action` of the request; a response must carry the same action to
    /// be accepted.
    action: &'static str,
    /// Continuation to run with the response data, if one was registered.
    response_fn: Option<ToWidgetResponseFn>,
}

impl ToWidgetRequestMeta {
    /// Creates the metadata for a request with the given `action` and no
    /// continuation.
    fn new(action: &'static str) -> Self {
        Self { action, response_fn: None }
    }
}
645
/// Continuation run when the matrix driver answers one of our requests.
///
/// It receives the driver's result and the machine, and returns the actions
/// to perform next.
type MatrixDriverResponseFn =
    Box<dyn FnOnce(Result<MatrixDriverResponse>, &mut WidgetMachine) -> Vec<Action> + Send>;
648
/// Bookkeeping for a request sent to the matrix driver that is awaiting a
/// response.
pub(crate) struct MatrixDriverRequestMeta {
    /// Continuation to run with the driver's result, if one was registered.
    response_fn: Option<MatrixDriverResponseFn>,
}

impl MatrixDriverRequestMeta {
    /// Creates the metadata for a request without a continuation.
    fn new() -> Self {
        Self { response_fn: None }
    }
}
658
/// Current negotiation state for capabilities.
///
/// A machine starts as `Unset`, switches to `Negotiating` when a negotiation
/// is started and to `Negotiated` once the matrix driver has answered the
/// capability acquisition.
enum CapabilitiesState {
    /// Capabilities have never been defined.
    Unset,
    /// We're currently negotiating capabilities.
    Negotiating,
    /// The capabilities have already been negotiated.
    Negotiated(Capabilities),
}

impl CapabilitiesState {
    /// Returns `true` if no negotiation has been started yet.
    #[must_use]
    fn is_unset(&self) -> bool {
        matches!(self, Self::Unset)
    }
}
674}