matrix_sdk/widget/machine/
mod.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! No I/O logic of the [`WidgetDriver`].
16
17use std::time::Duration;
18
19use driver_req::{ReadStateRequest, UpdateDelayedEventRequest};
20use from_widget::UpdateDelayedEventResponse;
21use indexmap::IndexMap;
22use ruma::{
23    OwnedRoomId,
24    events::{AnyStateEvent, AnyTimelineEvent},
25    serde::{JsonObject, Raw},
26};
27use serde::Serialize;
28use serde_json::value::RawValue as RawJsonValue;
29use to_widget::NotifyNewToDeviceMessage;
30use tracing::{error, info, instrument, warn};
31use uuid::Uuid;
32
33use self::{
34    driver_req::{
35        AcquireCapabilities, MatrixDriverRequest, MatrixDriverRequestHandle, RequestOpenId,
36    },
37    from_widget::{
38        FromWidgetErrorResponse, FromWidgetRequest, ReadEventsResponse,
39        SupportedApiVersionsResponse,
40    },
41    incoming::{IncomingWidgetMessage, IncomingWidgetMessageKind},
42    openid::{OpenIdResponse, OpenIdState},
43    pending::{PendingRequests, RequestLimits},
44    to_widget::{
45        NotifyCapabilitiesChanged, NotifyNewMatrixEvent, NotifyOpenIdChanged, NotifyStateUpdate,
46        RequestCapabilities, ToWidgetRequest, ToWidgetRequestHandle, ToWidgetResponse,
47    },
48};
49#[cfg(doc)]
50use super::WidgetDriver;
51use super::{
52    Capabilities, StateEventFilter, StateKeySelector,
53    capabilities::{SEND_DELAYED_EVENT, UPDATE_DELAYED_EVENT},
54    filter::FilterInput,
55};
56use crate::{Error, Result, widget::Filter};
57
58mod driver_req;
59mod from_widget;
60mod incoming;
61mod openid;
62mod pending;
63#[cfg(test)]
64mod tests;
65mod to_widget;
66
67pub(crate) use self::{
68    driver_req::{MatrixDriverRequestData, SendEventRequest, SendToDeviceRequest},
69    from_widget::{SendEventResponse, SendToDeviceEventResponse},
70    incoming::{IncomingMessage, MatrixDriverResponse},
71};
72
/// A command to perform in reaction to an [`IncomingMessage`].
///
/// There are also initial actions that may be performed at the creation of a
/// [`WidgetMachine`]; those are the second element of the tuple returned by
/// [`WidgetMachine::new`].
#[derive(Debug)]
pub(crate) enum Action {
    /// Send a raw message to the widget.
    ///
    /// The payload is an already serialized JSON string.
    SendToWidget(String),

    /// Command that is sent from the client widget API state machine to the
    /// client (driver) that must be performed. Once the command is executed,
    /// its result is expected to be fed back into the machine as an
    /// [`IncomingMessage::MatrixDriverResponse`].
    MatrixDriverRequest {
        /// Certain commands are typically answered with certain event once the
        /// command is performed. The api state machine will "tag" each command
        /// with some "cookie" (in this case just an ID), so that once the
        /// result of the execution of this command is received, it could be
        /// matched against the pending request it belongs to.
        request_id: Uuid,

        /// Data associated with this command.
        data: MatrixDriverRequestData,
    },

    /// Subscribe to the events that the widget capabilities allow,
    /// in the _current_ room, i.e. a room which this widget is instantiated
    /// with. The client is aware of the room.
    Subscribe,

    /// Unsubscribe from the events that the widget capabilities allow,
    /// in the _current_ room. Symmetrical to `Subscribe`.
    Unsubscribe,
}
106
/// An initial state update which is in the process of being computed.
///
/// One read state request is sent to the driver per approved state filter;
/// this struct accumulates their responses until all of them have arrived.
#[derive(Debug)]
struct InitialStateUpdate {
    /// The results of the read state requests which establish the initial state
    /// to be pushed to the widget. Each inner `Vec` is the response to one
    /// request; they are flattened into a single list when the update is sent.
    initial_state: Vec<Vec<Raw<AnyStateEvent>>>,
    /// The total number of read state requests that `initial_state` must hold
    /// for this update to be considered complete.
    request_count: usize,
    /// The data carried by any state updates which raced with the requests to
    /// read the initial state. These should be pushed to the widget
    /// immediately after pushing the initial state to ensure no data is
    /// lost. Each entry is sent as its own update, in arrival order.
    postponed_updates: Vec<Vec<Raw<AnyStateEvent>>>,
}
122
/// State machine that performs no I/O itself.
///
/// Handles interactions with the widget as well as the
/// [`crate::widget::MatrixDriver`]: it consumes [`IncomingMessage`]s and
/// answers with the list of [`Action`]s that the caller has to carry out.
pub(crate) struct WidgetMachine {
    /// Unique identifier for the widget.
    ///
    /// Allows distinguishing different widgets. Incoming widget messages
    /// carrying a different id are ignored.
    widget_id: String,

    /// The room to which this widget machine is attached.
    room_id: OwnedRoomId,

    /// Outstanding requests sent to the widget (mapped by uuid).
    pending_to_widget_requests: PendingRequests<ToWidgetRequestMeta>,

    /// Outstanding requests sent to the Matrix driver (mapped by uuid).
    pending_matrix_driver_requests: PendingRequests<MatrixDriverRequestMeta>,

    /// Outstanding state updates waiting to be sent to the widget.
    ///
    /// Whenever the widget is approved to read a set of room state entries, we
    /// want to push an initial state to the widget in a single
    /// [`NotifyStateUpdate`] action. However, multiple asynchronous
    /// requests must be sent to the driver to gather this data. Therefore
    /// we use this field to hold the responses to the driver requests while
    /// some of them are still in flight. It is set to `Some` whenever the
    /// widget is approved to read some room state, and reset to `None` as
    /// soon as the [`NotifyStateUpdate`] action is emitted.
    pending_state_updates: Option<InitialStateUpdate>,

    /// Current negotiation state for capabilities.
    capabilities: CapabilitiesState,
}
157
158impl WidgetMachine {
159    /// Creates a new instance of a client widget API state machine.
160    /// Returns the client api handler as well as the channel to receive
161    /// actions (commands) from the client.
162    pub(crate) fn new(
163        widget_id: String,
164        room_id: OwnedRoomId,
165        init_on_content_load: bool,
166    ) -> (Self, Vec<Action>) {
167        let limits =
168            RequestLimits { max_pending_requests: 15, response_timeout: Duration::from_secs(10) };
169
170        let mut machine = Self {
171            widget_id,
172            room_id,
173            pending_to_widget_requests: PendingRequests::new(limits.clone()),
174            pending_matrix_driver_requests: PendingRequests::new(limits),
175            pending_state_updates: None,
176            capabilities: CapabilitiesState::Unset,
177        };
178
179        let initial_actions =
180            if init_on_content_load { Vec::new() } else { machine.negotiate_capabilities() };
181
182        (machine, initial_actions)
183    }
184
185    /// Main entry point to drive the state machine.
186    pub(crate) fn process(&mut self, event: IncomingMessage) -> Vec<Action> {
187        // Clean up stale requests.
188        self.pending_to_widget_requests.remove_expired();
189        self.pending_matrix_driver_requests.remove_expired();
190
191        match event {
192            IncomingMessage::WidgetMessage(widget_message_raw) => {
193                self.process_widget_message(&widget_message_raw)
194            }
195            IncomingMessage::MatrixDriverResponse { request_id, response } => {
196                self.process_matrix_driver_response(request_id, response)
197            }
198            IncomingMessage::MatrixEventReceived(event_raw) => {
199                let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
200                    error!("Received Matrix event before capabilities negotiation");
201                    return Vec::new();
202                };
203
204                if capabilities.allow_reading(&event_raw) {
205                    self.send_to_widget_request(NotifyNewMatrixEvent(event_raw))
206                        .map(|(_request, action)| vec![action])
207                        .unwrap_or_default()
208                } else {
209                    vec![]
210                }
211            }
212            IncomingMessage::ToDeviceReceived(to_device_raw) => {
213                let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
214                    error!("Received to device event before capabilities negotiation");
215                    return Vec::new();
216                };
217
218                if capabilities.allow_reading(&to_device_raw) {
219                    self.send_to_widget_request(NotifyNewToDeviceMessage(to_device_raw))
220                        .map(|(_request, action)| vec![action])
221                        .unwrap_or_default()
222                } else {
223                    vec![]
224                }
225            }
226            IncomingMessage::StateUpdateReceived(mut state) => {
227                let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
228                    error!("Received state update before capabilities negotiation");
229                    return Vec::new();
230                };
231
232                state.retain(|event| capabilities.allow_reading(event));
233
234                match &mut self.pending_state_updates {
235                    Some(InitialStateUpdate { postponed_updates, .. }) => {
236                        // This state update is racing with the read requests used to calculate the
237                        // initial state; postpone it
238                        postponed_updates.push(state);
239                        Vec::new()
240                    }
241                    None => self.send_state_update(state).into_iter().collect(),
242                }
243            }
244        }
245    }
246
247    fn process_widget_message(&mut self, raw: &str) -> Vec<Action> {
248        let message = match serde_json::from_str::<IncomingWidgetMessage>(raw) {
249            Ok(msg) => msg,
250            Err(error) => {
251                error!("couldn't deserialize incoming widget message: {error}");
252                return Vec::new();
253            }
254        };
255
256        if message.widget_id != self.widget_id {
257            error!("Received a message from a wrong widget, ignoring");
258            return Vec::new();
259        }
260
261        match message.kind {
262            IncomingWidgetMessageKind::Request(request) => {
263                self.process_from_widget_request(message.request_id, request)
264            }
265            IncomingWidgetMessageKind::Response(response) => {
266                self.process_to_widget_response(message.request_id, response)
267            }
268        }
269    }
270
271    #[instrument(skip_all, fields(?request_id))]
272    fn process_from_widget_request(
273        &mut self,
274        request_id: String,
275        raw_request: Raw<FromWidgetRequest>,
276    ) -> Vec<Action> {
277        let request = match raw_request.deserialize() {
278            Ok(r) => r,
279            Err(e) => {
280                return vec![Self::send_from_widget_err_response(
281                    raw_request,
282                    FromWidgetErrorResponse::from_error(Error::SerdeJson(e)),
283                )];
284            }
285        };
286
287        match request {
288            FromWidgetRequest::SupportedApiVersions {} => {
289                let response = SupportedApiVersionsResponse::new();
290                vec![Self::send_from_widget_response(raw_request, Ok(response))]
291            }
292
293            FromWidgetRequest::ContentLoaded {} => {
294                let mut response =
295                    vec![Self::send_from_widget_response(raw_request, Ok(JsonObject::new()))];
296                if matches!(self.capabilities, CapabilitiesState::Unset) {
297                    response.append(&mut self.negotiate_capabilities());
298                }
299                response
300            }
301
302            FromWidgetRequest::ReadEvent(req) => self
303                .process_read_event_request(req, raw_request)
304                .map(|a| vec![a])
305                .unwrap_or_default(),
306
307            FromWidgetRequest::SendEvent(req) => self
308                .process_send_event_request(req, raw_request)
309                .map(|a| vec![a])
310                .unwrap_or_default(),
311
312            FromWidgetRequest::SendToDevice(req) => self
313                .process_to_device_request(req, raw_request)
314                .map(|a| vec![a])
315                .unwrap_or_default(),
316
317            FromWidgetRequest::GetOpenId {} => {
318                let mut actions =
319                    vec![Self::send_from_widget_response(raw_request, Ok(OpenIdResponse::Pending))];
320
321                let Some((request, request_action)) =
322                    self.send_matrix_driver_request(RequestOpenId)
323                else {
324                    // We're done, return early.
325                    return actions;
326                };
327
328                request.add_response_handler(|res, machine| {
329                    let response = match res {
330                        Ok(res) => OpenIdResponse::Allowed(OpenIdState::new(request_id, res)),
331                        Err(msg) => {
332                            info!("OpenID request failed: {msg}");
333                            OpenIdResponse::Blocked { original_request_id: request_id }
334                        }
335                    };
336
337                    machine
338                        .send_to_widget_request(NotifyOpenIdChanged(response))
339                        .map(|(_request, action)| vec![action])
340                        .unwrap_or_default()
341                });
342
343                actions.push(request_action);
344                actions
345            }
346
347            FromWidgetRequest::DelayedEventUpdate(req) => {
348                let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
349                    return vec![Self::send_from_widget_error_string_response(
350                        raw_request,
351                        "Received send update delayed event request \
352                         before capabilities were negotiated",
353                    )];
354                };
355
356                if !capabilities.update_delayed_event {
357                    return vec![Self::send_from_widget_error_string_response(
358                        raw_request,
359                        format!("Not allowed: missing the {UPDATE_DELAYED_EVENT} capability."),
360                    )];
361                }
362
363                self.send_matrix_driver_request(UpdateDelayedEventRequest {
364                    action: req.action,
365                    delay_id: req.delay_id,
366                })
367                .map(|(request, request_action)| {
368                    request.add_response_handler(|result, _machine| {
369                        vec![Self::send_from_widget_response(
370                            raw_request,
371                            // This is mapped to another type because the
372                            // update_delay_event::Response
373                            // does not impl Serialize
374                            result
375                                .map(Into::<UpdateDelayedEventResponse>::into)
376                                .map_err(FromWidgetErrorResponse::from_error),
377                        )]
378                    });
379
380                    vec![request_action]
381                })
382                .unwrap_or_default()
383            }
384        }
385    }
386
387    /// Send a response to a request to read events.
388    ///
389    /// `events` represents the message-like events provided by the
390    /// [`crate::widget::MatrixDriver`].
391    fn send_read_events_response(
392        &self,
393        request: Raw<FromWidgetRequest>,
394        events: Result<Vec<Raw<AnyTimelineEvent>>, Error>,
395    ) -> Vec<Action> {
396        let response = match &self.capabilities {
397            CapabilitiesState::Unset => Err(FromWidgetErrorResponse::from_string(
398                "Received read events request before capabilities negotiation",
399            )),
400            CapabilitiesState::Negotiating => Err(FromWidgetErrorResponse::from_string(
401                "Received read events request while capabilities were negotiating",
402            )),
403            CapabilitiesState::Negotiated(capabilities) => events
404                .map(|mut events| {
405                    events.retain(|e| capabilities.allow_reading(e));
406                    ReadEventsResponse { events }
407                })
408                .map_err(FromWidgetErrorResponse::from_error),
409        };
410
411        vec![WidgetMachine::send_from_widget_response(request, response)]
412    }
413
414    fn process_read_event_request(
415        &mut self,
416        request: from_widget::ReadEventsRequest,
417        raw_request: Raw<FromWidgetRequest>,
418    ) -> Option<Action> {
419        let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
420            return Some(Self::send_from_widget_error_string_response(
421                raw_request,
422                "Received read event request before capabilities were negotiated",
423            ));
424        };
425
426        // Check the event type and state key filter against the capabilities
427        match &request.state_key {
428            None => {
429                if !capabilities.has_read_filter_for_type(&request.event_type) {
430                    return Some(Self::send_from_widget_error_string_response(
431                        raw_request,
432                        "Not allowed to read message-like event",
433                    ));
434                }
435            }
436            Some(state_key) => {
437                let allowed = match state_key.clone() {
438                    // If the widget tries to read any state event we can only skip sending the
439                    // request, if the widget does not have any capability for
440                    // the requested event type.
441                    StateKeySelector::Any => {
442                        capabilities.has_read_filter_for_type(&request.event_type)
443                    }
444                    // If we have a specific state key we will check if the widget has
445                    // the capability to read this specific state key and otherwise
446                    // skip sending the request.
447                    StateKeySelector::Key(state_key) => capabilities
448                        .allow_reading(FilterInput::state(&request.event_type, &state_key)),
449                };
450
451                if !allowed {
452                    return Some(Self::send_from_widget_error_string_response(
453                        raw_request,
454                        "Not allowed to read state event",
455                    ));
456                }
457            }
458        }
459
460        const DEFAULT_EVENT_LIMIT: u32 = 50;
461        let limit = request.limit.unwrap_or(DEFAULT_EVENT_LIMIT);
462        let request = driver_req::ReadEventsRequest {
463            event_type: request.event_type,
464            state_key: request.state_key,
465            limit,
466        };
467
468        self.send_matrix_driver_request(request).map(|(request, action)| {
469            request.add_response_handler(|result, machine| {
470                machine.send_read_events_response(raw_request, result)
471            });
472            action
473        })
474    }
475
476    fn process_send_event_request(
477        &mut self,
478        request: SendEventRequest,
479        raw_request: Raw<FromWidgetRequest>,
480    ) -> Option<Action> {
481        let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
482            error!("Received send event request before capabilities negotiation");
483            return None;
484        };
485
486        if !capabilities.send_delayed_event && request.delay.is_some() {
487            return Some(Self::send_from_widget_error_string_response(
488                raw_request,
489                format!("Not allowed: missing the {SEND_DELAYED_EVENT} capability."),
490            ));
491        }
492
493        if !capabilities.allow_sending(&request) {
494            return Some(Self::send_from_widget_error_string_response(
495                raw_request,
496                "Not allowed to send event",
497            ));
498        }
499
500        let (request, action) = self.send_matrix_driver_request(request)?;
501
502        request.add_response_handler(|mut result, machine| {
503            if let Ok(r) = result.as_mut() {
504                r.set_room_id(machine.room_id.clone());
505            }
506            vec![Self::send_from_widget_response(
507                raw_request,
508                result.map_err(FromWidgetErrorResponse::from_error),
509            )]
510        });
511        Some(action)
512    }
513
514    fn process_to_device_request(
515        &mut self,
516        request: SendToDeviceRequest,
517        raw_request: Raw<FromWidgetRequest>,
518    ) -> Option<Action> {
519        let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
520            error!("Received send event request before capabilities negotiation");
521            return None;
522        };
523
524        if !capabilities.allow_sending(&request) {
525            return Some(Self::send_from_widget_error_string_response(
526                raw_request,
527                format!("Not allowed to send to-device message of type: {}", request.event_type),
528            ));
529        }
530
531        let (request, action) = self.send_matrix_driver_request(request)?;
532        request.add_response_handler(|result, _| {
533            vec![Self::send_from_widget_response(
534                raw_request,
535                result.map_err(FromWidgetErrorResponse::from_error),
536            )]
537        });
538        Some(action)
539    }
540
541    #[instrument(skip_all, fields(?request_id))]
542    fn process_to_widget_response(
543        &mut self,
544        request_id: String,
545        response: ToWidgetResponse,
546    ) -> Vec<Action> {
547        let Ok(request_id) = Uuid::parse_str(&request_id) else {
548            error!("Response's request_id is not a valid UUID");
549            return Vec::new();
550        };
551
552        let request = match self.pending_to_widget_requests.extract(&request_id) {
553            Ok(r) => r,
554            Err(e) => {
555                warn!("{e}");
556                return Vec::new();
557            }
558        };
559
560        if response.action != request.action {
561            error!(
562                ?request.action, ?response.action,
563                "Received response with different `action` than request"
564            );
565            return Vec::new();
566        }
567
568        request
569            .response_fn
570            .map(|response_fn| response_fn(response.response_data, self))
571            .unwrap_or_default()
572    }
573
574    #[instrument(skip_all, fields(?request_id))]
575    fn process_matrix_driver_response(
576        &mut self,
577        request_id: Uuid,
578        response: Result<MatrixDriverResponse>,
579    ) -> Vec<Action> {
580        match self.pending_matrix_driver_requests.extract(&request_id) {
581            Ok(request) => request
582                .response_fn
583                .map(|response_fn| response_fn(response, self))
584                .unwrap_or_default(),
585            Err(e) => {
586                warn!("Could not find matching pending response: {e}");
587                Vec::new()
588            }
589        }
590    }
591
592    fn send_from_widget_error_string_response(
593        raw_request: Raw<FromWidgetRequest>,
594        error: impl Into<String>,
595    ) -> Action {
596        Self::send_from_widget_err_response(
597            raw_request,
598            FromWidgetErrorResponse::from_string(error),
599        )
600    }
601
602    fn send_from_widget_err_response(
603        raw_request: Raw<FromWidgetRequest>,
604        error: FromWidgetErrorResponse,
605    ) -> Action {
606        Self::send_from_widget_response(
607            raw_request,
608            Err::<serde_json::Value, FromWidgetErrorResponse>(error),
609        )
610    }
611
612    fn send_from_widget_response(
613        raw_request: Raw<FromWidgetRequest>,
614        result: Result<impl Serialize, FromWidgetErrorResponse>,
615    ) -> Action {
616        // we do not want tho expose this to never allow sending arbitrary errors.
617        // Errors always need to be `FromWidgetErrorResponse`.
618        #[instrument(skip_all)]
619        fn send_response_data(
620            raw_request: Raw<FromWidgetRequest>,
621            response_data: impl Serialize,
622        ) -> Action {
623            let f = || {
624                let mut object = raw_request
625                    .deserialize_as_unchecked::<IndexMap<String, Box<RawJsonValue>>>()?;
626                let response_data = serde_json::value::to_raw_value(&response_data)?;
627                object.insert("response".to_owned(), response_data);
628                serde_json::to_string(&object)
629            };
630
631            // SAFETY: we expect the raw request to be a valid JSON map, to which we add a
632            // new field.
633            let serialized = f().expect("error when attaching response to incoming request");
634
635            Action::SendToWidget(serialized)
636        }
637
638        match result {
639            Ok(res) => send_response_data(raw_request, res),
640            Err(error_response) => send_response_data(raw_request, error_response),
641        }
642    }
643
644    #[instrument(skip_all, fields(action = T::ACTION))]
645    fn send_to_widget_request<T: ToWidgetRequest>(
646        &mut self,
647        to_widget_request: T,
648    ) -> Option<(ToWidgetRequestHandle<'_, T::ResponseData>, Action)> {
649        #[derive(Serialize)]
650        #[serde(tag = "api", rename = "toWidget", rename_all = "camelCase")]
651        struct ToWidgetRequestSerdeHelper<'a, T> {
652            widget_id: &'a str,
653            request_id: Uuid,
654            action: &'static str,
655            data: T,
656        }
657
658        let request_id = Uuid::new_v4();
659        let full_request = ToWidgetRequestSerdeHelper {
660            widget_id: &self.widget_id,
661            request_id,
662            action: T::ACTION,
663            data: to_widget_request,
664        };
665
666        let request_meta = ToWidgetRequestMeta::new(T::ACTION);
667        let Some(meta) = self.pending_to_widget_requests.insert(request_id, request_meta) else {
668            warn!("Reached limits of pending requests for toWidget requests");
669            return None;
670        };
671
672        let serialized = serde_json::to_string(&full_request).expect("Failed to serialize request");
673        Some((ToWidgetRequestHandle::new(meta), Action::SendToWidget(serialized)))
674    }
675
676    #[instrument(skip_all)]
677    fn send_matrix_driver_request<T: MatrixDriverRequest>(
678        &mut self,
679        request: T,
680    ) -> Option<(MatrixDriverRequestHandle<'_, T::Response>, Action)> {
681        let request_id = Uuid::new_v4();
682        let request_meta = MatrixDriverRequestMeta::new();
683
684        let Some(meta) = self.pending_matrix_driver_requests.insert(request_id, request_meta)
685        else {
686            warn!("Reached limits of pending requests for Matrix driver requests");
687            return None;
688        };
689
690        Some((
691            MatrixDriverRequestHandle::new(meta),
692            Action::MatrixDriverRequest { request_id, data: request.into() },
693        ))
694    }
695
696    /// Sends a [`NotifyStateUpdate`] action to the widget. The `events`
697    /// indicate which room state entries (may) have changed.
698    fn send_state_update(&mut self, events: Vec<Raw<AnyStateEvent>>) -> Option<Action> {
699        self.send_to_widget_request(NotifyStateUpdate { state: events })
700            .map(|(_request, action)| action)
701    }
702
703    /// Processes a response to one of the read state requests sent to the
704    /// [`crate::widget::MatrixDriver`] to compute an initial state update. If
705    /// the update is complete, this will send it off to the widget in a
706    /// [`NotifyStateUpdate`] action.
707    fn process_read_initial_state_response(
708        &mut self,
709        events: Vec<Raw<AnyStateEvent>>,
710    ) -> Option<Vec<Action>> {
711        // Pull the updates struct out of the machine temporarily so that we can match
712        // on it in one place, mutate it, and still be able to call
713        // `send_to_widget_request` later in this block (which borrows the machine
714        // mutably)
715        match self.pending_state_updates.take() {
716            None => {
717                error!(
718                    "Initial state updates must only be set to `None` once all requests \
719                     are complete; dropping state response"
720                );
721                None
722            }
723
724            Some(mut updates) => {
725                updates.initial_state.push(events);
726
727                if updates.initial_state.len() != updates.request_count {
728                    // Not all of the initial state requests have completed yet; put the updates
729                    // struct back so we can continue accumulating the initial state.
730                    self.pending_state_updates = Some(updates);
731                    return None;
732                }
733
734                // The initial state is complete; combine the data and push it to the widget in
735                // a single action.
736                let initial =
737                    self.send_state_update(updates.initial_state.into_iter().flatten().collect());
738                // Also flush any state updates that had been postponed until after the initial
739                // state push. We deliberately do not bundle these updates into a single action,
740                // since they might contain some repeated updates to the same room state entry
741                // which could confuse the widget if included in the same `events` array. It's
742                // easiest to let the widget process each update sequentially rather than put
743                // effort into coalescing them - this is for an edge case after all.
744                let postponed = updates
745                    .postponed_updates
746                    .into_iter()
747                    .map(|state| self.send_state_update(state));
748
749                // The postponed updates should come after the initial update
750                Some(initial.into_iter().chain(postponed.flatten()).collect())
751            }
752        }
753    }
754
755    /// Processes a response from the [`crate::widget::MatrixDriver`] saying
756    /// that the widget is approved to acquire some capabilities. This will
757    /// store those capabilities in the state machine, notify the widget,
758    /// and then begin computing an initial state update if the widget
759    /// was approved to read room state.
760    fn process_acquired_capabilities(
761        &mut self,
762        approved: Result<Capabilities, Error>,
763        requested: Capabilities,
764    ) -> Vec<Action> {
765        let approved = approved.unwrap_or_else(|e| {
766            error!("Acquiring capabilities failed: {e}");
767            Capabilities::default()
768        });
769
770        let mut actions = Vec::new();
771        if !approved.read.is_empty() {
772            actions.push(Action::Subscribe);
773        }
774
775        self.capabilities = CapabilitiesState::Negotiated(approved.clone());
776
777        let state_filters: Vec<_> = approved
778            .read
779            .iter()
780            .filter_map(|f| match f {
781                Filter::State(f) => Some(f),
782                _ => None,
783            })
784            .cloned()
785            .collect();
786
787        if !state_filters.is_empty() {
788            // Begin accumulating the initial state to be pushed to the widget. Since this
789            // widget driver currently doesn't implement capability
790            // renegotiation, we can be sure that we aren't overwriting another
791            // in-progress update.
792            if self.pending_state_updates.is_some() {
793                // Or so we should be. Let's at least log something if we ever break that
794                // invariant.
795                error!("Another initial state update is in progress; overwriting it");
796            }
797            self.pending_state_updates = Some(InitialStateUpdate {
798                initial_state: Vec::with_capacity(state_filters.len()),
799                request_count: state_filters.len(),
800                postponed_updates: Vec::new(),
801            })
802        }
803
804        // For each room state filter that the widget has been approved to read, fire
805        // off a request to the driver to determine the initial values of the
806        // matching room state entries
807        let initial_state_actions = state_filters.iter().flat_map(|filter| {
808            self.send_matrix_driver_request(match filter {
809                StateEventFilter::WithType(event_type) => ReadStateRequest {
810                    event_type: event_type.to_string(),
811                    state_key: StateKeySelector::Any,
812                },
813                StateEventFilter::WithTypeAndStateKey(event_type, state_key) => ReadStateRequest {
814                    event_type: event_type.to_string(),
815                    state_key: StateKeySelector::Key(state_key.clone()),
816                },
817            })
818            .map(|(request, action)| {
819                request.add_response_handler(move |result, machine| {
820                    machine
821                        .process_read_initial_state_response(result.unwrap_or_else(|e| {
822                            error!("Reading initial room state failed: {e}");
823                            // Pretend that we just got an empty response so the initial state
824                            // update won't be completely blocked on this one bit of missing data
825                            Vec::new()
826                        }))
827                        .unwrap_or_default()
828                });
829                action
830            })
831        });
832
833        actions.extend(initial_state_actions);
834
835        let notify_caps_changed = NotifyCapabilitiesChanged { approved, requested };
836        if let Some((_request, action)) = self.send_to_widget_request(notify_caps_changed) {
837            actions.push(action);
838        }
839
840        actions
841    }
842
843    /// Attempts to acquire capabilities that have been requested by the widget
844    /// during the initial capability negotiation handshake.
845    fn process_requested_capabilities(&mut self, requested: Capabilities) -> Vec<Action> {
846        match self.send_matrix_driver_request(AcquireCapabilities {
847            desired_capabilities: requested.clone(),
848        }) {
849            None => Vec::new(),
850            Some((request, action)) => {
851                request.add_response_handler(|result, machine| {
852                    machine.process_acquired_capabilities(result, requested)
853                });
854                vec![action]
855            }
856        }
857    }
858
859    /// Performs an initial capability negotiation handshake.
860    ///
861    /// The sequence is as follows: the machine sends a [`RequestCapabilities`]
862    /// `toWidget` action, the widget responds with its requested
863    /// capabilities, the machine attempts to acquire the
864    /// requested capabilities from the driver, then it sends a
865    /// [`NotifyCapabilitiesChanged`] `toWidget` action to tell the widget
866    /// which capabilities were approved.
867    fn negotiate_capabilities(&mut self) -> Vec<Action> {
868        let mut actions = Vec::new();
869
870        // XXX: This branch appears to be accounting for capability **re**negotiation
871        // (MSC2974), which isn't implemented yet
872        if matches!(&self.capabilities, CapabilitiesState::Negotiated(c) if !c.read.is_empty()) {
873            actions.push(Action::Unsubscribe);
874        }
875
876        self.capabilities = CapabilitiesState::Negotiating;
877
878        if let Some((request, action)) = self.send_to_widget_request(RequestCapabilities {}) {
879            request.add_response_handler(|result, machine| {
880                machine.process_requested_capabilities(result.capabilities)
881            });
882            actions.push(action);
883        }
884
885        actions
886    }
887}
888
889type ToWidgetResponseFn =
890    Box<dyn FnOnce(Box<RawJsonValue>, &mut WidgetMachine) -> Vec<Action> + Send>;
891
892pub(crate) struct ToWidgetRequestMeta {
893    action: &'static str,
894    response_fn: Option<ToWidgetResponseFn>,
895}
896
897impl ToWidgetRequestMeta {
898    fn new(action: &'static str) -> Self {
899        Self { action, response_fn: None }
900    }
901}
902
903type MatrixDriverResponseFn =
904    Box<dyn FnOnce(Result<MatrixDriverResponse>, &mut WidgetMachine) -> Vec<Action> + Send>;
905
906pub(crate) struct MatrixDriverRequestMeta {
907    response_fn: Option<MatrixDriverResponseFn>,
908}
909
910impl MatrixDriverRequestMeta {
911    fn new() -> Self {
912        Self { response_fn: None }
913    }
914}
915
916/// Current negotiation state for capabilities.
917enum CapabilitiesState {
918    /// Capabilities have never been defined.
919    Unset,
920    /// We're currently negotiating capabilities.
921    Negotiating,
922    /// The capabilities have already been negotiated.
923    Negotiated(Capabilities),
924}