matrix_sdk/client/
futures.rs1#![deny(unreachable_pub)]
16
17use std::{fmt::Debug, future::IntoFuture};
18
19use eyeball::{SharedObservable, Subscriber};
20use js_int::UInt;
21use matrix_sdk_common::{SendOutsideWasm, SyncOutsideWasm, boxed_into_future};
22use oauth2::{RequestTokenError, basic::BasicErrorResponseType};
23use ruma::api::{
24 OutgoingRequest,
25 auth_scheme::{AuthScheme, SendAccessToken},
26 client::{error::ErrorKind, media},
27 error::FromHttpResponseError,
28 path_builder::PathBuilder,
29};
30use tracing::{error, trace};
31
32use super::super::Client;
33use crate::{
34 Error, RefreshTokenError, TransmissionProgress,
35 authentication::oauth::OAuthError,
36 config::RequestConfig,
37 error::{HttpError, HttpResult},
38 http_client::SupportedPathBuilder,
39 media::MediaError,
40};
41
42#[allow(missing_debug_implementations)]
44pub struct SendRequest<R> {
45 pub(crate) client: Client,
46 pub(crate) request: R,
47 pub(crate) config: Option<RequestConfig>,
48 pub(crate) send_progress: SharedObservable<TransmissionProgress>,
49}
50
51impl<R> SendRequest<R> {
52 pub fn with_send_progress_observable(
59 mut self,
60 send_progress: SharedObservable<TransmissionProgress>,
61 ) -> Self {
62 self.send_progress = send_progress;
63 self
64 }
65
66 pub fn with_request_config(mut self, request_config: impl Into<Option<RequestConfig>>) -> Self {
69 self.config = request_config.into();
70 self
71 }
72
73 pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
76 self.send_progress.subscribe()
77 }
78}
79
80impl<R> IntoFuture for SendRequest<R>
81where
82 R: OutgoingRequest + Clone + Debug + SendOutsideWasm + SyncOutsideWasm + 'static,
83 for<'a> R::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
84 R::PathBuilder: SupportedPathBuilder,
85 for<'a> <R::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
86 R::IncomingResponse: SendOutsideWasm + SyncOutsideWasm,
87 HttpError: From<FromHttpResponseError<R::EndpointError>>,
88{
89 type Output = HttpResult<R::IncomingResponse>;
90 boxed_into_future!();
91
92 fn into_future(self) -> Self::IntoFuture {
93 let Self { client, request, config, send_progress } = self;
94
95 Box::pin(async move {
96 let res =
97 Box::pin(client.send_inner(request.clone(), config, send_progress.clone())).await;
98
99 if let Err(Some(ErrorKind::UnknownToken { soft_logout })) =
101 res.as_ref().map_err(HttpError::client_api_error_kind)
102 {
103 trace!("Token refresh: Unknown token error received.");
104
105 if !client.inner.auth_ctx.handle_refresh_tokens {
107 trace!("Token refresh: Automatic refresh disabled.");
108 client.broadcast_unknown_token(soft_logout);
109 return res;
110 }
111
112 if let Err(refresh_error) = client.refresh_access_token().await {
114 match &refresh_error {
115 RefreshTokenError::RefreshTokenRequired => {
116 trace!("Token refresh: The session doesn't have a refresh token.");
117 client.broadcast_unknown_token(soft_logout);
119 }
120
121 RefreshTokenError::OAuth(oauth_error) => {
122 match &**oauth_error {
123 OAuthError::RefreshToken(RequestTokenError::ServerResponse(
124 error_response,
125 )) if *error_response.error()
126 == BasicErrorResponseType::InvalidGrant =>
127 {
128 error!(
129 "Token refresh: OAuth 2.0 refresh_token rejected \
130 with invalid grant"
131 );
132 client.broadcast_unknown_token(soft_logout);
134 }
135 _ => {
136 trace!(
137 "Token refresh: OAuth 2.0 refresh encountered a problem."
138 );
139 }
142 }
143 return Err(HttpError::RefreshToken(refresh_error));
144 }
145
146 _ => {
147 trace!("Token refresh: Token refresh failed.");
148 client.broadcast_unknown_token(soft_logout);
151 return Err(HttpError::RefreshToken(refresh_error));
152 }
153 }
154 } else {
155 trace!("Token refresh: Refresh succeeded, retrying request.");
156 return Box::pin(client.send_inner(request, config, send_progress)).await;
157 }
158 }
159
160 res
161 })
162 }
163}
164
165#[allow(missing_debug_implementations)]
169pub struct SendMediaUploadRequest {
170 send_request: SendRequest<media::create_content::v3::Request>,
171}
172
173impl SendMediaUploadRequest {
174 pub fn new(request: SendRequest<media::create_content::v3::Request>) -> Self {
175 Self { send_request: request }
176 }
177
178 pub fn with_send_progress_observable(
185 mut self,
186 send_progress: SharedObservable<TransmissionProgress>,
187 ) -> Self {
188 self.send_request = self.send_request.with_send_progress_observable(send_progress);
189 self
190 }
191
192 pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
195 self.send_request.send_progress.subscribe()
196 }
197}
198
199impl IntoFuture for SendMediaUploadRequest {
200 type Output = Result<media::create_content::v3::Response, Error>;
201 boxed_into_future!();
202
203 fn into_future(self) -> Self::IntoFuture {
204 let request_length = self.send_request.request.file.len();
205 let client = self.send_request.client.clone();
206 let send_request = self.send_request;
207
208 Box::pin(async move {
209 let max_upload_size = client.load_or_fetch_max_upload_size().await?;
210 let request_length = UInt::new_wrapping(request_length as u64);
211 if request_length > max_upload_size {
212 return Err(Error::Media(MediaError::MediaTooLargeToUpload {
213 max: max_upload_size,
214 current: request_length,
215 }));
216 }
217
218 send_request.into_future().await.map_err(Into::into)
219 })
220 }
221}