Skip to main content

matrix_sdk/client/
futures.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![deny(unreachable_pub)]
16
17use std::{fmt::Debug, future::IntoFuture};
18
19use eyeball::{SharedObservable, Subscriber};
20use js_int::UInt;
21use matrix_sdk_common::{SendOutsideWasm, SyncOutsideWasm, boxed_into_future};
22use oauth2::{RequestTokenError, basic::BasicErrorResponseType};
23use ruma::api::{
24    OutgoingRequest,
25    client::{error::ErrorKind, media},
26    error::FromHttpResponseError,
27    path_builder::PathBuilder,
28};
29use tracing::{error, trace};
30
31use super::super::Client;
32use crate::{
33    Error, RefreshTokenError, TransmissionProgress,
34    authentication::oauth::OAuthError,
35    config::RequestConfig,
36    error::{HttpError, HttpResult},
37    http_client::{SupportedAuthScheme, SupportedPathBuilder},
38    media::MediaError,
39};
40
41/// `IntoFuture` returned by [`Client::send`].
42#[allow(missing_debug_implementations)]
43pub struct SendRequest<R> {
44    pub(crate) client: Client,
45    pub(crate) request: R,
46    pub(crate) config: Option<RequestConfig>,
47    pub(crate) send_progress: SharedObservable<TransmissionProgress>,
48}
49
50impl<R> SendRequest<R> {
51    /// Replace the default `SharedObservable` used for tracking upload
52    /// progress.
53    ///
54    /// Note that any subscribers obtained from
55    /// [`subscribe_to_send_progress`][Self::subscribe_to_send_progress]
56    /// will be invalidated by this.
57    pub fn with_send_progress_observable(
58        mut self,
59        send_progress: SharedObservable<TransmissionProgress>,
60    ) -> Self {
61        self.send_progress = send_progress;
62        self
63    }
64
65    /// Use the given [`RequestConfig`] for this send request, instead of the
66    /// one provided by default.
67    pub fn with_request_config(mut self, request_config: impl Into<Option<RequestConfig>>) -> Self {
68        self.config = request_config.into();
69        self
70    }
71
72    /// Get a subscriber to observe the progress of sending the request
73    /// body.
74    pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
75        self.send_progress.subscribe()
76    }
77}
78
79impl<R> IntoFuture for SendRequest<R>
80where
81    R: OutgoingRequest + Clone + Debug + SendOutsideWasm + SyncOutsideWasm + 'static,
82    R::Authentication: SupportedAuthScheme,
83    R::PathBuilder: SupportedPathBuilder,
84    for<'a> <R::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
85    R::IncomingResponse: SendOutsideWasm + SyncOutsideWasm,
86    HttpError: From<FromHttpResponseError<R::EndpointError>>,
87{
88    type Output = HttpResult<R::IncomingResponse>;
89    boxed_into_future!();
90
91    fn into_future(self) -> Self::IntoFuture {
92        let Self { client, request, config, send_progress } = self;
93
94        Box::pin(async move {
95            let res =
96                Box::pin(client.send_inner(request.clone(), config, send_progress.clone())).await;
97
98            // An `M_UNKNOWN_TOKEN` error can potentially be fixed with a token refresh.
99            if let Err(Some(ErrorKind::UnknownToken(unknown_token_data))) =
100                res.as_ref().map_err(HttpError::client_api_error_kind)
101            {
102                trace!("Token refresh: Unknown token error received.");
103
104                // If automatic token refresh isn't supported, there is nothing more to do.
105                if !client.inner.auth_ctx.handle_refresh_tokens {
106                    trace!("Token refresh: Automatic refresh disabled.");
107                    client.broadcast_unknown_token(unknown_token_data);
108                    return res;
109                }
110
111                // Try to refresh the token and retry the request.
112                if let Err(refresh_error) = client.refresh_access_token().await {
113                    match &refresh_error {
114                        RefreshTokenError::RefreshTokenRequired => {
115                            trace!("Token refresh: The session doesn't have a refresh token.");
116                            // Refreshing access tokens is not supported by this `Session`, ignore.
117                            client.broadcast_unknown_token(unknown_token_data);
118                        }
119
120                        RefreshTokenError::OAuth(oauth_error) => {
121                            match &**oauth_error {
122                                OAuthError::RefreshToken(RequestTokenError::ServerResponse(
123                                    error_response,
124                                )) if *error_response.error()
125                                    == BasicErrorResponseType::InvalidGrant =>
126                                {
127                                    error!(
128                                        "Token refresh: OAuth 2.0 refresh_token rejected \
129                                         with invalid grant"
130                                    );
131                                    // The refresh was denied, signal to sign out the user.
132                                    client.broadcast_unknown_token(unknown_token_data);
133                                }
134                                _ => {
135                                    trace!(
136                                        "Token refresh: OAuth 2.0 refresh encountered a problem."
137                                    );
138                                    // The refresh failed for other reasons, no
139                                    // need to sign out.
140                                }
141                            }
142                            return Err(HttpError::RefreshToken(refresh_error));
143                        }
144
145                        _ => {
146                            trace!("Token refresh: Token refresh failed.");
147                            // This isn't necessarily correct, but matches the behaviour when
148                            // implementing OAuth 2.0.
149                            client.broadcast_unknown_token(unknown_token_data);
150                            return Err(HttpError::RefreshToken(refresh_error));
151                        }
152                    }
153                } else {
154                    trace!("Token refresh: Refresh succeeded, retrying request.");
155                    return Box::pin(client.send_inner(request, config, send_progress)).await;
156                }
157            }
158
159            res
160        })
161    }
162}
163
164/// `IntoFuture` used to send media upload requests. It wraps another
165/// [`SendRequest`], checking its size will be accepted by the homeserver before
166/// uploading.
167#[allow(missing_debug_implementations)]
168pub struct SendMediaUploadRequest {
169    send_request: SendRequest<media::create_content::v3::Request>,
170}
171
172impl SendMediaUploadRequest {
173    pub fn new(request: SendRequest<media::create_content::v3::Request>) -> Self {
174        Self { send_request: request }
175    }
176
177    /// Replace the default `SharedObservable` used for tracking upload
178    /// progress.
179    ///
180    /// Note that any subscribers obtained from
181    /// [`subscribe_to_send_progress`][Self::subscribe_to_send_progress]
182    /// will be invalidated by this.
183    pub fn with_send_progress_observable(
184        mut self,
185        send_progress: SharedObservable<TransmissionProgress>,
186    ) -> Self {
187        self.send_request = self.send_request.with_send_progress_observable(send_progress);
188        self
189    }
190
191    /// Get a subscriber to observe the progress of sending the request
192    /// body.
193    pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
194        self.send_request.send_progress.subscribe()
195    }
196}
197
198impl IntoFuture for SendMediaUploadRequest {
199    type Output = Result<media::create_content::v3::Response, Error>;
200    boxed_into_future!();
201
202    fn into_future(self) -> Self::IntoFuture {
203        let request_length = self.send_request.request.file.len();
204        let client = self.send_request.client.clone();
205        let send_request = self.send_request;
206
207        Box::pin(async move {
208            let max_upload_size = client.load_or_fetch_max_upload_size().await?;
209            let request_length = UInt::new_wrapping(request_length as u64);
210            if request_length > max_upload_size {
211                return Err(Error::Media(MediaError::MediaTooLargeToUpload {
212                    max: max_upload_size,
213                    current: request_length,
214                }));
215            }
216
217            send_request.into_future().await.map_err(Into::into)
218        })
219    }
220}