matrix_sdk/client/
futures.rs1#![deny(unreachable_pub)]
16
17use std::{fmt::Debug, future::IntoFuture};
18
19use eyeball::{SharedObservable, Subscriber};
20use js_int::UInt;
21use matrix_sdk_common::{boxed_into_future, SendOutsideWasm, SyncOutsideWasm};
22use oauth2::{basic::BasicErrorResponseType, RequestTokenError};
23use ruma::api::{
24 client::{error::ErrorKind, media},
25 error::FromHttpResponseError,
26 OutgoingRequest,
27};
28use tracing::{error, trace};
29
30use super::super::Client;
31use crate::{
32 authentication::oauth::OAuthError,
33 config::RequestConfig,
34 error::{HttpError, HttpResult},
35 media::MediaError,
36 Error, RefreshTokenError, TransmissionProgress,
37};
38
39#[allow(missing_debug_implementations)]
41pub struct SendRequest<R> {
42 pub(crate) client: Client,
43 pub(crate) request: R,
44 pub(crate) config: Option<RequestConfig>,
45 pub(crate) send_progress: SharedObservable<TransmissionProgress>,
46}
47
48impl<R> SendRequest<R> {
49 pub fn with_send_progress_observable(
56 mut self,
57 send_progress: SharedObservable<TransmissionProgress>,
58 ) -> Self {
59 self.send_progress = send_progress;
60 self
61 }
62
63 pub fn with_request_config(mut self, request_config: impl Into<Option<RequestConfig>>) -> Self {
66 self.config = request_config.into();
67 self
68 }
69
70 pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
73 self.send_progress.subscribe()
74 }
75}
76
77impl<R> IntoFuture for SendRequest<R>
78where
79 R: OutgoingRequest + Clone + Debug + SendOutsideWasm + SyncOutsideWasm + 'static,
80 R::IncomingResponse: SendOutsideWasm + SyncOutsideWasm,
81 HttpError: From<FromHttpResponseError<R::EndpointError>>,
82{
83 type Output = HttpResult<R::IncomingResponse>;
84 boxed_into_future!();
85
86 fn into_future(self) -> Self::IntoFuture {
87 let Self { client, request, config, send_progress } = self;
88
89 Box::pin(async move {
90 let res =
91 Box::pin(client.send_inner(request.clone(), config, send_progress.clone())).await;
92
93 if let Err(Some(ErrorKind::UnknownToken { soft_logout })) =
95 res.as_ref().map_err(HttpError::client_api_error_kind)
96 {
97 trace!("Token refresh: Unknown token error received.");
98
99 if !client.inner.auth_ctx.handle_refresh_tokens {
101 trace!("Token refresh: Automatic refresh disabled.");
102 client.broadcast_unknown_token(soft_logout);
103 return res;
104 }
105
106 if let Err(refresh_error) = client.refresh_access_token().await {
108 match &refresh_error {
109 RefreshTokenError::RefreshTokenRequired => {
110 trace!("Token refresh: The session doesn't have a refresh token.");
111 client.broadcast_unknown_token(soft_logout);
113 }
114
115 RefreshTokenError::OAuth(oauth_error) => {
116 match &**oauth_error {
117 OAuthError::RefreshToken(RequestTokenError::ServerResponse(
118 error_response,
119 )) if *error_response.error()
120 == BasicErrorResponseType::InvalidGrant =>
121 {
122 error!(
123 "Token refresh: OAuth 2.0 refresh_token rejected \
124 with invalid grant"
125 );
126 client.broadcast_unknown_token(soft_logout);
128 }
129 _ => {
130 trace!(
131 "Token refresh: OAuth 2.0 refresh encountered a problem."
132 );
133 }
136 }
137 return Err(HttpError::RefreshToken(refresh_error));
138 }
139
140 _ => {
141 trace!("Token refresh: Token refresh failed.");
142 client.broadcast_unknown_token(soft_logout);
145 return Err(HttpError::RefreshToken(refresh_error));
146 }
147 }
148 } else {
149 trace!("Token refresh: Refresh succeeded, retrying request.");
150 return Box::pin(client.send_inner(request, config, send_progress)).await;
151 }
152 }
153
154 res
155 })
156 }
157}
158
159#[allow(missing_debug_implementations)]
163pub struct SendMediaUploadRequest {
164 send_request: SendRequest<media::create_content::v3::Request>,
165}
166
167impl SendMediaUploadRequest {
168 pub fn new(request: SendRequest<media::create_content::v3::Request>) -> Self {
169 Self { send_request: request }
170 }
171
172 pub fn with_send_progress_observable(
179 mut self,
180 send_progress: SharedObservable<TransmissionProgress>,
181 ) -> Self {
182 self.send_request = self.send_request.with_send_progress_observable(send_progress);
183 self
184 }
185
186 pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
189 self.send_request.send_progress.subscribe()
190 }
191}
192
193impl IntoFuture for SendMediaUploadRequest {
194 type Output = Result<media::create_content::v3::Response, Error>;
195 boxed_into_future!();
196
197 fn into_future(self) -> Self::IntoFuture {
198 let request_length = self.send_request.request.file.len();
199 let client = self.send_request.client.clone();
200 let send_request = self.send_request;
201
202 Box::pin(async move {
203 let max_upload_size = client.load_or_fetch_max_upload_size().await?;
204 let request_length = UInt::new_wrapping(request_length as u64);
205 if request_length > max_upload_size {
206 return Err(Error::Media(MediaError::MediaTooLargeToUpload {
207 max: max_upload_size,
208 current: request_length,
209 }));
210 }
211
212 send_request.into_future().await.map_err(Into::into)
213 })
214 }
215}