matrix_sdk_common/
store_locks.rs

// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Collection of small helpers that implement store-based locks.
//!
//! This is a cross-process lock that may be used only for very specific use
//! cases, where multiple processes might concurrently write to the same
//! database; since this would invalidate store caches, it should be done
//! mindfully. Such a lock can be acquired multiple times by the same process,
//! and it remains active as long as there's at least one user in a given
//! process.
//!
//! The lock is implemented using time-based leases to values inserted in a
//! store. The store maintains the lock identifier (key), the current holder
//! (value), and an expiration timestamp on the side; see also
//! `CryptoStore::try_take_leased_lock` for more details.
//!
//! The lock is initially acquired for a certain period of time (namely, the
//! duration of a lease, aka `LEASE_DURATION_MS`), and then a "heartbeat" task
//! renews the lease to extend its duration, every so often (namely, every
//! `EXTEND_LEASE_EVERY_MS`). Since the tokio scheduler might be busy, the
//! extension request should happen way more frequently than the lease
//! duration, so that a missed deadline doesn't result in losing the lock. The
//! current values have been chosen to reflect that, with a ratio of 1:10 as of
//! 2023-06-23.
//!
//! Releasing the lock happens naturally, by not renewing a lease. It happens
//! automatically after the duration of the last lease, at most.
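//!
//! A minimal usage sketch (the store value and the key/holder strings are
//! illustrative, not part of this module):
//!
//! ```ignore
//! let lock = CrossProcessStoreLock::new(store, "lock_key".to_owned(), "process_id".to_owned());
//!
//! // Either try to take the lock once…
//! if let Some(guard) = lock.try_lock_once().await? {
//!     // …use the underlying store while the guard is alive…
//!     drop(guard);
//! }
//!
//! // …or spin with exponential backoff until it's acquired (or times out).
//! let _guard = lock.spin_lock(None).await?;
//! ```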

use std::{
    error::Error,
    future::Future,
    sync::{
        atomic::{self, AtomicU32},
        Arc,
    },
    time::Duration,
};

use tokio::sync::Mutex;
use tracing::{debug, error, instrument, trace};

use crate::{
    executor::{spawn, JoinHandle},
    sleep::sleep,
    SendOutsideWasm,
};

/// Backing store for a cross-process lock.
pub trait BackingStore {
    /// The error type returned when taking the lock fails at the store level.
    #[cfg(not(target_family = "wasm"))]
    type LockError: Error + Send + Sync;

    /// The error type returned when taking the lock fails at the store level.
    #[cfg(target_family = "wasm")]
    type LockError: Error;

    /// Try to take the lock for the given key and holder, with the given
    /// lease duration (in milliseconds).
    ///
    /// Returns `Ok(true)` if the lease was acquired (or extended), `Ok(false)`
    /// if another holder currently owns it.
    fn try_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> impl Future<Output = Result<bool, Self::LockError>> + SendOutsideWasm;
}

/// Small state machine to handle wait times.
#[derive(Clone, Debug)]
enum WaitingTime {
    /// Some time to wait, in milliseconds.
    Some(u32),
    /// Stop waiting when seeing this value.
    Stop,
}

/// A guard on the store lock.
///
/// The lock will be automatically released a short period of time after all
/// the guards have been dropped.
#[derive(Debug)]
pub struct CrossProcessStoreLockGuard {
    num_holders: Arc<AtomicU32>,
}

impl Drop for CrossProcessStoreLockGuard {
    fn drop(&mut self) {
        self.num_holders.fetch_sub(1, atomic::Ordering::SeqCst);
    }
}

/// A store-based lock for a `Store`.
///
/// See the doc-comment of this module for more information.
#[derive(Clone, Debug)]
pub struct CrossProcessStoreLock<S: BackingStore + Clone + SendOutsideWasm + 'static> {
    /// The store we're using to lock.
    store: S,

    /// Number of holders of the lock in this process.
    ///
    /// If greater than 0, this means we've already acquired this lock, in this
    /// process, and the store lock mustn't be touched.
    ///
    /// When the number of holders is decreased to 0, then the lock must be
    /// released in the store.
    num_holders: Arc<AtomicU32>,

    /// A mutex to control an attempt to take the lock, to avoid making it
    /// reentrant.
    locking_attempt: Arc<Mutex<()>>,

    /// Current renew task spawned by `try_lock_once`.
    renew_task: Arc<Mutex<Option<JoinHandle<()>>>>,

    /// The key used in the key/value mapping for the lock entry.
    lock_key: String,

    /// A specific value to identify the lock's holder.
    lock_holder: String,

    /// Backoff time, in milliseconds.
    backoff: Arc<Mutex<WaitingTime>>,
}

/// Amount of time a lease of the lock should last, in milliseconds.
pub const LEASE_DURATION_MS: u32 = 500;

/// Period of time between two attempts to extend the lease. We'll re-request a
/// lease for an entire duration of `LEASE_DURATION_MS` milliseconds, every
/// `EXTEND_LEASE_EVERY_MS` milliseconds, so this has to be safely low compared
/// to `LEASE_DURATION_MS`, to make sure that a missed deadline doesn't
/// compromise the lock.
pub const EXTEND_LEASE_EVERY_MS: u64 = 50;

/// Initial backoff, in milliseconds. This is the time we wait the first
/// time, if taking the lock initially failed.
const INITIAL_BACKOFF_MS: u32 = 10;

/// Maximal backoff, in milliseconds. This is the maximum amount of time
/// we'll wait for the lock, *between two attempts*.
pub const MAX_BACKOFF_MS: u32 = 1000;

impl<S: BackingStore + Clone + SendOutsideWasm + 'static> CrossProcessStoreLock<S> {
    /// Create a new store-based lock implemented as a value in the store.
    ///
    /// # Parameters
    ///
    /// - `lock_key`: key in the key-value store to store the lock's state.
    /// - `lock_holder`: identify the lock's holder with this given value.
    pub fn new(store: S, lock_key: String, lock_holder: String) -> Self {
        Self {
            store,
            lock_key,
            lock_holder,
            backoff: Arc::new(Mutex::new(WaitingTime::Some(INITIAL_BACKOFF_MS))),
            num_holders: Arc::new(0.into()),
            locking_attempt: Arc::new(Mutex::new(())),
            renew_task: Default::default(),
        }
    }

    /// Try to lock once; returns a guard if the lock was obtained, `None`
    /// otherwise.
    #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
    pub async fn try_lock_once(
        &self,
    ) -> Result<Option<CrossProcessStoreLockGuard>, LockStoreError> {
        // Hold onto the locking attempt mutex for the entire lifetime of this
        // function, to avoid multiple reentrant calls.
        let _attempt = self.locking_attempt.lock().await;

        // If another thread obtained the lock, make sure to only superficially increase
        // the number of holders, and carry on.
        if self.num_holders.load(atomic::Ordering::SeqCst) > 0 {
            // Note: between the above load and the fetch_add below, another thread may
            // decrement `num_holders`. That's fine because that means the lock
            // was taken by at least one thread, and after this call it will be
            // taken by at least one thread.
            trace!("We already had the lock, incrementing holder count");
            self.num_holders.fetch_add(1, atomic::Ordering::SeqCst);
            let guard = CrossProcessStoreLockGuard { num_holders: self.num_holders.clone() };
            return Ok(Some(guard));
        }

        let acquired = self
            .store
            .try_lock(LEASE_DURATION_MS, &self.lock_key, &self.lock_holder)
            .await
            .map_err(|err| LockStoreError::BackingStoreError(Box::new(err)))?;

        if !acquired {
            trace!("Couldn't acquire the lock immediately.");
            return Ok(None);
        }

        trace!("Acquired the lock, spawning the lease extension task.");

        // This is the first time we've acquired the lock. We're going to spawn the task
        // that will renew the lease.

        // Clone data to be owned by the task.
        let this = (*self).clone();

        let mut renew_task = self.renew_task.lock().await;

        // Cancel the previous task, if any. That's safe to do, because:
        // - either the task was done,
        // - or it was still running, but taking a lock in the db has to be an atomic
        //   operation running in a transaction.

        if let Some(_prev) = renew_task.take() {
            #[cfg(not(target_family = "wasm"))]
            if !_prev.is_finished() {
                trace!("aborting the previous renew task");
                _prev.abort();
            }
        }

        // Restart a new one.
        *renew_task = Some(spawn(async move {
            loop {
                {
                    // First, check if there are still users of this lock.
                    //
                    // This is not racy, because:
                    // - the `locking_attempt` mutex makes sure we don't have unexpected
                    //   interactions with the non-atomic sequence above in `try_lock_once`
                    //   (check > 0, then add 1).
                    // - other entities holding onto the `num_holders` atomic will only
                    //   decrease it over time.

                    let _guard = this.locking_attempt.lock().await;

                    // If there are no more users, we can quit.
                    if this.num_holders.load(atomic::Ordering::SeqCst) == 0 {
                        trace!("exiting the lease extension loop");

                        // Cancel the lease with another 0ms lease.
                        // If we don't get the lock, that's (weird but) fine.
                        let fut = this.store.try_lock(0, &this.lock_key, &this.lock_holder);
                        let _ = fut.await;

                        // Exit the loop.
                        break;
                    }
                }

                sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;

                let fut = this.store.try_lock(LEASE_DURATION_MS, &this.lock_key, &this.lock_holder);
                if let Err(err) = fut.await {
                    error!("error when extending lock lease: {err:#}");
                    // Exit the loop.
                    break;
                }
            }
        }));

        self.num_holders.fetch_add(1, atomic::Ordering::SeqCst);

        let guard = CrossProcessStoreLockGuard { num_holders: self.num_holders.clone() };
        Ok(Some(guard))
    }

    /// Attempt to take the lock, with exponential backoff if the lock has
    /// already been taken before.
    ///
    /// The `max_backoff` parameter is the maximum time (in milliseconds) to
    /// wait between two attempts. Once that cap has been reached, the next
    /// failed attempt returns a [`LockStoreError::LockTimeout`] instead of
    /// retrying. If not provided, it defaults to [`MAX_BACKOFF_MS`].
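    ///
    /// A brief usage sketch (the `lock` value and the 2-second cap are
    /// illustrative):
    ///
    /// ```ignore
    /// match lock.spin_lock(Some(2000)).await {
    ///     Ok(guard) => {
    ///         // We hold the lock; dropping `guard` releases it shortly after.
    ///     }
    ///     Err(LockStoreError::LockTimeout) => {
    ///         // Another process kept the lock for the whole backoff budget.
    ///     }
    ///     Err(other) => return Err(other),
    /// }
    /// ```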
    #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
    pub async fn spin_lock(
        &self,
        max_backoff: Option<u32>,
    ) -> Result<CrossProcessStoreLockGuard, LockStoreError> {
        let max_backoff = max_backoff.unwrap_or(MAX_BACKOFF_MS);

        // Note: reads/writes to the backoff are racy across threads in theory, but the
        // lock in `try_lock_once` should sequentialize it all.

        loop {
            if let Some(guard) = self.try_lock_once().await? {
                // Reset backoff before returning, for the next attempt to lock.
                *self.backoff.lock().await = WaitingTime::Some(INITIAL_BACKOFF_MS);
                return Ok(guard);
            }

            // Exponential backoff! Multiply by 2 the time we've waited before, cap it to
            // max_backoff.
            let mut backoff = self.backoff.lock().await;

            let wait = match &mut *backoff {
                WaitingTime::Some(ref mut val) => {
                    let wait = *val;
                    *val = val.saturating_mul(2);
                    if *val >= max_backoff {
                        *backoff = WaitingTime::Stop;
                    }
                    wait
                }
                WaitingTime::Stop => {
                    // We've reached the maximum backoff, abandon.
                    return Err(LockStoreError::LockTimeout);
                }
            };

            debug!("Waiting {wait}ms before re-attempting to take the lock");
            sleep(Duration::from_millis(wait.into())).await;
        }
    }

    /// Returns the value in the database that represents the holder's
    /// identifier.
    pub fn lock_holder(&self) -> &str {
        &self.lock_holder
    }
}

/// Error related to the locking API of the store.
#[derive(Debug, thiserror::Error)]
pub enum LockStoreError {
    /// Spent too long waiting for a database lock.
    #[error("a lock timed out")]
    LockTimeout,

    /// Error propagated from the backing store while trying to take the lock.
    #[error(transparent)]
    #[cfg(not(target_family = "wasm"))]
    BackingStoreError(#[from] Box<dyn Error + Send + Sync>),

    /// Error propagated from the backing store while trying to take the lock.
    #[error(transparent)]
    #[cfg(target_family = "wasm")]
    BackingStoreError(Box<dyn Error>),
}

#[cfg(test)]
#[cfg(not(target_family = "wasm"))] // These tests require tokio::time, which is not implemented on wasm.
mod tests {
    use std::{
        collections::HashMap,
        sync::{atomic, Arc, RwLock},
        time::Instant,
    };

    use assert_matches::assert_matches;
    use matrix_sdk_test_macros::async_test;
    use tokio::{
        spawn,
        time::{sleep, Duration},
    };

    use super::{
        memory_store_helper::try_take_leased_lock, BackingStore, CrossProcessStoreLock,
        CrossProcessStoreLockGuard, LockStoreError, EXTEND_LEASE_EVERY_MS,
    };

    #[derive(Clone, Default)]
    struct TestStore {
        leases: Arc<RwLock<HashMap<String, (String, Instant)>>>,
    }

    impl TestStore {
        fn try_take_leased_lock(&self, lease_duration_ms: u32, key: &str, holder: &str) -> bool {
            try_take_leased_lock(&mut self.leases.write().unwrap(), lease_duration_ms, key, holder)
        }
    }

    #[derive(Debug, thiserror::Error)]
    enum DummyError {}

    impl BackingStore for TestStore {
        type LockError = DummyError;

        /// Try to take a lock using the given store.
        async fn try_lock(
            &self,
            lease_duration_ms: u32,
            key: &str,
            holder: &str,
        ) -> Result<bool, Self::LockError> {
            Ok(self.try_take_leased_lock(lease_duration_ms, key, holder))
        }
    }

    async fn release_lock(guard: Option<CrossProcessStoreLockGuard>) {
        drop(guard);
        sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;
    }

    type TestResult = Result<(), LockStoreError>;

    #[async_test]
    async fn test_simple_lock_unlock() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessStoreLock::new(store, "key".to_owned(), "first".to_owned());

        // The lock plain works when used with a single holder.
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // Releasing works.
        release_lock(acquired).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        // Spin locking on the same lock always works, assuming no concurrent access.
        let acquired = lock.spin_lock(None).await.unwrap();

        // Releasing still works.
        release_lock(Some(acquired)).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        Ok(())
    }

    #[async_test]
    async fn test_self_recovery() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessStoreLock::new(store.clone(), "key".to_owned(), "first".to_owned());

        // When a lock is acquired...
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // But then forgotten... (note: no need to release the guard)
        drop(lock);

        // And when rematerializing the lock with the same key/value...
        let lock = CrossProcessStoreLock::new(store.clone(), "key".to_owned(), "first".to_owned());

        // We still got it.
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        Ok(())
    }

    #[async_test]
    async fn test_multiple_holders_same_process() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessStoreLock::new(store, "key".to_owned(), "first".to_owned());

        // Taking the lock twice...
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());

        let acquired2 = lock.try_lock_once().await?;
        assert!(acquired2.is_some());

        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 2);

        // ...means we can release it twice.
        release_lock(acquired).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        release_lock(acquired2).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        Ok(())
    }

    #[async_test]
    async fn test_multiple_processes() -> TestResult {
        let store = TestStore::default();
        let lock1 = CrossProcessStoreLock::new(store.clone(), "key".to_owned(), "first".to_owned());
        let lock2 = CrossProcessStoreLock::new(store, "key".to_owned(), "second".to_owned());

        // When the first process takes the lock...
        let acquired1 = lock1.try_lock_once().await?;
        assert!(acquired1.is_some());

        // The second can't take it immediately.
        let acquired2 = lock2.try_lock_once().await?;
        assert!(acquired2.is_none());

        let lock2_clone = lock2.clone();
        let handle = spawn(async move { lock2_clone.spin_lock(Some(1000)).await });

        sleep(Duration::from_millis(100)).await;

        drop(acquired1);

        // lock2 in the background manages to get the lock at some point.
        let _acquired2 = handle
            .await
            .expect("join handle is properly awaited")
            .expect("lock was obtained after spin-locking");

        // Now if lock1 tries to get the lock with a small timeout, it will fail.
        assert_matches!(lock1.spin_lock(Some(200)).await, Err(LockStoreError::LockTimeout));

        Ok(())
    }
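
    // A small additional sanity check (illustrative) of the lease semantics
    // used by `TestStore` via `memory_store_helper::try_take_leased_lock`: the
    // same holder can extend its lease at any time, while a different holder
    // can only steal the lease once it has expired.
    #[async_test]
    async fn test_lease_extend_and_steal() {
        let store = TestStore::default();

        // "first" takes a short lease...
        assert!(store.try_take_leased_lock(300, "key", "first"));

        // ...and can renew/extend it immediately.
        assert!(store.try_take_leased_lock(300, "key", "first"));

        // Another holder can't take it while the lease is still valid...
        assert!(!store.try_take_leased_lock(300, "key", "second"));

        // ...but can steal it once the lease has expired.
        sleep(Duration::from_millis(500)).await;
        assert!(store.try_take_leased_lock(300, "key", "second"));
    }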
}

/// Some code that is shared by almost all `MemoryStore` implementations out
/// there.
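///
/// A store will typically delegate its lease-taking to
/// `try_take_leased_lock`; a minimal sketch (the `MemoryStore` type and its
/// `leases` field are illustrative):
///
/// ```ignore
/// struct MemoryStore {
///     leases: RwLock<HashMap<String, (String, Instant)>>,
/// }
///
/// impl MemoryStore {
///     fn try_take_leased_lock(&self, lease_duration_ms: u32, key: &str, holder: &str) -> bool {
///         memory_store_helper::try_take_leased_lock(
///             &mut self.leases.write().unwrap(),
///             lease_duration_ms,
///             key,
///             holder,
///         )
///     }
/// }
/// ```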
pub mod memory_store_helper {
    use std::collections::{hash_map::Entry, HashMap};

    use ruma::time::{Duration, Instant};

    /// Try to take a lease-based lock for the given key and holder, stealing
    /// any existing lease that has already expired.
    pub fn try_take_leased_lock(
        leases: &mut HashMap<String, (String, Instant)>,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> bool {
        let now = Instant::now();
        let expiration = now + Duration::from_millis(lease_duration_ms.into());

        match leases.entry(key.to_owned()) {
            // There is an existing holder.
            Entry::Occupied(mut entry) => {
                let (current_holder, current_expiration) = entry.get_mut();

                if current_holder == holder {
                    // We had the lease before, extend it.
                    *current_expiration = expiration;

                    true
                } else {
                    // We didn't have it.
                    if *current_expiration < now {
                        // Steal it!
                        *current_holder = holder.to_owned();
                        *current_expiration = expiration;

                        true
                    } else {
                        // We tried our best.
                        false
                    }
                }
            }

            // There is no holder, easy.
            Entry::Vacant(entry) => {
                entry.insert((holder.to_owned(), expiration));

                true
            }
        }
    }
}