matrix_sdk_common/
failures_cache.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A TTL cache which can be used to time out repeated operations that might
16//! experience intermittent failures.
17
18use std::{borrow::Borrow, collections::HashMap, hash::Hash, sync::Arc, time::Duration};
19
20use ruma::time::Instant;
21
22use super::locks::RwLock;
23
/// Default upper bound, in seconds, for the time a failed item stays active
/// in the cache (15 minutes).
const MAX_DELAY: u64 = 15 * 60;

/// Default backoff multiplier, in seconds: the delay applied to an item the
/// first time it is added to the cache, before any doubling.
const MULTIPLIER: u64 = 15;
26
27/// A TTL cache where items get inactive instead of discarded.
28///
29/// The items need to be explicitly removed from the cache. This allows us to
30/// implement exponential backoff based TTL.
31#[derive(Clone, Debug)]
32pub struct FailuresCache<T: Eq + Hash> {
33    inner: Arc<InnerCache<T>>,
34}
35
36#[derive(Debug)]
37struct InnerCache<T: Eq + Hash> {
38    max_delay: Duration,
39    backoff_multiplier: u64,
40    items: RwLock<HashMap<T, FailuresItem>>,
41}
42
43impl<T: Eq + Hash> Default for InnerCache<T> {
44    fn default() -> Self {
45        Self {
46            max_delay: Duration::from_secs(MAX_DELAY),
47            backoff_multiplier: MULTIPLIER,
48            items: Default::default(),
49        }
50    }
51}
52
/// Bookkeeping for a single key tracked by the failures cache.
#[derive(Debug, Clone, Copy)]
struct FailuresItem {
    /// The moment the item was (re-)inserted into the cache.
    insertion_time: Instant,

    /// How long after `insertion_time` the item is considered active.
    duration: Duration,

    /// Number of times that this item has failed after it was first added to
    /// the cache. (In other words, one less than the total number of
    /// failures.)
    failure_count: u8,
}
63
64impl FailuresItem {
65    /// Has the item expired.
66    fn expired(&self) -> bool {
67        self.insertion_time.elapsed() >= self.duration
68    }
69
70    /// Force the expiry of this item.
71    ///
72    /// This doesn't reset the failure count, but does mark the item as ready
73    /// for immediate retry.
74    fn expire(&mut self) {
75        self.duration = Duration::from_secs(0);
76    }
77}
78
79impl<T> FailuresCache<T>
80where
81    T: Eq + Hash,
82{
83    pub fn new() -> Self {
84        Self { inner: Default::default() }
85    }
86
87    pub fn with_settings(max_delay: Duration, multiplier: u8) -> Self {
88        Self {
89            inner: InnerCache {
90                max_delay,
91                backoff_multiplier: multiplier.into(),
92                items: Default::default(),
93            }
94            .into(),
95        }
96    }
97
98    /// Is the given key non-expired and part of the cache.
99    pub fn contains<Q>(&self, key: &Q) -> bool
100    where
101        T: Borrow<Q>,
102        Q: Hash + Eq + ?Sized,
103    {
104        self.inner.items.read().get(key).is_some_and(|item| !item.expired())
105    }
106
107    /// Get the failure count for a given key.
108    ///
109    /// # Returns
110    ///
111    ///  * `None` if this key is not in the failure cache. (It has never failed,
112    ///    or it has been [`FailuresCache::remove()`]d since the last failure.)
113    ///
114    ///  * `Some(u8)`: the number of times it has failed since it was first
115    ///    added to the failure cache. (In other words, one less than the total
116    ///    number of failures.)
117    pub fn failure_count<Q>(&self, key: &Q) -> Option<u8>
118    where
119        T: Borrow<Q>,
120        Q: Hash + Eq + ?Sized,
121    {
122        self.inner.items.read().get(key).map(|i| i.failure_count)
123    }
124
125    /// This will calculate a duration that determines how long an item is
126    /// considered to be valid while being in the cache.
127    ///
128    /// The returned duration will follow this sequence if the default
129    /// multiplier and `max_delay` values are used, values are in minutes:
130    ///      [0.25, 0.5, 1.0, 2.0, 4.0, 8.0, 15.0]
131    fn calculate_delay(&self, failure_count: u8) -> Duration {
132        let exponential_backoff = 2u64.saturating_pow(failure_count.into());
133        let delay = exponential_backoff.saturating_mul(self.inner.backoff_multiplier);
134
135        Duration::from_secs(delay).clamp(Duration::from_secs(1), self.inner.max_delay)
136    }
137
138    /// Add a single item to the cache.
139    pub fn insert(&self, item: T) {
140        self.extend([item]);
141    }
142
143    /// Extend the cache with the given iterator of items.
144    ///
145    /// Items that are already part of the cache, whether they are expired or
146    /// not, will have their TTL extended using an exponential backoff
147    /// algorithm.
148    pub fn extend(&self, iterator: impl IntoIterator<Item = T>) {
149        let mut lock = self.inner.items.write();
150
151        let now = Instant::now();
152
153        for key in iterator {
154            let failure_count = if let Some(value) = lock.get(&key) {
155                value.failure_count.saturating_add(1)
156            } else {
157                0
158            };
159
160            let delay = self.calculate_delay(failure_count);
161
162            let item = FailuresItem { insertion_time: now, duration: delay, failure_count };
163
164            lock.insert(key, item);
165        }
166    }
167
168    /// Remove the items contained in the iterator from the cache.
169    pub fn remove<'a, I, Q>(&'a self, iterator: I)
170    where
171        I: Iterator<Item = &'a Q>,
172        T: Borrow<Q>,
173        Q: Hash + Eq + 'a + ?Sized,
174    {
175        let mut lock = self.inner.items.write();
176
177        for item in iterator {
178            lock.remove(item);
179        }
180    }
181
182    /// Force the expiry of the given item, if it is present in the cache.
183    ///
184    /// This doesn't reset the failure count, but does mark the item as ready
185    /// for immediate retry.
186    #[doc(hidden)]
187    pub fn expire(&self, item: &T) {
188        self.inner.items.write().get_mut(item).map(FailuresItem::expire);
189    }
190}
191
192impl<T: Eq + Hash> Default for FailuresCache<T> {
193    fn default() -> Self {
194        Self::new()
195    }
196}
197
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use proptest::prelude::*;

    use super::FailuresCache;

    /// An item is active after insertion, inactive once its TTL is zero and
    /// gone after an explicit removal.
    #[test]
    fn failures_cache() {
        let cache = FailuresCache::new();

        assert!(!cache.contains(&1));
        cache.extend([1u8].iter());
        assert!(cache.contains(&1));

        cache.inner.items.write().get_mut(&1).unwrap().duration = Duration::from_secs(0);
        assert!(!cache.contains(&1));

        cache.remove([1u8].iter());
        assert!(cache.inner.items.read().get(&1).is_none())
    }

    /// The failure count starts at zero, grows with every re-insertion,
    /// survives a forced expiry and is cleared by a removal.
    #[test]
    fn failures_cache_failure_count() {
        let cache: FailuresCache<u8> = FailuresCache::new();

        assert_eq!(cache.failure_count(&1u8), None);

        cache.insert(1u8);
        assert_eq!(cache.failure_count(&1u8), Some(0));

        cache.insert(1u8);
        assert_eq!(cache.failure_count(&1u8), Some(1));

        cache.expire(&1u8);
        assert!(!cache.contains(&1u8));
        assert_eq!(cache.failure_count(&1u8), Some(1));

        cache.remove([1u8].iter());
        assert_eq!(cache.failure_count(&1u8), None);
    }

    /// The default settings double the delay from 15 seconds up to the
    /// 15 minute cap.
    #[test]
    fn failures_cache_timeout() {
        let cache: FailuresCache<u8> = FailuresCache::new();

        assert_eq!(cache.calculate_delay(0).as_secs(), 15);
        assert_eq!(cache.calculate_delay(1).as_secs(), 30);
        assert_eq!(cache.calculate_delay(2).as_secs(), 60);
        assert_eq!(cache.calculate_delay(3).as_secs(), 120);
        assert_eq!(cache.calculate_delay(4).as_secs(), 240);
        assert_eq!(cache.calculate_delay(5).as_secs(), 480);
        assert_eq!(cache.calculate_delay(6).as_secs(), 900);
        assert_eq!(cache.calculate_delay(7).as_secs(), 900);
    }

    proptest! {
        // Sample the whole `u8` range so that the saturating arithmetic for
        // large failure counts is exercised as well.
        #[test]
        fn failures_cache_proptest_timeout(count in any::<u8>()) {
            let cache: FailuresCache<u8> = FailuresCache::new();
            let delay = cache.calculate_delay(count).as_secs();

            prop_assert!(delay <= 900);
            prop_assert!(delay >= 15);
        }
    }
}