matrix_sdk/client/
futures.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![deny(unreachable_pub)]
16
17use std::{fmt::Debug, future::IntoFuture};
18
19use eyeball::{SharedObservable, Subscriber};
20use js_int::UInt;
21use matrix_sdk_common::{boxed_into_future, SendOutsideWasm, SyncOutsideWasm};
22use oauth2::{basic::BasicErrorResponseType, RequestTokenError};
23use ruma::api::{
24    client::{error::ErrorKind, media},
25    error::FromHttpResponseError,
26    OutgoingRequest,
27};
28use tracing::{error, trace};
29
30use super::super::Client;
31use crate::{
32    authentication::oauth::OAuthError,
33    config::RequestConfig,
34    error::{HttpError, HttpResult},
35    media::MediaError,
36    Error, RefreshTokenError, TransmissionProgress,
37};
38
39/// `IntoFuture` returned by [`Client::send`].
40#[allow(missing_debug_implementations)]
41pub struct SendRequest<R> {
42    pub(crate) client: Client,
43    pub(crate) request: R,
44    pub(crate) config: Option<RequestConfig>,
45    pub(crate) send_progress: SharedObservable<TransmissionProgress>,
46}
47
48impl<R> SendRequest<R> {
49    /// Replace the default `SharedObservable` used for tracking upload
50    /// progress.
51    ///
52    /// Note that any subscribers obtained from
53    /// [`subscribe_to_send_progress`][Self::subscribe_to_send_progress]
54    /// will be invalidated by this.
55    pub fn with_send_progress_observable(
56        mut self,
57        send_progress: SharedObservable<TransmissionProgress>,
58    ) -> Self {
59        self.send_progress = send_progress;
60        self
61    }
62
63    /// Use the given [`RequestConfig`] for this send request, instead of the
64    /// one provided by default.
65    pub fn with_request_config(mut self, request_config: impl Into<Option<RequestConfig>>) -> Self {
66        self.config = request_config.into();
67        self
68    }
69
70    /// Get a subscriber to observe the progress of sending the request
71    /// body.
72    pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
73        self.send_progress.subscribe()
74    }
75}
76
77impl<R> IntoFuture for SendRequest<R>
78where
79    R: OutgoingRequest + Clone + Debug + SendOutsideWasm + SyncOutsideWasm + 'static,
80    R::IncomingResponse: SendOutsideWasm + SyncOutsideWasm,
81    HttpError: From<FromHttpResponseError<R::EndpointError>>,
82{
83    type Output = HttpResult<R::IncomingResponse>;
84    boxed_into_future!();
85
86    fn into_future(self) -> Self::IntoFuture {
87        let Self { client, request, config, send_progress } = self;
88
89        Box::pin(async move {
90            let res =
91                Box::pin(client.send_inner(request.clone(), config, send_progress.clone())).await;
92
93            // An `M_UNKNOWN_TOKEN` error can potentially be fixed with a token refresh.
94            if let Err(Some(ErrorKind::UnknownToken { soft_logout })) =
95                res.as_ref().map_err(HttpError::client_api_error_kind)
96            {
97                trace!("Token refresh: Unknown token error received.");
98
99                // If automatic token refresh isn't supported, there is nothing more to do.
100                if !client.inner.auth_ctx.handle_refresh_tokens {
101                    trace!("Token refresh: Automatic refresh disabled.");
102                    client.broadcast_unknown_token(soft_logout);
103                    return res;
104                }
105
106                // Try to refresh the token and retry the request.
107                if let Err(refresh_error) = client.refresh_access_token().await {
108                    match &refresh_error {
109                        RefreshTokenError::RefreshTokenRequired => {
110                            trace!("Token refresh: The session doesn't have a refresh token.");
111                            // Refreshing access tokens is not supported by this `Session`, ignore.
112                            client.broadcast_unknown_token(soft_logout);
113                        }
114
115                        RefreshTokenError::OAuth(oauth_error) => {
116                            match &**oauth_error {
117                                OAuthError::RefreshToken(RequestTokenError::ServerResponse(
118                                    error_response,
119                                )) if *error_response.error()
120                                    == BasicErrorResponseType::InvalidGrant =>
121                                {
122                                    error!(
123                                        "Token refresh: OAuth 2.0 refresh_token rejected \
124                                         with invalid grant"
125                                    );
126                                    // The refresh was denied, signal to sign out the user.
127                                    client.broadcast_unknown_token(soft_logout);
128                                }
129                                _ => {
130                                    trace!(
131                                        "Token refresh: OAuth 2.0 refresh encountered a problem."
132                                    );
133                                    // The refresh failed for other reasons, no
134                                    // need to sign out.
135                                }
136                            }
137                            return Err(HttpError::RefreshToken(refresh_error));
138                        }
139
140                        _ => {
141                            trace!("Token refresh: Token refresh failed.");
142                            // This isn't necessarily correct, but matches the behaviour when
143                            // implementing OAuth 2.0.
144                            client.broadcast_unknown_token(soft_logout);
145                            return Err(HttpError::RefreshToken(refresh_error));
146                        }
147                    }
148                } else {
149                    trace!("Token refresh: Refresh succeeded, retrying request.");
150                    return Box::pin(client.send_inner(request, config, send_progress)).await;
151                }
152            }
153
154            res
155        })
156    }
157}
158
159/// `IntoFuture` used to send media upload requests. It wraps another
160/// [`SendRequest`], checking its size will be accepted by the homeserver before
161/// uploading.
162#[allow(missing_debug_implementations)]
163pub struct SendMediaUploadRequest {
164    send_request: SendRequest<media::create_content::v3::Request>,
165}
166
167impl SendMediaUploadRequest {
168    pub fn new(request: SendRequest<media::create_content::v3::Request>) -> Self {
169        Self { send_request: request }
170    }
171
172    /// Replace the default `SharedObservable` used for tracking upload
173    /// progress.
174    ///
175    /// Note that any subscribers obtained from
176    /// [`subscribe_to_send_progress`][Self::subscribe_to_send_progress]
177    /// will be invalidated by this.
178    pub fn with_send_progress_observable(
179        mut self,
180        send_progress: SharedObservable<TransmissionProgress>,
181    ) -> Self {
182        self.send_request = self.send_request.with_send_progress_observable(send_progress);
183        self
184    }
185
186    /// Get a subscriber to observe the progress of sending the request
187    /// body.
188    pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
189        self.send_request.send_progress.subscribe()
190    }
191}
192
193impl IntoFuture for SendMediaUploadRequest {
194    type Output = Result<media::create_content::v3::Response, Error>;
195    boxed_into_future!();
196
197    fn into_future(self) -> Self::IntoFuture {
198        let request_length = self.send_request.request.file.len();
199        let client = self.send_request.client.clone();
200        let send_request = self.send_request;
201
202        Box::pin(async move {
203            let max_upload_size = client.load_or_fetch_max_upload_size().await?;
204            let request_length = UInt::new_wrapping(request_length as u64);
205            if request_length > max_upload_size {
206                return Err(Error::Media(MediaError::MediaTooLargeToUpload {
207                    max: max_upload_size,
208                    current: request_length,
209                }));
210            }
211
212            send_request.into_future().await.map_err(Into::into)
213        })
214    }
215}