matrix_sdk/client/
futures.rs1#![deny(unreachable_pub)]
16
17use std::{fmt::Debug, future::IntoFuture};
18
19use eyeball::{SharedObservable, Subscriber};
20use js_int::UInt;
21use matrix_sdk_common::{SendOutsideWasm, SyncOutsideWasm, boxed_into_future};
22use oauth2::{RequestTokenError, basic::BasicErrorResponseType};
23use ruma::api::{
24 OutgoingRequest,
25 client::{error::ErrorKind, media},
26 error::FromHttpResponseError,
27 path_builder::PathBuilder,
28};
29use tracing::{error, trace};
30
31use super::super::Client;
32use crate::{
33 Error, RefreshTokenError, TransmissionProgress,
34 authentication::oauth::OAuthError,
35 config::RequestConfig,
36 error::{HttpError, HttpResult},
37 http_client::{SupportedAuthScheme, SupportedPathBuilder},
38 media::MediaError,
39};
40
41#[allow(missing_debug_implementations)]
43pub struct SendRequest<R> {
44 pub(crate) client: Client,
45 pub(crate) request: R,
46 pub(crate) config: Option<RequestConfig>,
47 pub(crate) send_progress: SharedObservable<TransmissionProgress>,
48}
49
50impl<R> SendRequest<R> {
51 pub fn with_send_progress_observable(
58 mut self,
59 send_progress: SharedObservable<TransmissionProgress>,
60 ) -> Self {
61 self.send_progress = send_progress;
62 self
63 }
64
65 pub fn with_request_config(mut self, request_config: impl Into<Option<RequestConfig>>) -> Self {
68 self.config = request_config.into();
69 self
70 }
71
72 pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
75 self.send_progress.subscribe()
76 }
77}
78
79impl<R> IntoFuture for SendRequest<R>
80where
81 R: OutgoingRequest + Clone + Debug + SendOutsideWasm + SyncOutsideWasm + 'static,
82 R::Authentication: SupportedAuthScheme,
83 R::PathBuilder: SupportedPathBuilder,
84 for<'a> <R::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
85 R::IncomingResponse: SendOutsideWasm + SyncOutsideWasm,
86 HttpError: From<FromHttpResponseError<R::EndpointError>>,
87{
88 type Output = HttpResult<R::IncomingResponse>;
89 boxed_into_future!();
90
91 fn into_future(self) -> Self::IntoFuture {
92 let Self { client, request, config, send_progress } = self;
93
94 Box::pin(async move {
95 let res =
96 Box::pin(client.send_inner(request.clone(), config, send_progress.clone())).await;
97
98 if let Err(Some(ErrorKind::UnknownToken(unknown_token_data))) =
100 res.as_ref().map_err(HttpError::client_api_error_kind)
101 {
102 trace!("Token refresh: Unknown token error received.");
103
104 if !client.inner.auth_ctx.handle_refresh_tokens {
106 trace!("Token refresh: Automatic refresh disabled.");
107 client.broadcast_unknown_token(unknown_token_data);
108 return res;
109 }
110
111 if let Err(refresh_error) = client.refresh_access_token().await {
113 match &refresh_error {
114 RefreshTokenError::RefreshTokenRequired => {
115 trace!("Token refresh: The session doesn't have a refresh token.");
116 client.broadcast_unknown_token(unknown_token_data);
118 }
119
120 RefreshTokenError::OAuth(oauth_error) => {
121 match &**oauth_error {
122 OAuthError::RefreshToken(RequestTokenError::ServerResponse(
123 error_response,
124 )) if *error_response.error()
125 == BasicErrorResponseType::InvalidGrant =>
126 {
127 error!(
128 "Token refresh: OAuth 2.0 refresh_token rejected \
129 with invalid grant"
130 );
131 client.broadcast_unknown_token(unknown_token_data);
133 }
134 _ => {
135 trace!(
136 "Token refresh: OAuth 2.0 refresh encountered a problem."
137 );
138 }
141 }
142 return Err(HttpError::RefreshToken(refresh_error));
143 }
144
145 _ => {
146 trace!("Token refresh: Token refresh failed.");
147 client.broadcast_unknown_token(unknown_token_data);
150 return Err(HttpError::RefreshToken(refresh_error));
151 }
152 }
153 } else {
154 trace!("Token refresh: Refresh succeeded, retrying request.");
155 return Box::pin(client.send_inner(request, config, send_progress)).await;
156 }
157 }
158
159 res
160 })
161 }
162}
163
164#[allow(missing_debug_implementations)]
168pub struct SendMediaUploadRequest {
169 send_request: SendRequest<media::create_content::v3::Request>,
170}
171
172impl SendMediaUploadRequest {
173 pub fn new(request: SendRequest<media::create_content::v3::Request>) -> Self {
174 Self { send_request: request }
175 }
176
177 pub fn with_send_progress_observable(
184 mut self,
185 send_progress: SharedObservable<TransmissionProgress>,
186 ) -> Self {
187 self.send_request = self.send_request.with_send_progress_observable(send_progress);
188 self
189 }
190
191 pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
194 self.send_request.send_progress.subscribe()
195 }
196}
197
198impl IntoFuture for SendMediaUploadRequest {
199 type Output = Result<media::create_content::v3::Response, Error>;
200 boxed_into_future!();
201
202 fn into_future(self) -> Self::IntoFuture {
203 let request_length = self.send_request.request.file.len();
204 let client = self.send_request.client.clone();
205 let send_request = self.send_request;
206
207 Box::pin(async move {
208 let max_upload_size = client.load_or_fetch_max_upload_size().await?;
209 let request_length = UInt::new_wrapping(request_length as u64);
210 if request_length > max_upload_size {
211 return Err(Error::Media(MediaError::MediaTooLargeToUpload {
212 max: max_upload_size,
213 current: request_length,
214 }));
215 }
216
217 send_request.into_future().await.map_err(Into::into)
218 })
219 }
220}