matrix_sdk/client/
futures.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![deny(unreachable_pub)]
16
17use std::{fmt::Debug, future::IntoFuture};
18
19use eyeball::{SharedObservable, Subscriber};
20use js_int::UInt;
21use matrix_sdk_common::{SendOutsideWasm, SyncOutsideWasm, boxed_into_future};
22use oauth2::{RequestTokenError, basic::BasicErrorResponseType};
23use ruma::api::{
24    OutgoingRequest,
25    auth_scheme::{AuthScheme, SendAccessToken},
26    client::{error::ErrorKind, media},
27    error::FromHttpResponseError,
28    path_builder::PathBuilder,
29};
30use tracing::{error, trace};
31
32use super::super::Client;
33use crate::{
34    Error, RefreshTokenError, TransmissionProgress,
35    authentication::oauth::OAuthError,
36    config::RequestConfig,
37    error::{HttpError, HttpResult},
38    http_client::SupportedPathBuilder,
39    media::MediaError,
40};
41
42/// `IntoFuture` returned by [`Client::send`].
43#[allow(missing_debug_implementations)]
44pub struct SendRequest<R> {
45    pub(crate) client: Client,
46    pub(crate) request: R,
47    pub(crate) config: Option<RequestConfig>,
48    pub(crate) send_progress: SharedObservable<TransmissionProgress>,
49}
50
51impl<R> SendRequest<R> {
52    /// Replace the default `SharedObservable` used for tracking upload
53    /// progress.
54    ///
55    /// Note that any subscribers obtained from
56    /// [`subscribe_to_send_progress`][Self::subscribe_to_send_progress]
57    /// will be invalidated by this.
58    pub fn with_send_progress_observable(
59        mut self,
60        send_progress: SharedObservable<TransmissionProgress>,
61    ) -> Self {
62        self.send_progress = send_progress;
63        self
64    }
65
66    /// Use the given [`RequestConfig`] for this send request, instead of the
67    /// one provided by default.
68    pub fn with_request_config(mut self, request_config: impl Into<Option<RequestConfig>>) -> Self {
69        self.config = request_config.into();
70        self
71    }
72
73    /// Get a subscriber to observe the progress of sending the request
74    /// body.
75    pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
76        self.send_progress.subscribe()
77    }
78}
79
80impl<R> IntoFuture for SendRequest<R>
81where
82    R: OutgoingRequest + Clone + Debug + SendOutsideWasm + SyncOutsideWasm + 'static,
83    for<'a> R::Authentication: AuthScheme<Input<'a> = SendAccessToken<'a>>,
84    R::PathBuilder: SupportedPathBuilder,
85    for<'a> <R::PathBuilder as PathBuilder>::Input<'a>: SendOutsideWasm + SyncOutsideWasm,
86    R::IncomingResponse: SendOutsideWasm + SyncOutsideWasm,
87    HttpError: From<FromHttpResponseError<R::EndpointError>>,
88{
89    type Output = HttpResult<R::IncomingResponse>;
90    boxed_into_future!();
91
92    fn into_future(self) -> Self::IntoFuture {
93        let Self { client, request, config, send_progress } = self;
94
95        Box::pin(async move {
96            let res =
97                Box::pin(client.send_inner(request.clone(), config, send_progress.clone())).await;
98
99            // An `M_UNKNOWN_TOKEN` error can potentially be fixed with a token refresh.
100            if let Err(Some(ErrorKind::UnknownToken { soft_logout })) =
101                res.as_ref().map_err(HttpError::client_api_error_kind)
102            {
103                trace!("Token refresh: Unknown token error received.");
104
105                // If automatic token refresh isn't supported, there is nothing more to do.
106                if !client.inner.auth_ctx.handle_refresh_tokens {
107                    trace!("Token refresh: Automatic refresh disabled.");
108                    client.broadcast_unknown_token(soft_logout);
109                    return res;
110                }
111
112                // Try to refresh the token and retry the request.
113                if let Err(refresh_error) = client.refresh_access_token().await {
114                    match &refresh_error {
115                        RefreshTokenError::RefreshTokenRequired => {
116                            trace!("Token refresh: The session doesn't have a refresh token.");
117                            // Refreshing access tokens is not supported by this `Session`, ignore.
118                            client.broadcast_unknown_token(soft_logout);
119                        }
120
121                        RefreshTokenError::OAuth(oauth_error) => {
122                            match &**oauth_error {
123                                OAuthError::RefreshToken(RequestTokenError::ServerResponse(
124                                    error_response,
125                                )) if *error_response.error()
126                                    == BasicErrorResponseType::InvalidGrant =>
127                                {
128                                    error!(
129                                        "Token refresh: OAuth 2.0 refresh_token rejected \
130                                         with invalid grant"
131                                    );
132                                    // The refresh was denied, signal to sign out the user.
133                                    client.broadcast_unknown_token(soft_logout);
134                                }
135                                _ => {
136                                    trace!(
137                                        "Token refresh: OAuth 2.0 refresh encountered a problem."
138                                    );
139                                    // The refresh failed for other reasons, no
140                                    // need to sign out.
141                                }
142                            }
143                            return Err(HttpError::RefreshToken(refresh_error));
144                        }
145
146                        _ => {
147                            trace!("Token refresh: Token refresh failed.");
148                            // This isn't necessarily correct, but matches the behaviour when
149                            // implementing OAuth 2.0.
150                            client.broadcast_unknown_token(soft_logout);
151                            return Err(HttpError::RefreshToken(refresh_error));
152                        }
153                    }
154                } else {
155                    trace!("Token refresh: Refresh succeeded, retrying request.");
156                    return Box::pin(client.send_inner(request, config, send_progress)).await;
157                }
158            }
159
160            res
161        })
162    }
163}
164
165/// `IntoFuture` used to send media upload requests. It wraps another
166/// [`SendRequest`], checking its size will be accepted by the homeserver before
167/// uploading.
168#[allow(missing_debug_implementations)]
169pub struct SendMediaUploadRequest {
170    send_request: SendRequest<media::create_content::v3::Request>,
171}
172
173impl SendMediaUploadRequest {
174    pub fn new(request: SendRequest<media::create_content::v3::Request>) -> Self {
175        Self { send_request: request }
176    }
177
178    /// Replace the default `SharedObservable` used for tracking upload
179    /// progress.
180    ///
181    /// Note that any subscribers obtained from
182    /// [`subscribe_to_send_progress`][Self::subscribe_to_send_progress]
183    /// will be invalidated by this.
184    pub fn with_send_progress_observable(
185        mut self,
186        send_progress: SharedObservable<TransmissionProgress>,
187    ) -> Self {
188        self.send_request = self.send_request.with_send_progress_observable(send_progress);
189        self
190    }
191
192    /// Get a subscriber to observe the progress of sending the request
193    /// body.
194    pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
195        self.send_request.send_progress.subscribe()
196    }
197}
198
199impl IntoFuture for SendMediaUploadRequest {
200    type Output = Result<media::create_content::v3::Response, Error>;
201    boxed_into_future!();
202
203    fn into_future(self) -> Self::IntoFuture {
204        let request_length = self.send_request.request.file.len();
205        let client = self.send_request.client.clone();
206        let send_request = self.send_request;
207
208        Box::pin(async move {
209            let max_upload_size = client.load_or_fetch_max_upload_size().await?;
210            let request_length = UInt::new_wrapping(request_length as u64);
211            if request_length > max_upload_size {
212                return Err(Error::Media(MediaError::MediaTooLargeToUpload {
213                    max: max_upload_size,
214                    current: request_length,
215                }));
216            }
217
218            send_request.into_future().await.map_err(Into::into)
219        })
220    }
221}