matrix_sdk_common/
cross_process_lock.rs

// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A cross-process lock implementation.
//!
//! This lock should be used only for very specific use cases, where multiple
//! processes might write to the same database concurrently; since that would
//! invalidate store caches, it should be done mindfully. The lock can be
//! acquired multiple times by the same process, and it remains active as long
//! as there's at least one holder in a given process.
//!
//! The lock is implemented using time-based leases. The lock state consists
//! of the lock identifier (key), the current holder (value), and an
//! expiration timestamp on the side; see also
//! `CryptoStore::try_take_leased_lock` for more details.
//!
//! The lock is initially acquired for a certain period of time (namely, the
//! duration of a lease, aka `LEASE_DURATION_MS`), and then a “heartbeat” task
//! renews the lease to extend its duration, every so often (namely, every
//! `EXTEND_LEASE_EVERY_MS`). Since the Tokio scheduler might be busy, the
//! extension request should happen much more frequently than the lease
//! duration, so that a missed deadline doesn't cost us the lock. The current
//! values have been chosen to reflect that, with a ratio of 1:10 as of
//! 2023-06-23.
//!
//! Releasing the lock happens naturally, by not renewing the lease; the lock
//! expires at most one lease duration after the last renewal.
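//!
//! A minimal usage sketch (the doctest is `ignore`d: the import paths and the
//! `run` function are assumptions for illustration, and `locker` stands for
//! any [`TryLock`] implementation):
//!
//! ```ignore
//! use matrix_sdk_common::{
//!     SendOutsideWasm,
//!     cross_process_lock::{CrossProcessLock, CrossProcessLockError, TryLock},
//! };
//!
//! async fn run<L>(locker: L) -> Result<(), CrossProcessLockError>
//! where
//!     L: TryLock + Clone + SendOutsideWasm + 'static,
//! {
//!     let lock =
//!         CrossProcessLock::new(locker, "db-lock".to_owned(), "process-1".to_owned());
//!
//!     // Try once; `None` means another process currently holds the lock.
//!     if let Some(guard) = lock.try_lock_once().await? {
//!         // Do the critical work while the guard is alive…
//!         drop(guard);
//!         // …the lease then expires at most `LEASE_DURATION_MS` later.
//!     }
//!
//!     Ok(())
//! }
//! ```
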
use std::{
    error::Error,
    future::Future,
    sync::{
        Arc,
        atomic::{self, AtomicU32},
    },
    time::Duration,
};

use tokio::sync::Mutex;
use tracing::{debug, error, instrument, trace};

use crate::{
    SendOutsideWasm,
    executor::{JoinHandle, spawn},
    sleep::sleep,
};

/// Trait used to try to take a lock. Foundation of [`CrossProcessLock`].
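///
/// A sketch of an in-memory implementation (illustrative only: the
/// `MemoryLocker` name is made up, and the doctest is `ignore`d since it
/// depends on how this module is re-exported):
///
/// ```ignore
/// use std::{
///     collections::HashMap,
///     sync::{Arc, RwLock},
///     time::Instant,
/// };
///
/// #[derive(Clone, Default)]
/// struct MemoryLocker {
///     leases: Arc<RwLock<HashMap<String, (String, Instant)>>>,
/// }
///
/// impl TryLock for MemoryLocker {
///     // `Infallible` implements `Error + Send + Sync`, so it satisfies the
///     // bound on both wasm and non-wasm targets.
///     type LockError = std::convert::Infallible;
///
///     async fn try_lock(
///         &self,
///         lease_duration_ms: u32,
///         key: &str,
///         holder: &str,
///     ) -> Result<bool, Self::LockError> {
///         Ok(memory_store_helper::try_take_leased_lock(
///             &mut self.leases.write().unwrap(),
///             lease_duration_ms,
///             key,
///             holder,
///         ))
///     }
/// }
/// ```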
pub trait TryLock {
    /// The error type returned by [`TryLock::try_lock`].
    #[cfg(not(target_family = "wasm"))]
    type LockError: Error + Send + Sync;

    /// The error type returned by [`TryLock::try_lock`].
    #[cfg(target_family = "wasm")]
    type LockError: Error;

    /// Try to take a leased lock.
    ///
    /// This attempts to take a lock for the given lease duration.
    ///
    /// - If we already had the lease, this will extend the lease.
    /// - If we didn't, but the previous lease has expired, we will acquire the
    ///   lock.
    /// - If there was no previous lease, we will acquire the lock.
    /// - Otherwise, we don't get the lock.
    ///
    /// Returns whether taking the lock succeeded.
    fn try_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> impl Future<Output = Result<bool, Self::LockError>> + SendOutsideWasm;
}

/// Small state machine to handle wait times.
#[derive(Clone, Debug)]
enum WaitingTime {
    /// Some time to wait, in milliseconds.
    Some(u32),
    /// Stop waiting when seeing this value.
    Stop,
}

/// A guard of a cross-process lock.
///
/// The lock will be automatically released a short period of time after all
/// the guards have been dropped.
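///
/// A sketch of the drop semantics (`ignore`d; assumes an already-created
/// `lock`):
///
/// ```ignore
/// let guard = lock.try_lock_once().await?.expect("the lock is free");
/// // The lease keeps being renewed while `guard` is alive…
/// drop(guard);
/// // …and expires at most `LEASE_DURATION_MS` after the last guard is gone.
/// ```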
#[derive(Debug)]
pub struct CrossProcessLockGuard {
    num_holders: Arc<AtomicU32>,
}

impl Drop for CrossProcessLockGuard {
    fn drop(&mut self) {
        self.num_holders.fetch_sub(1, atomic::Ordering::SeqCst);
    }
}

/// A store-based lock for a `Store`.
///
/// See the doc-comment of this module for more information.
#[derive(Clone, Debug)]
pub struct CrossProcessLock<L>
where
    L: TryLock + Clone + SendOutsideWasm + 'static,
{
    /// The locker implementation.
    ///
    /// `L` is responsible for trying to take the lock, while
    /// [`CrossProcessLock`] is responsible for making it cross-process, with
    /// the retry mechanism, the guard, and so on.
    locker: L,

    /// Number of holders of the lock in this process.
    ///
    /// If greater than 0, this means we've already acquired this lock, in this
    /// process, and the store lock mustn't be touched.
    ///
    /// When the number of holders is decreased to 0, then the lock must be
    /// released in the store.
    num_holders: Arc<AtomicU32>,

    /// A mutex to control an attempt to take the lock, to avoid making it
    /// reentrant.
    locking_attempt: Arc<Mutex<()>>,

    /// Current renew task spawned by `try_lock_once`.
    renew_task: Arc<Mutex<Option<JoinHandle<()>>>>,

    /// The key used in the key/value mapping for the lock entry.
    lock_key: String,

    /// A specific value to identify the lock's holder.
    lock_holder: String,

    /// Backoff time, in milliseconds.
    backoff: Arc<Mutex<WaitingTime>>,
}

/// Amount of time a lease of the lock should last, in milliseconds.
pub const LEASE_DURATION_MS: u32 = 500;

/// Period of time between two attempts to extend the lease, in milliseconds.
/// Each attempt re-requests a lease for the full `LEASE_DURATION_MS`, so this
/// has to be safely low compared to `LEASE_DURATION_MS`, to make sure that we
/// can miss a deadline without compromising the lock.
pub const EXTEND_LEASE_EVERY_MS: u64 = 50;
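
// Illustrative compile-time sketch of the safety margin described above: the
// extension period should stay well below the lease duration (the 1:10 ratio
// mentioned in the module docs is a convention, not a hard requirement).
const _: () = assert!(EXTEND_LEASE_EVERY_MS * 10 <= LEASE_DURATION_MS as u64);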

/// Initial backoff, in milliseconds. This is the time we wait the first
/// time, if taking the lock initially failed.
const INITIAL_BACKOFF_MS: u32 = 10;

/// Maximal backoff, in milliseconds. This is the maximum amount of time
/// we'll wait for the lock, *between two attempts*.
pub const MAX_BACKOFF_MS: u32 = 1000;

impl<L> CrossProcessLock<L>
where
    L: TryLock + Clone + SendOutsideWasm + 'static,
{
    /// Create a new cross-process lock.
    ///
    /// # Parameters
    ///
    /// - `lock_key`: key in the key-value store under which the lock's state
    ///   is stored.
    /// - `lock_holder`: value identifying the lock's holder.
    pub fn new(locker: L, lock_key: String, lock_holder: String) -> Self {
        Self {
            locker,
            lock_key,
            lock_holder,
            backoff: Arc::new(Mutex::new(WaitingTime::Some(INITIAL_BACKOFF_MS))),
            num_holders: Arc::new(0.into()),
            locking_attempt: Arc::new(Mutex::new(())),
            renew_task: Default::default(),
        }
    }

    /// Try to lock once, returns whether the lock was obtained or not.
    #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
    pub async fn try_lock_once(
        &self,
    ) -> Result<Option<CrossProcessLockGuard>, CrossProcessLockError> {
        // Hold onto the locking attempt mutex for the entire lifetime of this
        // function, to avoid multiple reentrant calls.
        let _attempt = self.locking_attempt.lock().await;

        // If another thread obtained the lock, make sure to only superficially increase
        // the number of holders, and carry on.
        if self.num_holders.load(atomic::Ordering::SeqCst) > 0 {
            // Note: between the above load and the fetch_add below, another thread may
            // decrement `num_holders`. That's fine, because it means the lock
            // was held by at least one thread, and after this call it will be
            // held by at least one thread.
            trace!("We already had the lock, incrementing holder count");
            self.num_holders.fetch_add(1, atomic::Ordering::SeqCst);
            let guard = CrossProcessLockGuard { num_holders: self.num_holders.clone() };
            return Ok(Some(guard));
        }

        let acquired = self
            .locker
            .try_lock(LEASE_DURATION_MS, &self.lock_key, &self.lock_holder)
            .await
            .map_err(|err| CrossProcessLockError::TryLockError(Box::new(err)))?;

        if !acquired {
            trace!("Couldn't acquire the lock immediately.");
            return Ok(None);
        }

        trace!("Acquired the lock, spawning the lease extension task.");

        // This is the first time we've acquired the lock. We're going to spawn the task
        // that will renew the lease.

        // Clone data to be owned by the task.
        let this = (*self).clone();

        let mut renew_task = self.renew_task.lock().await;

        // Cancel the previous task, if any. That's safe to do, because:
        // - either the task was done,
        // - or it was still running, but taking a lock in the db has to be an atomic
        //   operation running in a transaction.

        if let Some(_prev) = renew_task.take() {
            #[cfg(not(target_family = "wasm"))]
            if !_prev.is_finished() {
                trace!("aborting the previous renew task");
                _prev.abort();
            }
        }

        // Restart a new one.
        *renew_task = Some(spawn(async move {
            loop {
                {
                    // First, check if there are still users of this lock.
                    //
                    // This is not racy, because:
                    // - the `locking_attempt` mutex makes sure we don't have unexpected
                    //   interactions with the non-atomic sequence above in `try_lock_once`
                    //   (check > 0, then add 1);
                    // - other entities holding onto the `num_holders` atomic will only
                    //   decrease it over time.

                    let _guard = this.locking_attempt.lock().await;

                    // If there are no more users, we can quit.
                    if this.num_holders.load(atomic::Ordering::SeqCst) == 0 {
                        trace!("exiting the lease extension loop");

                        // Cancel the lease with another 0ms lease.
                        // If we don't get the lock, that's (weird but) fine.
                        let fut = this.locker.try_lock(0, &this.lock_key, &this.lock_holder);
                        let _ = fut.await;

                        // Exit the loop.
                        break;
                    }
                }

                sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;

                let fut =
                    this.locker.try_lock(LEASE_DURATION_MS, &this.lock_key, &this.lock_holder);

                if let Err(err) = fut.await {
                    error!("error when extending lock lease: {err:#}");
                    // Exit the loop.
                    break;
                }
            }
        }));

        self.num_holders.fetch_add(1, atomic::Ordering::SeqCst);

        let guard = CrossProcessLockGuard { num_holders: self.num_holders.clone() };
        Ok(Some(guard))
    }

    /// Attempt to take the lock, with exponential backoff if the lock has
    /// already been taken before.
    ///
    /// The `max_backoff` parameter is the maximum time (in milliseconds) to
    /// wait between two attempts. Once the backoff reaches that value, the
    /// next failed attempt gives up and returns a
    /// [`CrossProcessLockError::LockTimeout`] error. If not provided, it
    /// defaults to [`MAX_BACKOFF_MS`].
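    ///
    /// A usage sketch (`ignore`d; assumes an already-created `lock`):
    ///
    /// ```ignore
    /// // Keep retrying, with backoff capped at 200ms between attempts; give
    /// // up with `LockTimeout` once the backoff would exceed that cap.
    /// let guard = lock.spin_lock(Some(200)).await?;
    /// // … critical section …
    /// drop(guard);
    /// ```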
    #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
    pub async fn spin_lock(
        &self,
        max_backoff: Option<u32>,
    ) -> Result<CrossProcessLockGuard, CrossProcessLockError> {
        let max_backoff = max_backoff.unwrap_or(MAX_BACKOFF_MS);

        // Note: reads/writes to the backoff are racy across threads in theory, but the
        // lock in `try_lock_once` should sequentialize it all.

        loop {
            if let Some(guard) = self.try_lock_once().await? {
                // Reset backoff before returning, for the next attempt to lock.
                *self.backoff.lock().await = WaitingTime::Some(INITIAL_BACKOFF_MS);
                return Ok(guard);
            }

            // Exponential backoff: double the time we've waited before (10ms, 20ms,
            // 40ms, …), and stop once it reaches `max_backoff`.
            let mut backoff = self.backoff.lock().await;

            let wait = match &mut *backoff {
                WaitingTime::Some(val) => {
                    let wait = *val;
                    *val = val.saturating_mul(2);
                    if *val >= max_backoff {
                        *backoff = WaitingTime::Stop;
                    }
                    wait
                }
                WaitingTime::Stop => {
                    // We've reached the maximum backoff, abandon.
                    return Err(CrossProcessLockError::LockTimeout);
                }
            };

            debug!("Waiting {wait}ms before re-attempting to take the lock");
            sleep(Duration::from_millis(wait.into())).await;
        }
    }

    /// Returns the value in the database that represents the holder's
    /// identifier.
    pub fn lock_holder(&self) -> &str {
        &self.lock_holder
    }
}

/// Error related to the locking API of the store.
#[derive(Debug, thiserror::Error)]
pub enum CrossProcessLockError {
    /// Spent too long waiting for a database lock.
    #[error("a lock timed out")]
    LockTimeout,

    /// An error happened while attempting to take the lock.
    #[error(transparent)]
    #[cfg(not(target_family = "wasm"))]
    TryLockError(#[from] Box<dyn Error + Send + Sync>),

    /// An error happened while attempting to take the lock.
    #[error(transparent)]
    #[cfg(target_family = "wasm")]
    TryLockError(Box<dyn Error>),
}

#[cfg(test)]
#[cfg(not(target_family = "wasm"))] // These tests require tokio::time, which is not implemented on wasm.
mod tests {
    use std::{
        collections::HashMap,
        sync::{Arc, RwLock, atomic},
        time::Instant,
    };

    use assert_matches::assert_matches;
    use matrix_sdk_test_macros::async_test;
    use tokio::{
        spawn,
        time::{Duration, sleep},
    };

    use super::{
        CrossProcessLock, CrossProcessLockError, CrossProcessLockGuard, EXTEND_LEASE_EVERY_MS,
        TryLock, memory_store_helper::try_take_leased_lock,
    };

    #[derive(Clone, Default)]
    struct TestStore {
        leases: Arc<RwLock<HashMap<String, (String, Instant)>>>,
    }

    impl TestStore {
        fn try_take_leased_lock(&self, lease_duration_ms: u32, key: &str, holder: &str) -> bool {
            try_take_leased_lock(&mut self.leases.write().unwrap(), lease_duration_ms, key, holder)
        }
    }

    #[derive(Debug, thiserror::Error)]
    enum DummyError {}

    impl TryLock for TestStore {
        type LockError = DummyError;

        /// Try to take a lock using the given store.
        async fn try_lock(
            &self,
            lease_duration_ms: u32,
            key: &str,
            holder: &str,
        ) -> Result<bool, Self::LockError> {
            Ok(self.try_take_leased_lock(lease_duration_ms, key, holder))
        }
    }

    async fn release_lock(guard: Option<CrossProcessLockGuard>) {
        drop(guard);
        sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;
    }

    type TestResult = Result<(), CrossProcessLockError>;

    #[async_test]
    async fn test_simple_lock_unlock() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(store, "key".to_owned(), "first".to_owned());

        // The lock plain works when used with a single holder.
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // Releasing works.
        release_lock(acquired).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        // Spin locking on the same lock always works, assuming no concurrent access.
        let acquired = lock.spin_lock(None).await.unwrap();

        // Releasing still works.
        release_lock(Some(acquired)).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        Ok(())
    }

    #[async_test]
    async fn test_self_recovery() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned());

        // When a lock is acquired...
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // But then forgotten... (note: no need to release the guard)
        drop(lock);

        // And when rematerializing the lock with the same key/value...
        let lock = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned());

        // We still got it.
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        Ok(())
    }

    #[async_test]
    async fn test_multiple_holders_same_process() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(store, "key".to_owned(), "first".to_owned());

        // Taking the lock twice...
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());

        let acquired2 = lock.try_lock_once().await?;
        assert!(acquired2.is_some());

        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 2);

        // ...means we can release it twice.
        release_lock(acquired).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        release_lock(acquired2).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        Ok(())
    }

    #[async_test]
    async fn test_multiple_processes() -> TestResult {
        let store = TestStore::default();
        let lock1 = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned());
        let lock2 = CrossProcessLock::new(store, "key".to_owned(), "second".to_owned());

        // When the first process takes the lock...
        let acquired1 = lock1.try_lock_once().await?;
        assert!(acquired1.is_some());

        // The second can't take it immediately.
        let acquired2 = lock2.try_lock_once().await?;
        assert!(acquired2.is_none());

        let lock2_clone = lock2.clone();
        let handle = spawn(async move { lock2_clone.spin_lock(Some(1000)).await });

        sleep(Duration::from_millis(100)).await;

        drop(acquired1);

        // lock2 in the background manages to get the lock at some point.
        let _acquired2 = handle
            .await
            .expect("join handle is properly awaited")
            .expect("lock was obtained after spin-locking");

        // Now if lock1 tries to get the lock with a small timeout, it will fail.
        assert_matches!(lock1.spin_lock(Some(200)).await, Err(CrossProcessLockError::LockTimeout));

        Ok(())
    }
}

/// Some code that is shared by almost all `MemoryStore` implementations out
/// there.
pub mod memory_store_helper {
    use std::collections::{HashMap, hash_map::Entry};

    use ruma::time::{Duration, Instant};

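    /// Try to acquire or extend a leased lock stored in a plain `HashMap`.
    ///
    /// Returns `true` if the lease was acquired or extended. A sketch of the
    /// three outcomes (the doctest is `ignore`d since this helper is usually
    /// re-exported by concrete stores):
    ///
    /// ```ignore
    /// let mut leases = HashMap::new();
    ///
    /// // No previous lease: acquired.
    /// assert!(try_take_leased_lock(&mut leases, 500, "key", "alice"));
    /// // Same holder: the lease is extended.
    /// assert!(try_take_leased_lock(&mut leases, 500, "key", "alice"));
    /// // Different holder while the lease is live: denied.
    /// assert!(!try_take_leased_lock(&mut leases, 500, "key", "bob"));
    /// ```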
    pub fn try_take_leased_lock(
        leases: &mut HashMap<String, (String, Instant)>,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> bool {
        let now = Instant::now();
        let expiration = now + Duration::from_millis(lease_duration_ms.into());

        match leases.entry(key.to_owned()) {
            // There is an existing holder.
            Entry::Occupied(mut entry) => {
                let (current_holder, current_expiration) = entry.get_mut();

                if current_holder == holder {
                    // We had the lease before, extend it.
                    *current_expiration = expiration;

                    true
                } else {
                    // We didn't have it.
                    if *current_expiration < now {
                        // Steal it!
                        *current_holder = holder.to_owned();
                        *current_expiration = expiration;

                        true
                    } else {
                        // We tried our best.
                        false
                    }
                }
            }

            // There is no holder, easy. Reuse the `expiration` computed above,
            // rather than recomputing a slightly different timestamp.
            Entry::Vacant(entry) => {
                entry.insert((holder.to_owned(), expiration));

                true
            }
        }
    }
}