matrix_sdk/client/
futures.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![deny(unreachable_pub)]
16
17use std::{fmt::Debug, future::IntoFuture};
18
19use eyeball::{SharedObservable, Subscriber};
20use js_int::UInt;
21use matrix_sdk_common::{SendOutsideWasm, SyncOutsideWasm, boxed_into_future};
22use oauth2::{RequestTokenError, basic::BasicErrorResponseType};
23use ruma::api::{
24    OutgoingRequest,
25    client::{error::ErrorKind, media},
26    error::FromHttpResponseError,
27};
28use tracing::{error, trace};
29
30use super::super::Client;
31use crate::{
32    Error, RefreshTokenError, TransmissionProgress,
33    authentication::oauth::OAuthError,
34    config::RequestConfig,
35    error::{HttpError, HttpResult},
36    http_client::SupportedAuthScheme,
37    media::MediaError,
38};
39
40/// `IntoFuture` returned by [`Client::send`].
41#[allow(missing_debug_implementations)]
42pub struct SendRequest<R> {
43    pub(crate) client: Client,
44    pub(crate) request: R,
45    pub(crate) config: Option<RequestConfig>,
46    pub(crate) send_progress: SharedObservable<TransmissionProgress>,
47}
48
49impl<R> SendRequest<R> {
50    /// Replace the default `SharedObservable` used for tracking upload
51    /// progress.
52    ///
53    /// Note that any subscribers obtained from
54    /// [`subscribe_to_send_progress`][Self::subscribe_to_send_progress]
55    /// will be invalidated by this.
56    pub fn with_send_progress_observable(
57        mut self,
58        send_progress: SharedObservable<TransmissionProgress>,
59    ) -> Self {
60        self.send_progress = send_progress;
61        self
62    }
63
64    /// Use the given [`RequestConfig`] for this send request, instead of the
65    /// one provided by default.
66    pub fn with_request_config(mut self, request_config: impl Into<Option<RequestConfig>>) -> Self {
67        self.config = request_config.into();
68        self
69    }
70
71    /// Get a subscriber to observe the progress of sending the request
72    /// body.
73    pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
74        self.send_progress.subscribe()
75    }
76}
77
78impl<R> IntoFuture for SendRequest<R>
79where
80    R: OutgoingRequest + Clone + Debug + SendOutsideWasm + SyncOutsideWasm + 'static,
81    R::Authentication: SupportedAuthScheme,
82    R::IncomingResponse: SendOutsideWasm + SyncOutsideWasm,
83    HttpError: From<FromHttpResponseError<R::EndpointError>>,
84{
85    type Output = HttpResult<R::IncomingResponse>;
86    boxed_into_future!();
87
88    fn into_future(self) -> Self::IntoFuture {
89        let Self { client, request, config, send_progress } = self;
90
91        Box::pin(async move {
92            let res =
93                Box::pin(client.send_inner(request.clone(), config, send_progress.clone())).await;
94
95            // An `M_UNKNOWN_TOKEN` error can potentially be fixed with a token refresh.
96            if let Err(Some(ErrorKind::UnknownToken { soft_logout })) =
97                res.as_ref().map_err(HttpError::client_api_error_kind)
98            {
99                trace!("Token refresh: Unknown token error received.");
100
101                // If automatic token refresh isn't supported, there is nothing more to do.
102                if !client.inner.auth_ctx.handle_refresh_tokens {
103                    trace!("Token refresh: Automatic refresh disabled.");
104                    client.broadcast_unknown_token(soft_logout);
105                    return res;
106                }
107
108                // Try to refresh the token and retry the request.
109                if let Err(refresh_error) = client.refresh_access_token().await {
110                    match &refresh_error {
111                        RefreshTokenError::RefreshTokenRequired => {
112                            trace!("Token refresh: The session doesn't have a refresh token.");
113                            // Refreshing access tokens is not supported by this `Session`, ignore.
114                            client.broadcast_unknown_token(soft_logout);
115                        }
116
117                        RefreshTokenError::OAuth(oauth_error) => {
118                            match &**oauth_error {
119                                OAuthError::RefreshToken(RequestTokenError::ServerResponse(
120                                    error_response,
121                                )) if *error_response.error()
122                                    == BasicErrorResponseType::InvalidGrant =>
123                                {
124                                    error!(
125                                        "Token refresh: OAuth 2.0 refresh_token rejected \
126                                         with invalid grant"
127                                    );
128                                    // The refresh was denied, signal to sign out the user.
129                                    client.broadcast_unknown_token(soft_logout);
130                                }
131                                _ => {
132                                    trace!(
133                                        "Token refresh: OAuth 2.0 refresh encountered a problem."
134                                    );
135                                    // The refresh failed for other reasons, no
136                                    // need to sign out.
137                                }
138                            }
139                            return Err(HttpError::RefreshToken(refresh_error));
140                        }
141
142                        _ => {
143                            trace!("Token refresh: Token refresh failed.");
144                            // This isn't necessarily correct, but matches the behaviour when
145                            // implementing OAuth 2.0.
146                            client.broadcast_unknown_token(soft_logout);
147                            return Err(HttpError::RefreshToken(refresh_error));
148                        }
149                    }
150                } else {
151                    trace!("Token refresh: Refresh succeeded, retrying request.");
152                    return Box::pin(client.send_inner(request, config, send_progress)).await;
153                }
154            }
155
156            res
157        })
158    }
159}
160
161/// `IntoFuture` used to send media upload requests. It wraps another
162/// [`SendRequest`], checking its size will be accepted by the homeserver before
163/// uploading.
164#[allow(missing_debug_implementations)]
165pub struct SendMediaUploadRequest {
166    send_request: SendRequest<media::create_content::v3::Request>,
167}
168
169impl SendMediaUploadRequest {
170    pub fn new(request: SendRequest<media::create_content::v3::Request>) -> Self {
171        Self { send_request: request }
172    }
173
174    /// Replace the default `SharedObservable` used for tracking upload
175    /// progress.
176    ///
177    /// Note that any subscribers obtained from
178    /// [`subscribe_to_send_progress`][Self::subscribe_to_send_progress]
179    /// will be invalidated by this.
180    pub fn with_send_progress_observable(
181        mut self,
182        send_progress: SharedObservable<TransmissionProgress>,
183    ) -> Self {
184        self.send_request = self.send_request.with_send_progress_observable(send_progress);
185        self
186    }
187
188    /// Get a subscriber to observe the progress of sending the request
189    /// body.
190    pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
191        self.send_request.send_progress.subscribe()
192    }
193}
194
195impl IntoFuture for SendMediaUploadRequest {
196    type Output = Result<media::create_content::v3::Response, Error>;
197    boxed_into_future!();
198
199    fn into_future(self) -> Self::IntoFuture {
200        let request_length = self.send_request.request.file.len();
201        let client = self.send_request.client.clone();
202        let send_request = self.send_request;
203
204        Box::pin(async move {
205            let max_upload_size = client.load_or_fetch_max_upload_size().await?;
206            let request_length = UInt::new_wrapping(request_length as u64);
207            if request_length > max_upload_size {
208                return Err(Error::Media(MediaError::MediaTooLargeToUpload {
209                    max: max_upload_size,
210                    current: request_length,
211                }));
212            }
213
214            send_request.into_future().await.map_err(Into::into)
215        })
216    }
217}