matrix_sdk/widget/machine/
pending.rs1use indexmap::{map::Entry, IndexMap};
19use ruma::time::{Duration, Instant};
20use tracing::warn;
21use uuid::Uuid;
22
/// Limits applied to the set of pending requests.
#[derive(Debug, Clone)]
pub(crate) struct RequestLimits {
    /// Upper bound on the number of requests that may be pending at once;
    /// `PendingRequests::insert` refuses new entries once this many are stored.
    pub(crate) max_pending_requests: usize,
    /// How long a request stays valid after insertion. The expiry instant is
    /// computed as "time of insertion + `response_timeout`".
    pub(crate) response_timeout: Duration,
}
36
37pub(super) struct PendingRequests<T> {
43 requests: IndexMap<Uuid, Expirable<T>>,
44 limits: RequestLimits,
45}
46
47impl<T> PendingRequests<T> {
48 pub(super) fn new(limits: RequestLimits) -> Self {
49 Self { requests: IndexMap::with_capacity(limits.max_pending_requests), limits }
50 }
51
52 pub(super) fn insert(&mut self, key: Uuid, value: T) -> Option<&mut T> {
56 if self.requests.len() >= self.limits.max_pending_requests {
57 return None;
58 }
59
60 let Entry::Vacant(entry) = self.requests.entry(key) else {
61 panic!("uuid collision");
62 };
63
64 let expirable = Expirable::new(value, Instant::now() + self.limits.response_timeout);
65 let inserted = entry.insert(expirable);
66 Some(&mut inserted.value)
67 }
68
69 pub(super) fn extract(&mut self, key: &Uuid) -> Result<T, &'static str> {
73 let value =
74 self.requests.swap_remove(key).ok_or("Received response for an unknown request")?;
75 value.value().ok_or("Dropping response for an expired request")
76 }
77
78 pub(super) fn remove_expired(&mut self) {
80 self.requests.retain(|id, req| {
81 let expired = req.expired();
82 if expired {
83 warn!(?id, "Dropping response for an expired request");
84 }
85 !expired
86 });
87 }
88}
89
/// A value bundled with the instant from which it counts as expired.
struct Expirable<T> {
    /// The wrapped payload.
    value: T,
    /// First instant at which the payload is considered expired.
    expires_at: Instant,
}

impl<T> Expirable<T> {
    /// Wraps `value` so that it expires at `expires_at`.
    fn new(value: T, expires_at: Instant) -> Self {
        Self { value, expires_at }
    }

    /// Consumes the wrapper, yielding the payload only if it has not expired.
    fn value(self) -> Option<T> {
        if self.expired() {
            None
        } else {
            Some(self.value)
        }
    }

    /// Returns `true` once the current time has reached `expires_at`
    /// (the deadline itself already counts as expired).
    fn expired(&self) -> bool {
        self.expires_at <= Instant::now()
    }
}
108
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use uuid::Uuid;

    use super::{PendingRequests, RequestLimits};

    /// Payload-free stand-in for a request.
    struct Dummy;

    #[test]
    fn insertion_limits_for_pending_requests_work() {
        let limits =
            RequestLimits { max_pending_requests: 1, response_timeout: Duration::from_secs(10) };
        let mut store = PendingRequests::<Dummy>::new(limits);

        // The first request fits within the limit of one.
        let first = Uuid::new_v4();
        assert!(store.insert(first, Dummy).is_some());
        assert!(!store.requests.is_empty());

        // A second one is rejected while the first is still pending.
        let second = Uuid::new_v4();
        assert!(store.insert(second, Dummy).is_none());

        // The first can be extracted; the rejected one was never stored.
        assert!(store.extract(&first).is_ok());
        assert!(store.extract(&second).is_err());

        // With the slot free again, the second request is accepted.
        assert!(store.insert(second, Dummy).is_some());
        assert!(store.extract(&second).is_ok());
        assert!(store.requests.is_empty());
    }

    #[test]
    fn time_limits_for_pending_requests_work() {
        let limits =
            RequestLimits { max_pending_requests: 10, response_timeout: Duration::from_secs(1) };
        let mut store = PendingRequests::<Dummy>::new(limits);

        // A request extracted after its one-second timeout is reported as an error.
        let stale = Uuid::new_v4();
        assert!(store.insert(stale, Dummy).is_some());

        std::thread::sleep(Duration::from_secs(2));
        assert!(store.extract(&stale).is_err());

        // Two requests inserted now ...
        assert!(store.insert(Uuid::new_v4(), Dummy).is_some());
        assert!(store.insert(Uuid::new_v4(), Dummy).is_some());

        // ... are still alive half a second later, so purging removes nothing
        // and a third request brings the count to three.
        std::thread::sleep(Duration::from_millis(500));
        store.remove_expired();
        let fresh = Uuid::new_v4();
        assert!(store.insert(fresh, Dummy).is_some());
        assert_eq!(store.requests.len(), 3);

        // After another half second the first two have timed out, while the
        // third one is only half-way through its timeout.
        std::thread::sleep(Duration::from_millis(500));
        store.remove_expired();
        assert_eq!(store.requests.len(), 1);
        assert!(store.extract(&fresh).is_ok());
        assert!(store.requests.is_empty());
    }
}