matrix_sdk/client/
futures.rs1#![deny(unreachable_pub)]
16
17use std::{fmt::Debug, future::IntoFuture};
18
19use eyeball::{SharedObservable, Subscriber};
20use js_int::UInt;
21use matrix_sdk_common::{SendOutsideWasm, SyncOutsideWasm, boxed_into_future};
22use oauth2::{RequestTokenError, basic::BasicErrorResponseType};
23use ruma::api::{
24 OutgoingRequest,
25 client::{error::ErrorKind, media},
26 error::FromHttpResponseError,
27};
28use tracing::{error, trace};
29
30use super::super::Client;
31use crate::{
32 Error, RefreshTokenError, TransmissionProgress,
33 authentication::oauth::OAuthError,
34 config::RequestConfig,
35 error::{HttpError, HttpResult},
36 http_client::SupportedAuthScheme,
37 media::MediaError,
38};
39
40#[allow(missing_debug_implementations)]
42pub struct SendRequest<R> {
43 pub(crate) client: Client,
44 pub(crate) request: R,
45 pub(crate) config: Option<RequestConfig>,
46 pub(crate) send_progress: SharedObservable<TransmissionProgress>,
47}
48
49impl<R> SendRequest<R> {
50 pub fn with_send_progress_observable(
57 mut self,
58 send_progress: SharedObservable<TransmissionProgress>,
59 ) -> Self {
60 self.send_progress = send_progress;
61 self
62 }
63
64 pub fn with_request_config(mut self, request_config: impl Into<Option<RequestConfig>>) -> Self {
67 self.config = request_config.into();
68 self
69 }
70
71 pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
74 self.send_progress.subscribe()
75 }
76}
77
78impl<R> IntoFuture for SendRequest<R>
79where
80 R: OutgoingRequest + Clone + Debug + SendOutsideWasm + SyncOutsideWasm + 'static,
81 R::Authentication: SupportedAuthScheme,
82 R::IncomingResponse: SendOutsideWasm + SyncOutsideWasm,
83 HttpError: From<FromHttpResponseError<R::EndpointError>>,
84{
85 type Output = HttpResult<R::IncomingResponse>;
86 boxed_into_future!();
87
88 fn into_future(self) -> Self::IntoFuture {
89 let Self { client, request, config, send_progress } = self;
90
91 Box::pin(async move {
92 let res =
93 Box::pin(client.send_inner(request.clone(), config, send_progress.clone())).await;
94
95 if let Err(Some(ErrorKind::UnknownToken { soft_logout })) =
97 res.as_ref().map_err(HttpError::client_api_error_kind)
98 {
99 trace!("Token refresh: Unknown token error received.");
100
101 if !client.inner.auth_ctx.handle_refresh_tokens {
103 trace!("Token refresh: Automatic refresh disabled.");
104 client.broadcast_unknown_token(soft_logout);
105 return res;
106 }
107
108 if let Err(refresh_error) = client.refresh_access_token().await {
110 match &refresh_error {
111 RefreshTokenError::RefreshTokenRequired => {
112 trace!("Token refresh: The session doesn't have a refresh token.");
113 client.broadcast_unknown_token(soft_logout);
115 }
116
117 RefreshTokenError::OAuth(oauth_error) => {
118 match &**oauth_error {
119 OAuthError::RefreshToken(RequestTokenError::ServerResponse(
120 error_response,
121 )) if *error_response.error()
122 == BasicErrorResponseType::InvalidGrant =>
123 {
124 error!(
125 "Token refresh: OAuth 2.0 refresh_token rejected \
126 with invalid grant"
127 );
128 client.broadcast_unknown_token(soft_logout);
130 }
131 _ => {
132 trace!(
133 "Token refresh: OAuth 2.0 refresh encountered a problem."
134 );
135 }
138 }
139 return Err(HttpError::RefreshToken(refresh_error));
140 }
141
142 _ => {
143 trace!("Token refresh: Token refresh failed.");
144 client.broadcast_unknown_token(soft_logout);
147 return Err(HttpError::RefreshToken(refresh_error));
148 }
149 }
150 } else {
151 trace!("Token refresh: Refresh succeeded, retrying request.");
152 return Box::pin(client.send_inner(request, config, send_progress)).await;
153 }
154 }
155
156 res
157 })
158 }
159}
160
161#[allow(missing_debug_implementations)]
165pub struct SendMediaUploadRequest {
166 send_request: SendRequest<media::create_content::v3::Request>,
167}
168
169impl SendMediaUploadRequest {
170 pub fn new(request: SendRequest<media::create_content::v3::Request>) -> Self {
171 Self { send_request: request }
172 }
173
174 pub fn with_send_progress_observable(
181 mut self,
182 send_progress: SharedObservable<TransmissionProgress>,
183 ) -> Self {
184 self.send_request = self.send_request.with_send_progress_observable(send_progress);
185 self
186 }
187
188 pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
191 self.send_request.send_progress.subscribe()
192 }
193}
194
195impl IntoFuture for SendMediaUploadRequest {
196 type Output = Result<media::create_content::v3::Response, Error>;
197 boxed_into_future!();
198
199 fn into_future(self) -> Self::IntoFuture {
200 let request_length = self.send_request.request.file.len();
201 let client = self.send_request.client.clone();
202 let send_request = self.send_request;
203
204 Box::pin(async move {
205 let max_upload_size = client.load_or_fetch_max_upload_size().await?;
206 let request_length = UInt::new_wrapping(request_length as u64);
207 if request_length > max_upload_size {
208 return Err(Error::Media(MediaError::MediaTooLargeToUpload {
209 max: max_upload_size,
210 current: request_length,
211 }));
212 }
213
214 send_request.into_future().await.map_err(Into::into)
215 })
216 }
217}