matrix_sdk_common/
cross_process_lock.rs

1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A cross-process lock implementation.
16//!
17//! This is a per-process lock that may be used only for very specific use
18//! cases, where multiple processes might concurrently write to the same
19//! database at the same time; this would invalidate store caches, so
20//! that should be done mindfully. Such a lock can be obtained multiple times by
21//! the same process, and it remains active as long as there's at least one user
22//! in a given process.
23//!
//! The lock is implemented using time-based leases. The lock maintains the lock
//! identifier (key), the identity of its current holder (value), and an
//! expiration timestamp on the side; see also
//! `CryptoStore::try_take_leased_lock` for more details.
28//!
29//! The lock is initially obtained for a certain period of time (namely, the
30//! duration of a lease, aka `LEASE_DURATION_MS`), and then a “heartbeat” task
31//! renews the lease to extend its duration, every so often (namely, every
32//! `EXTEND_LEASE_EVERY_MS`). Since the Tokio scheduler might be busy, the
33//! extension request should happen way more frequently than the duration of a
34//! lease, in case a deadline is missed. The current values have been chosen to
35//! reflect that, with a ratio of 1:10 as of 2023-06-23.
36//!
37//! Releasing the lock happens naturally, by not renewing a lease. It happens
38//! automatically after the duration of the last lease, at most.
39
40use std::{
41    error::Error,
42    future::Future,
43    sync::{
44        Arc,
45        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
46    },
47    time::Duration,
48};
49
50use tokio::sync::Mutex;
51use tracing::{debug, error, instrument, trace, warn};
52
53use crate::{
54    SendOutsideWasm,
55    executor::{JoinHandle, spawn},
56    sleep::sleep,
57};
58
/// A lock generation is an integer incremented each time the lock is taken by
/// a different holder.
///
/// This is used to know if a lock has been dirtied: when a lock is obtained
/// again with a generation that differs from the one previously recorded, some
/// other holder has taken the lock in the meantime.
///
/// The value [`NO_CROSS_PROCESS_LOCK_GENERATION`] is reserved to mean "no
/// generation known yet".
pub type CrossProcessLockGeneration = u64;
64
/// Trait used to try to take a lock. Foundation of [`CrossProcessLock`].
///
/// An implementor provides a single, non-blocking attempt at taking (or
/// extending) a lease; [`CrossProcessLock`] builds the retry mechanism, the
/// lease renewal and the guards on top of it.
pub trait TryLock {
    /// Error returned by [`TryLock::try_lock`]. Outside of wasm targets, it
    /// must be `Send + Sync`.
    #[cfg(not(target_family = "wasm"))]
    type LockError: Error + Send + Sync;

    /// Error returned by [`TryLock::try_lock`] on wasm targets.
    #[cfg(target_family = "wasm")]
    type LockError: Error;

    /// Try to take a leased lock.
    ///
    /// This attempts to take a lock for the given lease duration.
    ///
    /// - If we already had the lease, this will extend the lease.
    /// - If we didn't, but the previous lease has expired, we will obtain the
    ///   lock.
    /// - If there was no previous lease, we will obtain the lock.
    /// - Otherwise, we don't get the lock.
    ///
    /// # Parameters
    ///
    /// - `lease_duration_ms`: duration of the requested lease, in milliseconds.
    ///   [`CrossProcessLock`] requests a lease of `0` milliseconds to give up a
    ///   lease it no longer needs.
    /// - `key`: identifier of the lock.
    /// - `holder`: identifier of the party trying to take the lock.
    ///
    /// # Returns
    ///
    /// Returns `Some(_)` to indicate the lock succeeded, `None` otherwise. The
    /// cross-process lock generation must be compared to the generation before
    /// the call to see if the lock has been dirtied: a different generation
    /// means the lock has been dirtied, i.e. taken by a different holder in
    /// the meantime.
    fn try_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> impl Future<Output = Result<Option<CrossProcessLockGeneration>, Self::LockError>>
    + SendOutsideWasm;
}
96
/// Small state machine to handle wait times.
///
/// Used as the backoff state of [`CrossProcessLock::spin_lock`]: the state
/// starts as `Some(_)`, the wait time is doubled after each failed attempt,
/// and the state eventually becomes `Stop`.
#[derive(Clone, Debug)]
enum WaitingTime {
    /// Some time to wait, in milliseconds.
    Some(u32),
    /// Stop waiting when seeing this value.
    Stop,
}
105
/// A guard of a cross-process lock.
///
/// The lock will be automatically released a short period of time after all the
/// guards have dropped.
///
/// Each live guard accounts for exactly one unit in the shared holder counter:
/// dropping a guard decrements the counter and cloning a guard increments it,
/// so the counter always matches the number of guards alive in this process.
#[derive(Debug)]
#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"]
pub struct CrossProcessLockGuard {
    /// A clone of [`CrossProcessLock::num_holders`].
    num_holders: Arc<AtomicU32>,

    /// A clone of [`CrossProcessLock::is_dirty`].
    is_dirty: Arc<AtomicBool>,
}

impl CrossProcessLockGuard {
    /// Build a guard around the shared holder counter and dirty flag.
    ///
    /// This does not touch the counter: the caller is expected to have
    /// registered the new holder (incremented `num_holders`) already.
    fn new(num_holders: Arc<AtomicU32>, is_dirty: Arc<AtomicBool>) -> Self {
        Self { num_holders, is_dirty }
    }

    /// Determine whether the cross-process lock associated to this guard is
    /// dirty.
    ///
    /// See [`CrossProcessLockState::Dirty`] to learn more about the semantics
    /// of _dirty_.
    pub fn is_dirty(&self) -> bool {
        self.is_dirty.load(Ordering::SeqCst)
    }

    /// Clear the dirty state from the cross-process lock associated to this
    /// guard.
    ///
    /// If the cross-process lock is dirtied, it will remain dirtied until
    /// this method is called. This allows recovering from a dirty state and
    /// marking that it has recovered.
    pub fn clear_dirty(&self) {
        self.is_dirty.store(false, Ordering::SeqCst);
    }
}

impl Clone for CrossProcessLockGuard {
    /// Create another guard on the same lock, registering it as a new holder.
    ///
    /// A derived `Clone` would copy the counter handle without incrementing
    /// the counter, while `Drop` decrements it for every guard: each clone
    /// would then release the lock one holder too early, or wrap the counter
    /// below zero.
    fn clone(&self) -> Self {
        // `self` is alive, so the counter is at least 1 here and the lock is
        // still considered held while the new holder gets registered.
        self.num_holders.fetch_add(1, Ordering::SeqCst);

        Self { num_holders: Arc::clone(&self.num_holders), is_dirty: Arc::clone(&self.is_dirty) }
    }
}

impl Drop for CrossProcessLockGuard {
    /// Unregister this holder from the shared holder counter.
    fn drop(&mut self) {
        self.num_holders.fetch_sub(1, Ordering::SeqCst);
    }
}
150
/// A cross-process lock implementation.
///
/// See the doc-comment of this module for more information.
///
/// Every piece of mutable state is behind an `Arc`, so clones of a
/// `CrossProcessLock` share the same holder count, backoff, generation and
/// dirty flag.
#[derive(Clone, Debug)]
pub struct CrossProcessLock<L>
where
    L: TryLock + Clone + SendOutsideWasm + 'static,
{
    /// The locker implementation.
    ///
    /// `L` is responsible for trying to take the lock, while
    /// [`CrossProcessLock`] is responsible to make it cross-process, with the
    /// retry mechanism, plus guard and so on.
    locker: L,

    /// Number of holders of the lock in this process.
    ///
    /// If greater than 0, this means we've already obtained this lock, in this
    /// process, and the store lock mustn't be touched.
    ///
    /// When the number of holders is decreased to 0, then the lock must be
    /// released in the store.
    num_holders: Arc<AtomicU32>,

    /// A mutex to control an attempt to take the lock, to avoid making it
    /// reentrant.
    ///
    /// It is also taken by the lease renewal task while it checks whether
    /// there are holders left.
    locking_attempt: Arc<Mutex<()>>,

    /// Current renew task spawned by `try_lock_once`.
    ///
    /// `None` until the lock has been obtained from the locker for the first
    /// time.
    renew_task: Arc<Mutex<Option<JoinHandle<()>>>>,

    /// The key used in the key/value mapping for the lock entry.
    lock_key: String,

    /// A specific value to identify the lock's holder.
    lock_holder: String,

    /// Backoff time, in milliseconds.
    ///
    /// This is the state of the exponential backoff used by `spin_lock`.
    backoff: Arc<Mutex<WaitingTime>>,

    /// This lock generation.
    ///
    /// Holds [`NO_CROSS_PROCESS_LOCK_GENERATION`] until the lock has been
    /// obtained from the locker for the first time, then the last generation
    /// returned by the locker.
    generation: Arc<AtomicU64>,

    /// Whether the lock has been dirtied.
    ///
    /// See [`CrossProcessLockState::Dirty`] to learn more about the semantics
    /// of _dirty_.
    is_dirty: Arc<AtomicBool>,
}
200
/// Amount of time a lease of the lock should last, in milliseconds.
pub const LEASE_DURATION_MS: u32 = 500;

/// Period of time between two attempts to extend the lease, in milliseconds.
///
/// We'll re-request a lease for an entire duration of `LEASE_DURATION_MS`
/// milliseconds, every `EXTEND_LEASE_EVERY_MS`, so this has to
/// be an amount safely low compared to `LEASE_DURATION_MS`, to make sure
/// that we can miss a deadline without compromising the lock.
pub const EXTEND_LEASE_EVERY_MS: u64 = 50;

/// Initial backoff, in milliseconds. This is the time we wait the first
/// time, if taking the lock initially failed.
const INITIAL_BACKOFF_MS: u32 = 10;

/// Maximal backoff, in milliseconds. This is the maximum amount of time
/// we'll wait for the lock, *between two attempts*.
pub const MAX_BACKOFF_MS: u32 = 1000;

/// Sentinel value representing the absence of a lock generation value.
///
/// When the lock is created, it has no generation. Once locked, it receives its
/// first generation from [`TryLock::try_lock`]. Subsequent lockings may
/// produce a new lock generation. The generation is incremented by 1 every
/// time.
///
/// The first generation is defined by [`FIRST_CROSS_PROCESS_LOCK_GENERATION`].
pub const NO_CROSS_PROCESS_LOCK_GENERATION: CrossProcessLockGeneration = 0;

/// Describe the first lock generation value (see
/// [`CrossProcessLockGeneration`]).
pub const FIRST_CROSS_PROCESS_LOCK_GENERATION: CrossProcessLockGeneration = 1;
231
232impl<L> CrossProcessLock<L>
233where
234    L: TryLock + Clone + SendOutsideWasm + 'static,
235{
236    /// Create a new cross-process lock.
237    ///
238    /// # Parameters
239    ///
240    /// - `lock_key`: key in the key-value store to store the lock's state.
241    /// - `lock_holder`: identify the lock's holder with this given value.
242    pub fn new(locker: L, lock_key: String, lock_holder: String) -> Self {
243        Self {
244            locker,
245            lock_key,
246            lock_holder,
247            backoff: Arc::new(Mutex::new(WaitingTime::Some(INITIAL_BACKOFF_MS))),
248            num_holders: Arc::new(0.into()),
249            locking_attempt: Arc::new(Mutex::new(())),
250            renew_task: Default::default(),
251            generation: Arc::new(AtomicU64::new(NO_CROSS_PROCESS_LOCK_GENERATION)),
252            is_dirty: Arc::new(AtomicBool::new(false)),
253        }
254    }
255
256    /// Determine whether the cross-process lock is dirty.
257    ///
258    /// See [`CrossProcessLockState::Dirty`] to learn more about the semantics
259    /// of _dirty_.
260    pub fn is_dirty(&self) -> bool {
261        self.is_dirty.load(Ordering::SeqCst)
262    }
263
264    /// Clear the dirty state from this cross-process lock.
265    ///
266    /// If the cross-process lock is dirtied, it will remain dirtied until
267    /// this method is called. This allows recovering from a dirty state and
268    /// marking that it has recovered.
269    pub fn clear_dirty(&self) {
270        self.is_dirty.store(false, Ordering::SeqCst);
271    }
272
273    /// Try to lock once, returns whether the lock was obtained or not.
274    ///
275    /// The lock can be obtained but it can be dirty. In all cases, the renew
276    /// task will run in the background.
277    #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
278    pub async fn try_lock_once(
279        &self,
280    ) -> Result<Result<CrossProcessLockState, CrossProcessLockUnobtained>, L::LockError> {
281        // Hold onto the locking attempt mutex for the entire lifetime of this
282        // function, to avoid multiple reentrant calls.
283        let mut _attempt = self.locking_attempt.lock().await;
284
285        // If another thread obtained the lock, make sure to only superficially increase
286        // the number of holders, and carry on.
287        if self.num_holders.load(Ordering::SeqCst) > 0 {
288            // Note: between the above load and the fetch_add below, another thread may
289            // decrement `num_holders`. That's fine because that means the lock
290            // was taken by at least one thread, and after this call it will be
291            // taken by at least one thread.
292            trace!("We already had the lock, incrementing holder count");
293
294            self.num_holders.fetch_add(1, Ordering::SeqCst);
295
296            return Ok(Ok(CrossProcessLockState::Clean(CrossProcessLockGuard::new(
297                self.num_holders.clone(),
298                self.is_dirty.clone(),
299            ))));
300        }
301
302        if let Some(new_generation) =
303            self.locker.try_lock(LEASE_DURATION_MS, &self.lock_key, &self.lock_holder).await?
304        {
305            match self.generation.swap(new_generation, Ordering::SeqCst) {
306                // If there was no lock generation, it means this is the first time the lock is
307                // obtained. It cannot be dirty.
308                NO_CROSS_PROCESS_LOCK_GENERATION => {
309                    trace!(?new_generation, "Setting the lock generation for the first time");
310                }
311
312                // This was NOT the same generation, the lock has been dirtied!
313                previous_generation if previous_generation != new_generation => {
314                    warn!(
315                        ?previous_generation,
316                        ?new_generation,
317                        "The lock has been obtained, but it's been dirtied!"
318                    );
319                    self.is_dirty.store(true, Ordering::SeqCst);
320                }
321
322                // This was the same generation, no problem.
323                _ => {
324                    trace!("Same lock generation; no problem");
325                }
326            }
327
328            trace!("Lock obtained!");
329        } else {
330            trace!("Couldn't obtain the lock immediately.");
331            return Ok(Err(CrossProcessLockUnobtained::Busy));
332        }
333
334        trace!("Obtained the lock, spawning the lease extension task.");
335
336        // This is the first time we've obtaind the lock. We're going to spawn the task
337        // that will renew the lease.
338
339        // Clone data to be owned by the task.
340        let this = (*self).clone();
341
342        let mut renew_task = self.renew_task.lock().await;
343
344        // Cancel the previous task, if any. That's safe to do, because:
345        // - either the task was done,
346        // - or it was still running, but taking a lock in the db has to be an atomic
347        //   operation running in a transaction.
348
349        if let Some(_prev) = renew_task.take() {
350            #[cfg(not(target_family = "wasm"))]
351            if !_prev.is_finished() {
352                trace!("aborting the previous renew task");
353                _prev.abort();
354            }
355        }
356
357        // Restart a new one.
358        *renew_task = Some(spawn(async move {
359            loop {
360                {
361                    // First, check if there are still users of this lock.
362                    //
363                    // This is not racy, because:
364                    // - the `locking_attempt` mutex makes sure we don't have unexpected
365                    // interactions with the non-atomic sequence above in `try_lock_once`
366                    // (check > 0, then add 1).
367                    // - other entities holding onto the `num_holders` atomic will only
368                    // decrease it over time.
369
370                    let _guard = this.locking_attempt.lock().await;
371
372                    // If there are no more users, we can quit.
373                    if this.num_holders.load(Ordering::SeqCst) == 0 {
374                        trace!("exiting the lease extension loop");
375
376                        // Cancel the lease with another 0ms lease.
377                        // If we don't get the lock, that's (weird but) fine.
378                        let fut = this.locker.try_lock(0, &this.lock_key, &this.lock_holder);
379                        let _ = fut.await;
380
381                        // Exit the loop.
382                        break;
383                    }
384                }
385
386                sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;
387
388                match this
389                    .locker
390                    .try_lock(LEASE_DURATION_MS, &this.lock_key, &this.lock_holder)
391                    .await
392                {
393                    Ok(Some(_generation)) => {
394                        // It's impossible that the generation can be different
395                        // from the previous generation.
396                        //
397                        // As long as the task runs, the lock is renewed, so the
398                        // generation remains the same. If the lock is not
399                        // taken, it's because the lease has expired, which is
400                        // represented by the `Ok(None)` value, and the task
401                        // must stop.
402                    }
403
404                    Ok(None) => {
405                        error!("Failed to renew the lock lease: the lock could not be obtained");
406
407                        // Exit the loop.
408                        break;
409                    }
410
411                    Err(err) => {
412                        error!("Error when extending the lock lease: {err:#}");
413
414                        // Exit the loop.
415                        break;
416                    }
417                }
418            }
419        }));
420
421        self.num_holders.fetch_add(1, Ordering::SeqCst);
422
423        let guard = CrossProcessLockGuard::new(self.num_holders.clone(), self.is_dirty.clone());
424
425        Ok(Ok(if self.is_dirty() {
426            CrossProcessLockState::Dirty(guard)
427        } else {
428            CrossProcessLockState::Clean(guard)
429        }))
430    }
431
432    /// Attempt to take the lock, with exponential backoff if the lock has
433    /// already been taken before.
434    ///
435    /// The `max_backoff` parameter is the maximum time (in milliseconds) that
436    /// should be waited for, between two attempts. When that time is
437    /// reached a second time, the lock will stop attempting to get the lock
438    /// and will return a timeout error upon locking. If not provided,
439    /// will wait for [`MAX_BACKOFF_MS`].
440    #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
441    pub async fn spin_lock(
442        &self,
443        max_backoff: Option<u32>,
444    ) -> Result<Result<CrossProcessLockState, CrossProcessLockUnobtained>, L::LockError> {
445        let max_backoff = max_backoff.unwrap_or(MAX_BACKOFF_MS);
446
447        // Note: reads/writes to the backoff are racy across threads in theory, but the
448        // lock in `try_lock_once` should sequentialize it all.
449
450        loop {
451            let lock_result = self.try_lock_once().await?;
452
453            if lock_result.is_ok() {
454                // Reset backoff before returning, for the next attempt to lock.
455                *self.backoff.lock().await = WaitingTime::Some(INITIAL_BACKOFF_MS);
456
457                return Ok(lock_result);
458            }
459
460            // Exponential backoff! Multiply by 2 the time we've waited before, cap it to
461            // max_backoff.
462            let mut backoff = self.backoff.lock().await;
463
464            let wait = match &mut *backoff {
465                WaitingTime::Some(val) => {
466                    let wait = *val;
467                    *val = val.saturating_mul(2);
468                    if *val >= max_backoff {
469                        *backoff = WaitingTime::Stop;
470                    }
471                    wait
472                }
473                WaitingTime::Stop => {
474                    // We've reached the maximum backoff, abandon.
475                    return Ok(Err(CrossProcessLockUnobtained::TimedOut));
476                }
477            };
478
479            debug!("Waiting {wait} before re-attempting to take the lock");
480            sleep(Duration::from_millis(wait.into())).await;
481        }
482    }
483
484    /// Returns the value in the database that represents the holder's
485    /// identifier.
486    pub fn lock_holder(&self) -> &str {
487        &self.lock_holder
488    }
489}
490
/// Represent a successful result of a locking attempt, either by
/// [`CrossProcessLock::try_lock_once`] or [`CrossProcessLock::spin_lock`].
///
/// Both variants carry the [`CrossProcessLockGuard`] that keeps the lock
/// held; the variant only tells whether the lock is dirty.
#[derive(Debug)]
#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"]
pub enum CrossProcessLockState {
    /// The lock has been obtained successfully, all good.
    Clean(CrossProcessLockGuard),

    /// The lock has been obtained successfully, but the lock is dirty!
    ///
    /// This holder has obtained this cross-process lock once, then another
    /// holder has obtained this cross-process lock _before_ this holder
    /// obtained it again. The lock is marked as dirty. It means the value
    /// protected by the cross-process lock may need to be reloaded if
    /// synchronisation is important.
    ///
    /// Until [`CrossProcessLock::clear_dirty`] is called,
    /// [`CrossProcessLock::is_dirty`], [`CrossProcessLock::try_lock_once`] and
    /// [`CrossProcessLock::spin_lock`] will report the lock as dirty. Put
    /// differently: dirty once, dirty forever, unless
    /// [`CrossProcessLock::clear_dirty`] is called.
    Dirty(CrossProcessLockGuard),
}
514
515impl CrossProcessLockState {
516    /// Map this value into the inner [`CrossProcessLockGuard`].
517    pub fn into_guard(self) -> CrossProcessLockGuard {
518        match self {
519            Self::Clean(guard) | Self::Dirty(guard) => guard,
520        }
521    }
522
523    /// Map this [`CrossProcessLockState`] into a
524    /// [`MappedCrossProcessLockState`].
525    ///
526    /// This is helpful when one wants to create its own wrapper over
527    /// [`CrossProcessLockGuard`].
528    pub fn map<F, G>(self, mapper: F) -> MappedCrossProcessLockState<G>
529    where
530        F: FnOnce(CrossProcessLockGuard) -> G,
531    {
532        match self {
533            Self::Clean(guard) => MappedCrossProcessLockState::Clean(mapper(guard)),
534            Self::Dirty(guard) => MappedCrossProcessLockState::Dirty(mapper(guard)),
535        }
536    }
537}
538
/// The result of mapping the guard of a [`CrossProcessLockState`] into
/// another type `G`, while keeping the clean/dirty distinction.
///
/// Created by [`CrossProcessLockState::map`].
#[derive(Debug)]
#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"]
pub enum MappedCrossProcessLockState<G> {
    /// The equivalent of [`CrossProcessLockState::Clean`].
    Clean(G),

    /// The equivalent of [`CrossProcessLockState::Dirty`].
    Dirty(G),
}

impl<G> MappedCrossProcessLockState<G> {
    /// Borrow the inner value if `Self` is [`Clean`][Self::Clean], return
    /// `None` if it is [`Dirty`][Self::Dirty].
    pub fn as_clean(&self) -> Option<&G> {
        if let Self::Clean(inner) = self { Some(inner) } else { None }
    }
}
561
/// Represent an unsuccessful result of a lock attempt, either by
/// [`CrossProcessLock::try_lock_once`] or [`CrossProcessLock::spin_lock`].
///
/// This is not a failure of the locker itself (see [`TryLock::LockError`] for
/// that), only the fact that the lock could not be taken.
#[derive(Debug, thiserror::Error)]
pub enum CrossProcessLockUnobtained {
    /// The lock couldn't be obtained immediately because it is busy, i.e. it is
    /// held by another holder.
    #[error(
        "The lock couldn't be obtained immediately because it is busy, i.e. it is held by another holder"
    )]
    Busy,

    /// The lock couldn't be obtained after several attempts: locking has timed
    /// out.
    #[error("The lock couldn't be obtained after several attempts: locking has timed out")]
    TimedOut,
}
578
/// Union of [`CrossProcessLockUnobtained`] and [`TryLock::LockError`].
#[derive(Debug, thiserror::Error)]
pub enum CrossProcessLockError {
    /// The lock could not be taken (busy or timed out).
    #[error(transparent)]
    Unobtained(#[from] CrossProcessLockUnobtained),

    /// The locker failed; the boxed error is `Send + Sync` outside of wasm
    /// targets.
    #[error(transparent)]
    #[cfg(not(target_family = "wasm"))]
    TryLock(#[from] Box<dyn Error + Send + Sync>),

    /// The locker failed (wasm targets).
    #[error(transparent)]
    #[cfg(target_family = "wasm")]
    TryLock(#[from] Box<dyn Error>),
}
593
594#[cfg(test)]
595#[cfg(not(target_family = "wasm"))] // These tests require tokio::time, which is not implemented on wasm.
596mod tests {
597    use std::{
598        collections::HashMap,
599        ops::Not,
600        sync::{Arc, RwLock, atomic},
601    };
602
603    use assert_matches::assert_matches;
604    use matrix_sdk_test_macros::async_test;
605    use tokio::{spawn, task::yield_now};
606
607    use super::{
608        CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockState,
609        CrossProcessLockUnobtained, TryLock,
610        memory_store_helper::{Lease, try_take_leased_lock},
611    };
612
    /// In-memory locker used by the tests.
    ///
    /// Clones share the same lease table, so several `CrossProcessLock`s
    /// built from clones of one `TestStore` compete for the same leases.
    #[derive(Clone, Default)]
    struct TestStore {
        /// Leases, indexed by lock key.
        leases: Arc<RwLock<HashMap<String, Lease>>>,
    }
617
618    impl TestStore {
619        fn try_take_leased_lock(
620            &self,
621            lease_duration_ms: u32,
622            key: &str,
623            holder: &str,
624        ) -> Option<CrossProcessLockGeneration> {
625            try_take_leased_lock(&mut self.leases.write().unwrap(), lease_duration_ms, key, holder)
626        }
627    }
628
    /// Error type of [`TestStore`]. It has no variant, hence it can never be
    /// constructed: the test locker never fails.
    #[derive(Debug, thiserror::Error)]
    enum DummyError {}
631
632    impl From<DummyError> for CrossProcessLockError {
633        fn from(value: DummyError) -> Self {
634            Self::TryLock(Box::new(value))
635        }
636    }
637
638    impl TryLock for TestStore {
639        type LockError = DummyError;
640
641        /// Try to take a lock using the given store.
642        async fn try_lock(
643            &self,
644            lease_duration_ms: u32,
645            key: &str,
646            holder: &str,
647        ) -> Result<Option<CrossProcessLockGeneration>, Self::LockError> {
648            Ok(self.try_take_leased_lock(lease_duration_ms, key, holder))
649        }
650    }
651
652    async fn release_lock(lock: CrossProcessLockState) {
653        drop(lock);
654        yield_now().await;
655    }
656
    /// Return type of the tests, so that they can use `?` on locking results.
    type TestResult = Result<(), CrossProcessLockError>;
658
    /// A single holder can take the lock with `try_lock_once`, release it,
    /// take it again with `spin_lock` and release it again; the holder count
    /// follows and the lock never becomes dirty.
    #[async_test]
    async fn test_simple_lock_unlock() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(store, "key".to_owned(), "first".to_owned());

        // The lock plain works when used with a single holder.
        let guard = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // Releasing works.
        release_lock(guard).await;
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        // Spin locking on the same lock always works, assuming no concurrent access.
        let guard = lock.spin_lock(None).await?.expect("spin lock must be obtained successfully");
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // Releasing still works.
        release_lock(guard).await;
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        Ok(())
    }
687
    /// A `CrossProcessLock` recreated with the same key and holder on the
    /// same store obtains the lock again, in a clean state, even though the
    /// previous instance was dropped without releasing its guard.
    #[async_test]
    async fn test_self_recovery() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned());

        // When a lock is obtained…
        let guard = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // But then forgotten… (note: no need to release the guard)
        drop(lock);

        // And when rematerializing the lock with the same key/value…
        let lock = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned());

        // We still got it.
        let guard =
            lock.try_lock_once().await?.expect("lock (again) must be obtained successfully");
        assert_matches!(guard, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());
        // The new instance has its own holder counter, hence 1 and not 2.
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        Ok(())
    }
714
    /// The same `CrossProcessLock` can be taken several times: each guard
    /// counts as one holder, and releasing each guard decrements the count.
    #[async_test]
    async fn test_multiple_holders_same_process() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(store, "key".to_owned(), "first".to_owned());

        // Taking the lock twice…
        let guard1 = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard1, CrossProcessLockState::Clean(_));
        let guard2 = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard2, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());

        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 2);

        // … means we can release it twice.
        release_lock(guard1).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        release_lock(guard2).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        assert!(lock.is_dirty().not());

        Ok(())
    }
740
    /// Two locks with different holders on the same store and key exclude
    /// each other: the second one is busy while the first one holds the lock,
    /// obtains it by spinning once the first one is released, and then the
    /// first one times out when spinning with a small maximum backoff.
    #[async_test]
    async fn test_multiple_processes() -> TestResult {
        let store = TestStore::default();
        let lock1 = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned());
        let lock2 = CrossProcessLock::new(store, "key".to_owned(), "second".to_owned());

        // `lock1` acquires the lock.
        let guard1 = lock1.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard1, CrossProcessLockState::Clean(_));
        assert!(lock1.is_dirty().not());

        // `lock2` cannot acquire the lock.
        let err = lock2.try_lock_once().await?.expect_err("lock must NOT be obtained");
        assert_matches!(err, CrossProcessLockUnobtained::Busy);

        // `lock2` is waiting in a task.
        let lock2_clone = lock2.clone();
        let task = spawn(async move { lock2_clone.spin_lock(Some(500)).await });

        // Give the spawned task a chance to start spinning before releasing `lock1`.
        yield_now().await;

        drop(guard1);

        // Once `lock1` is released, `lock2` managed to obtain it.
        let guard2 = task
            .await
            .expect("join handle is properly awaited")
            .expect("lock is successfully attempted")
            .expect("lock must be obtained successfully");
        assert_matches!(guard2, CrossProcessLockState::Clean(_));

        // `lock1` and `lock2` are both clean!
        assert!(lock1.is_dirty().not());
        assert!(lock2.is_dirty().not());

        // Now if `lock1` tries to obtain the lock with a small timeout, it will fail.
        assert_matches!(
            lock1.spin_lock(Some(200)).await,
            Ok(Err(CrossProcessLockUnobtained::TimedOut))
        );

        Ok(())
    }
784
785    #[async_test]
786    async fn test_multiple_processes_up_to_dirty() -> TestResult {
787        let store = TestStore::default();
788        let lock1 = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned());
789        let lock2 = CrossProcessLock::new(store, "key".to_owned(), "second".to_owned());
790
791        // Obtain `lock1` once.
792        {
793            let guard = lock1.try_lock_once().await?.expect("lock must be obtained successfully");
794            assert_matches!(guard, CrossProcessLockState::Clean(_));
795            assert!(lock1.is_dirty().not());
796            drop(guard);
797
798            yield_now().await;
799        }
800
801        // Obtain `lock2` once.
802        {
803            let guard = lock2.try_lock_once().await?.expect("lock must be obtained successfully");
804            assert_matches!(guard, CrossProcessLockState::Clean(_));
805            assert!(lock1.is_dirty().not());
806            drop(guard);
807
808            yield_now().await;
809        }
810
811        for _ in 0..3 {
812            // Obtain `lock1` once more. Now it's dirty because `lock2` has acquired the
813            // lock meanwhile.
814            {
815                let guard =
816                    lock1.try_lock_once().await?.expect("lock must be obtained successfully");
817                assert_matches!(guard, CrossProcessLockState::Dirty(_));
818                assert!(lock1.is_dirty());
819
820                drop(guard);
821                yield_now().await;
822            }
823
824            // Obtain `lock1` once more! It still dirty because it has not been marked as
825            // non-dirty.
826            {
827                let guard =
828                    lock1.try_lock_once().await?.expect("lock must be obtained successfully");
829                assert_matches!(guard, CrossProcessLockState::Dirty(_));
830                assert!(lock1.is_dirty());
831                lock1.clear_dirty();
832
833                drop(guard);
834                yield_now().await;
835            }
836
837            // Obtain `lock1` once more. Now it's clear!
838            {
839                let guard =
840                    lock1.try_lock_once().await?.expect("lock must be obtained successfully");
841                assert_matches!(guard, CrossProcessLockState::Clean(_));
842                assert!(lock1.is_dirty().not());
843
844                drop(guard);
845                yield_now().await;
846            }
847
848            // Same dance with `lock2`!
849            {
850                let guard =
851                    lock2.try_lock_once().await?.expect("lock must be obtained successfully");
852                assert_matches!(guard, CrossProcessLockState::Dirty(_));
853                assert!(lock2.is_dirty());
854                lock2.clear_dirty();
855
856                drop(guard);
857                yield_now().await;
858            }
859        }
860
861        Ok(())
862    }
863}
864
865/// Some code that is shared by almost all `MemoryStore` implementations out
866/// there.
867pub mod memory_store_helper {
868    use std::collections::{HashMap, hash_map::Entry};
869
870    use ruma::time::{Duration, Instant};
871
872    use super::{CrossProcessLockGeneration, FIRST_CROSS_PROCESS_LOCK_GENERATION};
873
874    #[derive(Debug)]
875    pub struct Lease {
876        holder: String,
877        expiration: Instant,
878        generation: CrossProcessLockGeneration,
879    }
880
881    pub fn try_take_leased_lock(
882        leases: &mut HashMap<String, Lease>,
883        lease_duration_ms: u32,
884        key: &str,
885        holder: &str,
886    ) -> Option<CrossProcessLockGeneration> {
887        let now = Instant::now();
888        let expiration = now + Duration::from_millis(lease_duration_ms.into());
889
890        match leases.entry(key.to_owned()) {
891            // There is an existing holder.
892            Entry::Occupied(mut entry) => {
893                let Lease {
894                    holder: current_holder,
895                    expiration: current_expiration,
896                    generation: current_generation,
897                } = entry.get_mut();
898
899                if current_holder == holder {
900                    // We had the lease before, extend it.
901                    *current_expiration = expiration;
902
903                    Some(*current_generation)
904                } else {
905                    // We didn't have it.
906                    if *current_expiration < now {
907                        // Steal it!
908                        *current_holder = holder.to_owned();
909                        *current_expiration = expiration;
910                        *current_generation += 1;
911
912                        Some(*current_generation)
913                    } else {
914                        // We tried our best.
915                        None
916                    }
917                }
918            }
919
920            // There is no holder, easy.
921            Entry::Vacant(entry) => {
922                entry.insert(Lease {
923                    holder: holder.to_owned(),
924                    expiration: Instant::now() + Duration::from_millis(lease_duration_ms.into()),
925                    generation: FIRST_CROSS_PROCESS_LOCK_GENERATION,
926                });
927
928                Some(FIRST_CROSS_PROCESS_LOCK_GENERATION)
929            }
930        }
931    }
932}