matrix_sdk/client/
futures.rs1#![deny(unreachable_pub)]
16
17use std::{fmt::Debug, future::IntoFuture};
18
19use eyeball::SharedObservable;
20#[cfg(not(target_arch = "wasm32"))]
21use eyeball::Subscriber;
22#[cfg(feature = "experimental-oidc")]
23use mas_oidc_client::{
24 error::{
25 Error as OidcClientError, ErrorBody as OidcErrorBody, HttpError as OidcHttpError,
26 TokenRefreshError, TokenRequestError,
27 },
28 types::errors::ClientErrorCode,
29};
30use matrix_sdk_common::boxed_into_future;
31use ruma::api::{client::error::ErrorKind, error::FromHttpResponseError, OutgoingRequest};
32#[cfg(feature = "experimental-oidc")]
33use tracing::error;
34use tracing::trace;
35
36use super::super::Client;
37#[cfg(feature = "experimental-oidc")]
38use crate::authentication::oidc::OidcError;
39use crate::{
40 config::RequestConfig,
41 error::{HttpError, HttpResult},
42 RefreshTokenError, TransmissionProgress,
43};
44
45#[allow(missing_debug_implementations)]
47pub struct SendRequest<R> {
48 pub(crate) client: Client,
49 pub(crate) request: R,
50 pub(crate) config: Option<RequestConfig>,
51 pub(crate) send_progress: SharedObservable<TransmissionProgress>,
52}
53
54impl<R> SendRequest<R> {
55 pub fn with_send_progress_observable(
62 mut self,
63 send_progress: SharedObservable<TransmissionProgress>,
64 ) -> Self {
65 self.send_progress = send_progress;
66 self
67 }
68
69 pub fn with_request_config(mut self, request_config: impl Into<Option<RequestConfig>>) -> Self {
72 self.config = request_config.into();
73 self
74 }
75
76 #[cfg(not(target_arch = "wasm32"))]
79 pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
80 self.send_progress.subscribe()
81 }
82}
83
84impl<R> IntoFuture for SendRequest<R>
85where
86 R: OutgoingRequest + Clone + Debug + Send + Sync + 'static,
87 R::IncomingResponse: Send + Sync,
88 HttpError: From<FromHttpResponseError<R::EndpointError>>,
89{
90 type Output = HttpResult<R::IncomingResponse>;
91 boxed_into_future!();
92
93 fn into_future(self) -> Self::IntoFuture {
94 let Self { client, request, config, send_progress } = self;
95
96 Box::pin(async move {
97 let res =
98 Box::pin(client.send_inner(request.clone(), config, send_progress.clone())).await;
99
100 if let Err(Some(ErrorKind::UnknownToken { soft_logout })) =
102 res.as_ref().map_err(HttpError::client_api_error_kind)
103 {
104 trace!("Token refresh: Unknown token error received.");
105
106 if !client.inner.auth_ctx.handle_refresh_tokens {
108 trace!("Token refresh: Automatic refresh disabled.");
109 client.broadcast_unknown_token(soft_logout);
110 return res;
111 }
112
113 if let Err(refresh_error) = client.refresh_access_token().await {
115 match &refresh_error {
116 RefreshTokenError::RefreshTokenRequired => {
117 trace!("Token refresh: The session doesn't have a refresh token.");
118 client.broadcast_unknown_token(soft_logout);
120 }
121
122 #[cfg(feature = "experimental-oidc")]
123 RefreshTokenError::Oidc(oidc_error) => {
124 match **oidc_error {
125 OidcError::Oidc(OidcClientError::TokenRefresh(
126 TokenRefreshError::Token(TokenRequestError::Http(
127 OidcHttpError {
128 body:
129 Some(OidcErrorBody {
130 error: ClientErrorCode::InvalidGrant,
131 ..
132 }),
133 ..
134 },
135 )),
136 )) => {
137 error!("Token refresh: OIDC refresh_token rejected with invalid grant");
138 client.broadcast_unknown_token(soft_logout);
140 }
141 _ => {
142 trace!("Token refresh: OIDC refresh encountered a problem.");
143 }
146 };
147 return Err(HttpError::RefreshToken(refresh_error));
148 }
149
150 _ => {
151 trace!("Token refresh: Token refresh failed.");
152 client.broadcast_unknown_token(soft_logout);
155 return Err(HttpError::RefreshToken(refresh_error));
156 }
157 }
158 } else {
159 trace!("Token refresh: Refresh succeeded, retrying request.");
160 return Box::pin(client.send_inner(request, config, send_progress)).await;
161 }
162 }
163
164 res
165 })
166 }
167}