1use std::{iter, time::Duration};
18
19use driver_req::UpdateDelayedEventRequest;
20use from_widget::UpdateDelayedEventResponse;
21use indexmap::IndexMap;
22use ruma::{
23 serde::{JsonObject, Raw},
24 OwnedRoomId,
25};
26use serde::Serialize;
27use serde_json::value::RawValue as RawJsonValue;
28use tracing::{debug, error, info, instrument, warn};
29use uuid::Uuid;
30
31use self::{
32 driver_req::{
33 AcquireCapabilities, MatrixDriverRequest, MatrixDriverRequestHandle,
34 ReadMessageLikeEventRequest, RequestOpenId,
35 },
36 from_widget::{
37 FromWidgetErrorResponse, FromWidgetRequest, ReadEventRequest, ReadEventResponse,
38 SupportedApiVersionsResponse,
39 },
40 incoming::{IncomingWidgetMessage, IncomingWidgetMessageKind},
41 openid::{OpenIdResponse, OpenIdState},
42 pending::{PendingRequests, RequestLimits},
43 to_widget::{
44 NotifyCapabilitiesChanged, NotifyNewMatrixEvent, NotifyOpenIdChanged, RequestCapabilities,
45 ToWidgetRequest, ToWidgetRequestHandle, ToWidgetResponse,
46 },
47};
48#[cfg(doc)]
49use super::WidgetDriver;
50use super::{
51 capabilities::{SEND_DELAYED_EVENT, UPDATE_DELAYED_EVENT},
52 filter::{MatrixEventContent, MatrixEventFilterInput},
53 Capabilities, StateKeySelector,
54};
55use crate::Result;
56
57mod driver_req;
58mod from_widget;
59mod incoming;
60mod openid;
61mod pending;
62#[cfg(test)]
63mod tests;
64mod to_widget;
65
66pub(crate) use self::{
67 driver_req::{MatrixDriverRequestData, ReadStateEventRequest, SendEventRequest},
68 from_widget::SendEventResponse,
69 incoming::{IncomingMessage, MatrixDriverResponse},
70};
71
72#[derive(Debug)]
77pub(crate) enum Action {
78 SendToWidget(String),
80
81 MatrixDriverRequest {
85 request_id: Uuid,
91
92 data: MatrixDriverRequestData,
94 },
95
96 Subscribe,
99
100 Unsubscribe,
103}
104
105pub(crate) struct WidgetMachine {
110 widget_id: String,
114
115 room_id: OwnedRoomId,
117
118 pending_to_widget_requests: PendingRequests<ToWidgetRequestMeta>,
120
121 pending_matrix_driver_requests: PendingRequests<MatrixDriverRequestMeta>,
123
124 capabilities: CapabilitiesState,
126}
127
128impl WidgetMachine {
129 pub(crate) fn new(
133 widget_id: String,
134 room_id: OwnedRoomId,
135 init_on_content_load: bool,
136 ) -> (Self, Vec<Action>) {
137 let limits =
138 RequestLimits { max_pending_requests: 15, response_timeout: Duration::from_secs(10) };
139
140 let mut machine = Self {
141 widget_id,
142 room_id,
143 pending_to_widget_requests: PendingRequests::new(limits.clone()),
144 pending_matrix_driver_requests: PendingRequests::new(limits),
145 capabilities: CapabilitiesState::Unset,
146 };
147
148 let initial_actions =
149 if init_on_content_load { Vec::new() } else { machine.negotiate_capabilities() };
150
151 (machine, initial_actions)
152 }
153
154 pub(crate) fn process(&mut self, event: IncomingMessage) -> Vec<Action> {
156 self.pending_to_widget_requests.remove_expired();
158 self.pending_matrix_driver_requests.remove_expired();
159
160 match event {
161 IncomingMessage::WidgetMessage(raw) => self.process_widget_message(&raw),
162
163 IncomingMessage::MatrixDriverResponse { request_id, response } => {
164 self.process_matrix_driver_response(request_id, response)
165 }
166
167 IncomingMessage::MatrixEventReceived(event) => {
168 let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
169 error!("Received matrix event before capabilities negotiation");
170 return Vec::new();
171 };
172
173 capabilities
174 .raw_event_matches_read_filter(&event)
175 .then(|| {
176 let action = self.send_to_widget_request(NotifyNewMatrixEvent(event)).1;
177 action.map(|a| vec![a]).unwrap_or_default()
178 })
179 .unwrap_or_default()
180 }
181 }
182 }
183
184 fn process_widget_message(&mut self, raw: &str) -> Vec<Action> {
185 let message = match serde_json::from_str::<IncomingWidgetMessage>(raw) {
186 Ok(msg) => msg,
187 Err(error) => {
188 error!("couldn't deserialize incoming widget message: {error}");
189 return Vec::new();
190 }
191 };
192
193 if message.widget_id != self.widget_id {
194 error!("Received a message from a wrong widget, ignoring");
195 return Vec::new();
196 }
197
198 match message.kind {
199 IncomingWidgetMessageKind::Request(request) => {
200 self.process_from_widget_request(message.request_id, request)
201 }
202 IncomingWidgetMessageKind::Response(response) => {
203 self.process_to_widget_response(message.request_id, response)
204 }
205 }
206 }
207
208 #[instrument(skip_all, fields(?request_id))]
209 fn process_from_widget_request(
210 &mut self,
211 request_id: String,
212 raw_request: Raw<FromWidgetRequest>,
213 ) -> Vec<Action> {
214 let request = match raw_request.deserialize() {
215 Ok(r) => r,
216 Err(e) => {
217 return vec![Self::send_from_widget_err_response(
218 raw_request,
219 FromWidgetErrorResponse::from_error(crate::Error::SerdeJson(e)),
220 )]
221 }
222 };
223
224 match request {
225 FromWidgetRequest::SupportedApiVersions {} => {
226 let response = SupportedApiVersionsResponse::new();
227 vec![Self::send_from_widget_response(raw_request, Ok(response))]
228 }
229
230 FromWidgetRequest::ContentLoaded {} => {
231 let mut response =
232 vec![Self::send_from_widget_response(raw_request, Ok(JsonObject::new()))];
233 if self.capabilities.is_unset() {
234 response.append(&mut self.negotiate_capabilities());
235 }
236 response
237 }
238
239 FromWidgetRequest::ReadEvent(req) => self
240 .process_read_event_request(req, raw_request)
241 .map(|a| vec![a])
242 .unwrap_or_default(),
243
244 FromWidgetRequest::SendEvent(req) => self
245 .process_send_event_request(req, raw_request)
246 .map(|a| vec![a])
247 .unwrap_or_default(),
248
249 FromWidgetRequest::GetOpenId {} => {
250 let (request, request_action) = self.send_matrix_driver_request(RequestOpenId);
251 request.then(|res, machine| {
252 let response = match res {
253 Ok(res) => OpenIdResponse::Allowed(OpenIdState::new(request_id, res)),
254 Err(msg) => {
255 info!("OpenID request failed: {msg}");
256 OpenIdResponse::Blocked { original_request_id: request_id }
257 }
258 };
259
260 let action = machine.send_to_widget_request(NotifyOpenIdChanged(response)).1;
261 action.map(|a| vec![a]).unwrap_or_default()
262 });
263
264 let response =
265 Self::send_from_widget_response(raw_request, Ok(OpenIdResponse::Pending));
266 iter::once(response).chain(request_action).collect()
267 }
268
269 FromWidgetRequest::DelayedEventUpdate(req) => {
270 let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
271 return vec![Self::send_from_widget_error_string_response(
272 raw_request,
273 "Received send update delayed event request before capabilities were negotiated"
274 )];
275 };
276
277 if !capabilities.update_delayed_event {
278 return vec![Self::send_from_widget_error_string_response(
279 raw_request,
280 format!("Not allowed: missing the {UPDATE_DELAYED_EVENT} capability."),
281 )];
282 }
283
284 let (request, request_action) =
285 self.send_matrix_driver_request(UpdateDelayedEventRequest {
286 action: req.action,
287 delay_id: req.delay_id,
288 });
289 request.then(|result, _machine| {
290 vec![Self::send_from_widget_response(
291 raw_request,
292 result
295 .map(Into::<UpdateDelayedEventResponse>::into)
296 .map_err(FromWidgetErrorResponse::from_error),
297 )]
298 });
299
300 request_action.map(|a| vec![a]).unwrap_or_default()
301 }
302 }
303 }
304
305 fn process_read_event_request(
306 &mut self,
307 request: ReadEventRequest,
308 raw_request: Raw<FromWidgetRequest>,
309 ) -> Option<Action> {
310 let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
311 return Some(Self::send_from_widget_error_string_response(
312 raw_request,
313 "Received read event request before capabilities were negotiated",
314 ));
315 };
316
317 match request {
318 ReadEventRequest::ReadMessageLikeEvent { event_type, limit } => {
319 if !capabilities.read.iter().any(|f| f.matches_message_like_event_type(&event_type))
320 {
321 return Some(Self::send_from_widget_error_string_response(
322 raw_request,
323 "Not allowed to read message like event",
324 ));
325 }
326
327 const DEFAULT_EVENT_LIMIT: u32 = 50;
328 let limit = limit.unwrap_or(DEFAULT_EVENT_LIMIT);
329 let request = ReadMessageLikeEventRequest { event_type, limit };
330
331 let (request, action) = self.send_matrix_driver_request(request);
332
333 request.then(|result, machine| {
334 let response = match &machine.capabilities {
335 CapabilitiesState::Unset => Err(FromWidgetErrorResponse::from_string(
336 "Received read event request before capabilities negotiation",
337 )),
338 CapabilitiesState::Negotiating => {
339 Err(FromWidgetErrorResponse::from_string(
340 "Received read event request while capabilities were negotiating",
341 ))
342 }
343 CapabilitiesState::Negotiated(capabilities) => result
344 .map(|mut events| {
345 events.retain(|e| capabilities.raw_event_matches_read_filter(e));
346 ReadEventResponse { events }
347 })
348 .map_err(FromWidgetErrorResponse::from_error),
349 };
350
351 vec![Self::send_from_widget_response(raw_request, response)]
352 });
353
354 action
355 }
356
357 ReadEventRequest::ReadStateEvent { event_type, state_key } => {
358 let allowed = match &state_key {
359 StateKeySelector::Any => capabilities
360 .read
361 .iter()
362 .any(|filter| filter.matches_state_event_with_any_state_key(&event_type)),
363
364 StateKeySelector::Key(state_key) => {
365 let filter_in = MatrixEventFilterInput {
366 event_type: event_type.to_string().into(),
367 state_key: Some(state_key.clone()),
368 content: MatrixEventContent::default(),
370 };
371
372 capabilities.read.iter().any(|filter| filter.matches(&filter_in))
373 }
374 };
375
376 if allowed {
377 let request = ReadStateEventRequest { event_type, state_key };
378 let (request, action) = self.send_matrix_driver_request(request);
379 request.then(|result, _machine| {
380 let response = result
381 .map(|events| ReadEventResponse { events })
382 .map_err(FromWidgetErrorResponse::from_error);
383 vec![Self::send_from_widget_response(raw_request, response)]
384 });
385 action
386 } else {
387 Some(Self::send_from_widget_error_string_response(
388 raw_request,
389 "Not allowed to read state event",
390 ))
391 }
392 }
393 }
394 }
395
396 fn process_send_event_request(
397 &mut self,
398 request: SendEventRequest,
399 raw_request: Raw<FromWidgetRequest>,
400 ) -> Option<Action> {
401 let CapabilitiesState::Negotiated(capabilities) = &self.capabilities else {
402 error!("Received send event request before capabilities negotiation");
403 return None;
404 };
405
406 let filter_in = MatrixEventFilterInput {
407 event_type: request.event_type.clone(),
408 state_key: request.state_key.clone(),
409 content: serde_json::from_str(request.content.get()).unwrap_or_else(|e| {
410 debug!("Failed to deserialize event content for filter: {e}");
411 Default::default()
414 }),
415 };
416
417 if !capabilities.send_delayed_event && request.delay.is_some() {
418 return Some(Self::send_from_widget_error_string_response(
419 raw_request,
420 format!("Not allowed: missing the {SEND_DELAYED_EVENT} capability."),
421 ));
422 }
423
424 if !capabilities.send.iter().any(|filter| filter.matches(&filter_in)) {
425 return Some(Self::send_from_widget_error_string_response(
426 raw_request,
427 "Not allowed to send event",
428 ));
429 }
430
431 let (request, action) = self.send_matrix_driver_request(request);
432
433 request.then(|mut result, machine| {
434 if let Ok(r) = result.as_mut() {
435 r.set_room_id(machine.room_id.clone());
436 }
437 vec![Self::send_from_widget_response(
438 raw_request,
439 result.map_err(FromWidgetErrorResponse::from_error),
440 )]
441 });
442
443 action
444 }
445
446 #[instrument(skip_all, fields(?request_id))]
447 fn process_to_widget_response(
448 &mut self,
449 request_id: String,
450 response: ToWidgetResponse,
451 ) -> Vec<Action> {
452 let Ok(request_id) = Uuid::parse_str(&request_id) else {
453 error!("Response's request_id is not a valid UUID");
454 return Vec::new();
455 };
456
457 let request = match self.pending_to_widget_requests.extract(&request_id) {
458 Ok(r) => r,
459 Err(e) => {
460 warn!("{e}");
461 return Vec::new();
462 }
463 };
464
465 if response.action != request.action {
466 error!(
467 ?request.action, ?response.action,
468 "Received response with different `action` than request"
469 );
470 return Vec::new();
471 }
472
473 request
474 .response_fn
475 .map(|response_fn| response_fn(response.response_data, self))
476 .unwrap_or_default()
477 }
478
479 #[instrument(skip_all, fields(?request_id))]
480 fn process_matrix_driver_response(
481 &mut self,
482 request_id: Uuid,
483 response: Result<MatrixDriverResponse>,
484 ) -> Vec<Action> {
485 match self.pending_matrix_driver_requests.extract(&request_id) {
486 Ok(request) => request
487 .response_fn
488 .map(|response_fn| response_fn(response, self))
489 .unwrap_or_default(),
490 Err(e) => {
491 warn!("Could not find matching pending response: {e}");
492 Vec::new()
493 }
494 }
495 }
496
497 fn send_from_widget_error_string_response(
498 raw_request: Raw<FromWidgetRequest>,
499 error: impl Into<String>,
500 ) -> Action {
501 Self::send_from_widget_err_response(
502 raw_request,
503 FromWidgetErrorResponse::from_string(error),
504 )
505 }
506
507 fn send_from_widget_err_response(
508 raw_request: Raw<FromWidgetRequest>,
509 error: FromWidgetErrorResponse,
510 ) -> Action {
511 Self::send_from_widget_response(
512 raw_request,
513 Err::<serde_json::Value, FromWidgetErrorResponse>(error),
514 )
515 }
516
517 fn send_from_widget_response(
518 raw_request: Raw<FromWidgetRequest>,
519 result: Result<impl Serialize, FromWidgetErrorResponse>,
520 ) -> Action {
521 #[instrument(skip_all)]
524 fn send_response_data(
525 raw_request: Raw<FromWidgetRequest>,
526 response_data: impl Serialize,
527 ) -> Action {
528 let f = || {
529 let mut object =
530 raw_request.deserialize_as::<IndexMap<String, Box<RawJsonValue>>>()?;
531 let response_data = serde_json::value::to_raw_value(&response_data)?;
532 object.insert("response".to_owned(), response_data);
533 serde_json::to_string(&object)
534 };
535
536 let serialized = f().expect("error when attaching response to incoming request");
539
540 Action::SendToWidget(serialized)
541 }
542
543 match result {
544 Ok(res) => send_response_data(raw_request, res),
545 Err(error_response) => send_response_data(raw_request, error_response),
546 }
547 }
548
549 #[instrument(skip_all, fields(action = T::ACTION))]
550 fn send_to_widget_request<T: ToWidgetRequest>(
551 &mut self,
552 to_widget_request: T,
553 ) -> (ToWidgetRequestHandle<'_, T::ResponseData>, Option<Action>) {
554 #[derive(Serialize)]
555 #[serde(tag = "api", rename = "toWidget", rename_all = "camelCase")]
556 struct ToWidgetRequestSerdeHelper<'a, T> {
557 widget_id: &'a str,
558 request_id: Uuid,
559 action: &'static str,
560 data: T,
561 }
562
563 let request_id = Uuid::new_v4();
564 let full_request = ToWidgetRequestSerdeHelper {
565 widget_id: &self.widget_id,
566 request_id,
567 action: T::ACTION,
568 data: to_widget_request,
569 };
570
571 let request_meta = ToWidgetRequestMeta::new(T::ACTION);
572 let Some(meta) = self.pending_to_widget_requests.insert(request_id, request_meta) else {
573 warn!("Reached limits of pending requests for toWidget requests");
574 return (ToWidgetRequestHandle::null(), None);
575 };
576
577 let serialized = serde_json::to_string(&full_request).expect("Failed to serialize request");
578 (ToWidgetRequestHandle::new(meta), Some(Action::SendToWidget(serialized)))
579 }
580
581 #[instrument(skip_all)]
582 fn send_matrix_driver_request<T: MatrixDriverRequest>(
583 &mut self,
584 request: T,
585 ) -> (MatrixDriverRequestHandle<'_, T::Response>, Option<Action>) {
586 let request_id = Uuid::new_v4();
587 let request_meta = MatrixDriverRequestMeta::new();
588 let Some(meta) = self.pending_matrix_driver_requests.insert(request_id, request_meta)
589 else {
590 warn!("Reached limits of pending requests for matrix driver requests");
591 return (MatrixDriverRequestHandle::null(), None);
592 };
593
594 let action = Action::MatrixDriverRequest { request_id, data: request.into() };
595 (MatrixDriverRequestHandle::new(meta), Some(action))
596 }
597
598 fn negotiate_capabilities(&mut self) -> Vec<Action> {
599 let unsubscribe_required =
600 matches!(&self.capabilities, CapabilitiesState::Negotiated(c) if !c.read.is_empty());
601 self.capabilities = CapabilitiesState::Negotiating;
602
603 let (request, action) = self.send_to_widget_request(RequestCapabilities {});
604 request.then(|response, machine| {
605 let requested = response.capabilities;
606 let (request, action) = machine.send_matrix_driver_request(AcquireCapabilities {
607 desired_capabilities: requested.clone(),
608 });
609
610 request.then(|result, machine| {
611 let approved = result.unwrap_or_else(|e| {
612 error!("Acquiring capabilities failed: {e}");
613 Capabilities::default()
614 });
615
616 let subscribe_required = !approved.read.is_empty();
617 machine.capabilities = CapabilitiesState::Negotiated(approved.clone());
618
619 let update = NotifyCapabilitiesChanged { approved, requested };
620 let (_request, action) = machine.send_to_widget_request(update);
621
622 subscribe_required.then_some(Action::Subscribe).into_iter().chain(action).collect()
623 });
624
625 action.map(|a| vec![a]).unwrap_or_default()
626 });
627
628 unsubscribe_required.then_some(Action::Unsubscribe).into_iter().chain(action).collect()
629 }
630}
631
632type ToWidgetResponseFn =
633 Box<dyn FnOnce(Box<RawJsonValue>, &mut WidgetMachine) -> Vec<Action> + Send>;
634
635pub(crate) struct ToWidgetRequestMeta {
636 action: &'static str,
637 response_fn: Option<ToWidgetResponseFn>,
638}
639
640impl ToWidgetRequestMeta {
641 fn new(action: &'static str) -> Self {
642 Self { action, response_fn: None }
643 }
644}
645
646type MatrixDriverResponseFn =
647 Box<dyn FnOnce(Result<MatrixDriverResponse>, &mut WidgetMachine) -> Vec<Action> + Send>;
648
649pub(crate) struct MatrixDriverRequestMeta {
650 response_fn: Option<MatrixDriverResponseFn>,
651}
652
653impl MatrixDriverRequestMeta {
654 fn new() -> Self {
655 Self { response_fn: None }
656 }
657}
658
659enum CapabilitiesState {
661 Unset,
663 Negotiating,
665 Negotiated(Capabilities),
667}
668
669impl CapabilitiesState {
670 #[must_use]
671 fn is_unset(&self) -> bool {
672 matches!(self, Self::Unset)
673 }
674}