matrix_sdk/widget/machine/
pending.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A wrapper around a hash map that tracks pending requests and makes sure
//! that expired requests are removed.

use std::time::{Duration, Instant};

use indexmap::{map::Entry, IndexMap};
use tracing::warn;
use uuid::Uuid;

/// Configuration of limits for the outgoing request handling.
#[derive(Clone, Debug)]
pub(crate) struct RequestLimits {
    /// Maximum amount of unanswered (pending) requests that the client widget
    /// API is going to process before starting to drop them. This ensures
    /// that a buggy widget cannot force the client machine to consume memory
    /// indefinitely.
    pub(crate) max_pending_requests: usize,
    /// For how long can the unanswered (pending) request stored in a map before
    /// it is dropped. This ensures that requests that are not answered within
    /// a ceratin amount of time, are dropped/cleaned up (considered as failed).
    pub(crate) response_timeout: Duration,
}

/// A wrapper around a hash map that ensures that the request limits
/// are taken into account.
///
/// Expired requests get cleaned up so that the hashmap remains
/// limited to a certain amount of pending requests.
pub(super) struct PendingRequests<T> {
    requests: IndexMap<Uuid, Expirable<T>>,
    limits: RequestLimits,
}

impl<T> PendingRequests<T> {
    pub(super) fn new(limits: RequestLimits) -> Self {
        Self { requests: IndexMap::with_capacity(limits.max_pending_requests), limits }
    }

    /// Inserts a new request into the map.
    ///
    /// Returns `None` if the maximum allowed capacity is reached.
    pub(super) fn insert(&mut self, key: Uuid, value: T) -> Option<&mut T> {
        if self.requests.len() >= self.limits.max_pending_requests {
            return None;
        }

        let Entry::Vacant(entry) = self.requests.entry(key) else {
            panic!("uuid collision");
        };

        let expirable = Expirable::new(value, Instant::now() + self.limits.response_timeout);
        let inserted = entry.insert(expirable);
        Some(&mut inserted.value)
    }

    /// Extracts a request from the map based on its identifier.
    ///
    /// Returns `None` if the value is not present or expired.
    pub(super) fn extract(&mut self, key: &Uuid) -> Result<T, &'static str> {
        let value =
            self.requests.swap_remove(key).ok_or("Received response for an unknown request")?;
        value.value().ok_or("Dropping response for an expired request")
    }

    /// Removes all expired requests from the map.
    pub(super) fn remove_expired(&mut self) {
        self.requests.retain(|id, req| {
            let expired = req.expired();
            if expired {
                warn!(?id, "Dropping response for an expired request");
            }
            !expired
        });
    }
}

struct Expirable<T> {
    value: T,
    expires_at: Instant,
}

impl<T> Expirable<T> {
    fn new(value: T, expires_at: Instant) -> Self {
        Self { value, expires_at }
    }

    fn value(self) -> Option<T> {
        (!self.expired()).then_some(self.value)
    }

    fn expired(&self) -> bool {
        Instant::now() >= self.expires_at
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use uuid::Uuid;

    use super::{PendingRequests, RequestLimits};

    struct Dummy;

    #[test]
    fn insertion_limits_for_pending_requests_work() {
        let mut pending: PendingRequests<Dummy> = PendingRequests::new(RequestLimits {
            max_pending_requests: 1,
            response_timeout: Duration::from_secs(10),
        });

        // First insert is ok.
        let first = Uuid::new_v4();
        assert!(pending.insert(first, Dummy).is_some());
        assert!(!pending.requests.is_empty());

        // Second insert fails - limits is 1 pending request.
        let second = Uuid::new_v4();
        assert!(pending.insert(second, Dummy).is_none());

        // First extract is ok.
        // Second extract fails - it's not in a map.
        assert!(pending.extract(&first).is_ok());
        assert!(pending.extract(&second).is_err());

        // After first is extracted, we have capacity for the second one.
        assert!(pending.insert(second, Dummy).is_some());
        // So extracting it should also work.
        assert!(pending.extract(&second).is_ok());
        // After extraction, we expect that the map is empty.
        assert!(pending.requests.is_empty());
    }

    #[test]
    fn time_limits_for_pending_requests_work() {
        let mut pending: PendingRequests<Dummy> = PendingRequests::new(RequestLimits {
            max_pending_requests: 10,
            response_timeout: Duration::from_secs(1),
        });

        // Insert a request, it's fine, limits are high.
        let key = Uuid::new_v4();
        assert!(pending.insert(key, Dummy).is_some());

        // Wait for 2 seconds, the inserted request should lapse.
        std::thread::sleep(Duration::from_secs(2));
        assert!(pending.extract(&key).is_err());

        // Insert 2 requests. Should be fine, limits are high.
        assert!(pending.insert(Uuid::new_v4(), Dummy).is_some());
        assert!(pending.insert(Uuid::new_v4(), Dummy).is_some());

        // Wait for half a second, remove expired ones (none must be removed).
        // Then, add another one (should also be fine, limits are high). So
        // we should have 3 requests in a hash map.
        std::thread::sleep(Duration::from_millis(500));
        pending.remove_expired();
        let key = Uuid::new_v4();
        assert!(pending.insert(key, Dummy).is_some());
        assert!(pending.requests.len() == 3);

        // Wait for another half a second. First two requests should lapse.
        // But the last one should still be in the map.
        std::thread::sleep(Duration::from_millis(500));
        pending.remove_expired();
        assert!(pending.requests.len() == 1);
        assert!(pending.extract(&key).is_ok());
        assert!(pending.requests.is_empty());
    }
}