// matrix_sdk_common/cross_process_lock.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A cross-process lock implementation.
16//!
17//! This is a per-process lock that may be used only for very specific use
18//! cases, where multiple processes might concurrently write to the same
19//! database at the same time; this would invalidate store caches, so
20//! that should be done mindfully. Such a lock can be obtained multiple times by
21//! the same process, and it remains active as long as there's at least one user
22//! in a given process.
23//!
24//! The lock is implemented using time-based leases. The lock maintains the lock
25//! identifier (key), who's the current holder (value), and an expiration
26//! timestamp on the side; see also `CryptoStore::try_take_leased_lock` for more
27//! details.
28//!
29//! The lock is initially obtained for a certain period of time (namely, the
30//! duration of a lease, aka `LEASE_DURATION_MS`), and then a “heartbeat” task
31//! renews the lease to extend its duration, every so often (namely, every
32//! `EXTEND_LEASE_EVERY_MS`). Since the Tokio scheduler might be busy, the
33//! extension request should happen way more frequently than the duration of a
34//! lease, in case a deadline is missed. The current values have been chosen to
35//! reflect that, with a ratio of 1:10 as of 2023-06-23.
36//!
37//! Releasing the lock happens naturally, by not renewing a lease. It happens
38//! automatically after the duration of the last lease, at most.
39
40use std::{
41    error::Error,
42    future::Future,
43    sync::{
44        Arc,
45        atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
46    },
47    time::Duration,
48};
49
50use tokio::sync::Mutex;
51use tracing::{debug, error, instrument, trace, warn};
52
53use crate::{
54    SendOutsideWasm,
55    executor::{JoinHandle, spawn},
56    sleep::sleep,
57};
58
/// A lock generation is an integer incremented each time the lock is taken by
/// a different holder.
///
/// This is used to know if a lock has been dirtied: observing a different
/// generation than the one previously seen means another holder has taken the
/// lock in the meantime.
pub type CrossProcessLockGeneration = u64;
64
/// A trait that represents any function which can be used to
/// acquire the underlying lock of a [`CrossProcessLock`].
///
/// For example, this can be useful when writing a function which
/// is parameterized to acquire the underlying lock through either
/// [`CrossProcessLock::spin_lock`] or [`CrossProcessLock::try_lock_once`].
pub trait AcquireCrossProcessLockFn<L>
where
    Self: AsyncFn(&CrossProcessLock<L>) -> AcquireCrossProcessLockResult<L::LockError>,
    L: TryLock + Clone + SendOutsideWasm + 'static,
{
}

// Blanket implementation: any async function or closure with the matching
// signature automatically implements the alias trait above.
impl<L, T> AcquireCrossProcessLockFn<L> for T
where
    T: AsyncFn(&CrossProcessLock<L>) -> AcquireCrossProcessLockResult<L::LockError>,
    L: TryLock + Clone + SendOutsideWasm + 'static,
{
}
84
/// A convenience type for the [`Result`] returned from calling
/// either [`CrossProcessLock::try_lock_once`] or [`CrossProcessLock::spin_lock`].
///
/// The outer `Result` carries the store error (`E`, i.e.
/// [`TryLock::LockError`]); the inner `Result` indicates whether the lock was
/// obtained ([`CrossProcessLockState`]) or not ([`CrossProcessLockUnobtained`]).
pub type AcquireCrossProcessLockResult<E> =
    Result<Result<CrossProcessLockState, CrossProcessLockUnobtained>, E>;
89
/// Trait used to try to take a lock. Foundation of [`CrossProcessLock`].
pub trait TryLock {
    /// The error type returned by [`TryLock::try_lock`].
    ///
    /// On non-WASM targets, the error is additionally required to be
    /// `Send + Sync`.
    #[cfg(not(target_family = "wasm"))]
    type LockError: Error + Send + Sync;

    /// The error type returned by [`TryLock::try_lock`] (WASM flavour,
    /// without the `Send + Sync` requirement).
    #[cfg(target_family = "wasm")]
    type LockError: Error;

    /// Try to take a leased lock.
    ///
    /// This attempts to take a lock for the given lease duration.
    ///
    /// - If we already had the lease, this will extend the lease.
    /// - If we didn't, but the previous lease has expired, we will obtain the
    ///   lock.
    /// - If there was no previous lease, we will obtain the lock.
    /// - Otherwise, we don't get the lock.
    ///
    /// Returns `Some(_)` to indicate the lock succeeded, `None` otherwise. The
    /// cross-process lock generation must be compared to the generation before
    /// the call to see if the lock has been dirtied: a different generation
    /// means the lock has been dirtied, i.e. taken by a different holder in
    /// the meantime.
    fn try_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> impl Future<Output = Result<Option<CrossProcessLockGeneration>, Self::LockError>>
    + SendOutsideWasm;
}
121
/// Small state machine to handle wait times during the exponential backoff of
/// [`CrossProcessLock::spin_lock`].
#[derive(Clone, Debug)]
enum WaitingTime {
    /// Some time to wait, in milliseconds.
    Some(u32),
    /// Stop waiting when seeing this value: the maximum backoff has been
    /// reached.
    Stop,
}
130
/// A guard of a cross-process lock.
///
/// Holding a guard keeps the lock alive for this process; once every guard
/// has been dropped, the lock is released automatically a short while later
/// (at most one lease duration).
#[derive(Clone, Debug)]
#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"]
pub struct CrossProcessLockGuard {
    /// A clone of [`CrossProcessLock::num_holders`].
    num_holders: Arc<AtomicU32>,

    /// A clone of [`CrossProcessLock::is_dirty`].
    is_dirty: Arc<AtomicBool>,
}

impl CrossProcessLockGuard {
    /// Build a guard over the shared holder counter and dirty flag.
    ///
    /// Note: this constructor only captures the shared state; incrementing
    /// the holder count is the caller's responsibility.
    fn new(num_holders: Arc<AtomicU32>, is_dirty: Arc<AtomicBool>) -> Self {
        CrossProcessLockGuard { num_holders, is_dirty }
    }

    /// Whether the cross-process lock associated with this guard is dirty.
    ///
    /// See [`CrossProcessLockState::Dirty`] for the semantics of _dirty_.
    pub fn is_dirty(&self) -> bool {
        self.is_dirty.load(Ordering::SeqCst)
    }

    /// Clear the dirty state of the cross-process lock associated with this
    /// guard.
    ///
    /// A dirtied lock remains dirty until this method is called; calling it
    /// marks that the holder has recovered from the dirty state.
    pub fn clear_dirty(&self) {
        self.is_dirty.store(false, Ordering::SeqCst);
    }
}

impl Drop for CrossProcessLockGuard {
    /// Decrement the holder count; the lock's renew task stops extending the
    /// lease once the count reaches zero.
    fn drop(&mut self) {
        self.num_holders.fetch_sub(1, Ordering::SeqCst);
    }
}
175
/// A cross-process lock implementation.
///
/// See the doc-comment of this module for more information.
#[derive(Clone, Debug)]
pub struct CrossProcessLock<L>
where
    L: TryLock + Clone + SendOutsideWasm + 'static,
{
    /// The locker implementation.
    ///
    /// `L` is responsible for trying to take the lock, while
    /// [`CrossProcessLock`] is responsible to make it cross-process, with the
    /// retry mechanism, plus guard and so on.
    locker: L,

    /// Number of holders of the lock in this process.
    ///
    /// If greater than 0, this means we've already obtained this lock, in this
    /// process, and the store lock mustn't be touched.
    ///
    /// When the number of holders is decreased to 0, then the lock must be
    /// released in the store.
    num_holders: Arc<AtomicU32>,

    /// A mutex to control an attempt to take the lock, to avoid making it
    /// reentrant.
    locking_attempt: Arc<Mutex<()>>,

    /// Current renew task spawned by `try_lock_once`.
    ///
    /// The task periodically extends the lease while there is at least one
    /// holder; see the body of `try_lock_once`.
    renew_task: Arc<Mutex<Option<JoinHandle<()>>>>,

    /// The key used in the key/value mapping for the lock entry.
    lock_key: String,

    /// The cross-process lock configuration.
    config: CrossProcessLockConfig,

    /// Backoff time, in milliseconds, used by `spin_lock`.
    backoff: Arc<Mutex<WaitingTime>>,

    /// This lock generation.
    ///
    /// Starts at [`NO_CROSS_PROCESS_LOCK_GENERATION`] and is updated every
    /// time the lock is obtained; see [`CrossProcessLockGeneration`].
    generation: Arc<AtomicU64>,

    /// Whether the lock has been dirtied.
    ///
    /// See [`CrossProcessLockState::Dirty`] to learn more about the semantics
    /// of _dirty_.
    is_dirty: Arc<AtomicBool>,
}
225
/// Amount of time a lease of the lock should last, in milliseconds.
pub const LEASE_DURATION_MS: u32 = 500;

/// Period of time between two attempts to extend the lease, in milliseconds.
///
/// We'll re-request a lease for an entire duration of `LEASE_DURATION_MS`
/// milliseconds, every `EXTEND_LEASE_EVERY_MS`, so this has to
/// be an amount safely low compared to `LEASE_DURATION_MS`, to make sure
/// that we can miss a deadline without compromising the lock.
pub const EXTEND_LEASE_EVERY_MS: u64 = 50;

/// Initial backoff, in milliseconds. This is the time we wait the first
/// time, if taking the lock initially failed.
const INITIAL_BACKOFF_MS: u32 = 10;

/// Maximal backoff, in milliseconds. This is the maximum amount of time
/// we'll wait for the lock, *between two attempts*.
pub const MAX_BACKOFF_MS: u32 = 1000;

/// Sentinel value representing the absence of a lock generation value.
///
/// When the lock is created, it has no generation. Once locked, it receives its
/// first generation from [`TryLock::try_lock`]. Subsequent lockings may
/// generate new lock generation. The generation is incremented by 1 every time.
///
/// The first generation is defined by [`FIRST_CROSS_PROCESS_LOCK_GENERATION`].
pub const NO_CROSS_PROCESS_LOCK_GENERATION: CrossProcessLockGeneration = 0;

/// Describe the first lock generation value (see
/// [`CrossProcessLockGeneration`]).
pub const FIRST_CROSS_PROCESS_LOCK_GENERATION: CrossProcessLockGeneration = 1;
256
impl<L> CrossProcessLock<L>
where
    L: TryLock + Clone + SendOutsideWasm + 'static,
{
    /// Create a new cross-process lock.
    ///
    /// No lease is taken yet: the lock is only obtained when
    /// [`CrossProcessLock::try_lock_once`] or [`CrossProcessLock::spin_lock`]
    /// is called.
    ///
    /// # Parameters
    ///
    /// - `locker`: the implementation responsible for actually taking the
    ///   lease in the underlying store.
    /// - `lock_key`: key in the key-value store to store the lock's state.
    /// - `config`: the cross-process lock configuration to use, if it's
    ///   [`CrossProcessLockConfig::SingleProcess`], no actual lock will be
    ///   taken.
    pub fn new(locker: L, lock_key: String, config: CrossProcessLockConfig) -> Self {
        Self {
            locker,
            lock_key,
            config,
            backoff: Arc::new(Mutex::new(WaitingTime::Some(INITIAL_BACKOFF_MS))),
            num_holders: Arc::new(0.into()),
            locking_attempt: Arc::new(Mutex::new(())),
            renew_task: Default::default(),
            // No generation yet: it is set upon the first successful locking.
            generation: Arc::new(AtomicU64::new(NO_CROSS_PROCESS_LOCK_GENERATION)),
            is_dirty: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Determine whether the cross-process lock is dirty.
    ///
    /// See [`CrossProcessLockState::Dirty`] to learn more about the semantics
    /// of _dirty_.
    pub fn is_dirty(&self) -> bool {
        self.is_dirty.load(Ordering::SeqCst)
    }

    /// Clear the dirty state from this cross-process lock.
    ///
    /// If the cross-process lock is dirtied, it will remain dirtied until
    /// this method is called. This allows recovering from a dirty state and
    /// marking that it has recovered.
    pub fn clear_dirty(&self) {
        self.is_dirty.store(false, Ordering::SeqCst);
    }

    /// Try to lock once, returns whether the lock was obtained or not.
    ///
    /// The lock can be obtained but it can be dirty. In all cases, the renew
    /// task will run in the background.
    #[instrument(skip(self), fields(?self.lock_key, ?self.config, ?self.generation))]
    pub async fn try_lock_once(&self) -> AcquireCrossProcessLockResult<L::LockError> {
        // If it's not `MultiProcess`, this behaves as a no-op: hand out a guard
        // immediately, without touching the store.
        let CrossProcessLockConfig::MultiProcess { holder_name } = &self.config else {
            let guard = CrossProcessLockGuard::new(self.num_holders.clone(), self.is_dirty.clone());
            return Ok(Ok(CrossProcessLockState::Clean(guard)));
        };

        // Hold onto the locking attempt mutex for the entire lifetime of this
        // function, to avoid multiple reentrant calls.
        let mut _attempt = self.locking_attempt.lock().await;

        // If another thread obtained the lock, make sure to only superficially increase
        // the number of holders, and carry on.
        if self.num_holders.load(Ordering::SeqCst) > 0 {
            // Note: between the above load and the fetch_add below, another thread may
            // decrement `num_holders`. That's fine because that means the lock
            // was taken by at least one thread, and after this call it will be
            // taken by at least one thread.
            trace!("We already had the lock, incrementing holder count");

            self.num_holders.fetch_add(1, Ordering::SeqCst);

            return Ok(Ok(CrossProcessLockState::Clean(CrossProcessLockGuard::new(
                self.num_holders.clone(),
                self.is_dirty.clone(),
            ))));
        }

        if let Some(new_generation) =
            self.locker.try_lock(LEASE_DURATION_MS, &self.lock_key, holder_name).await?
        {
            // Compare the generation returned by the store to the one we last saw,
            // to detect whether another holder has taken the lock in between.
            match self.generation.swap(new_generation, Ordering::SeqCst) {
                // If there was no lock generation, it means this is the first time the lock is
                // obtained. It cannot be dirty.
                NO_CROSS_PROCESS_LOCK_GENERATION => {
                    trace!(?new_generation, "Setting the lock generation for the first time");
                }

                // This was NOT the same generation, the lock has been dirtied!
                previous_generation if previous_generation != new_generation => {
                    warn!(
                        ?previous_generation,
                        ?new_generation,
                        "The lock has been obtained, but it's been dirtied!"
                    );
                    self.is_dirty.store(true, Ordering::SeqCst);
                }

                // This was the same generation, no problem.
                _ => {
                    trace!("Same lock generation; no problem");
                }
            }

            trace!("Lock obtained!");
        } else {
            trace!("Couldn't obtain the lock immediately.");
            return Ok(Err(CrossProcessLockUnobtained::Busy));
        }

        trace!("Obtained the lock, spawning the lease extension task.");

        // This is the first time we've obtained the lock. We're going to spawn the task
        // that will renew the lease.

        // Clone data to be owned by the task.
        let this = (*self).clone();

        let mut renew_task = self.renew_task.lock().await;

        // Cancel the previous task, if any. That's safe to do, because:
        // - either the task was done,
        // - or it was still running, but taking a lock in the db has to be an atomic
        //   operation running in a transaction.

        if let Some(_prev) = renew_task.take() {
            #[cfg(not(target_family = "wasm"))]
            if !_prev.is_finished() {
                trace!("aborting the previous renew task");
                _prev.abort();
            }
        }

        // Restart a new one.
        *renew_task = Some(spawn(async move {
            let CrossProcessLockConfig::MultiProcess { holder_name } = this.config else { return };
            loop {
                {
                    // First, check if there are still users of this lock.
                    //
                    // This is not racy, because:
                    // - the `locking_attempt` mutex makes sure we don't have unexpected
                    // interactions with the non-atomic sequence above in `try_lock_once`
                    // (check > 0, then add 1).
                    // - other entities holding onto the `num_holders` atomic will only
                    // decrease it over time.

                    let _guard = this.locking_attempt.lock().await;

                    // If there are no more users, we can quit.
                    if this.num_holders.load(Ordering::SeqCst) == 0 {
                        trace!("exiting the lease extension loop");

                        // Cancel the lease with another 0ms lease.
                        // If we don't get the lock, that's (weird but) fine.
                        let fut = this.locker.try_lock(0, &this.lock_key, &holder_name);
                        let _ = fut.await;

                        // Exit the loop.
                        break;
                    }
                }

                sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;

                match this.locker.try_lock(LEASE_DURATION_MS, &this.lock_key, &holder_name).await {
                    Ok(Some(_generation)) => {
                        // It's impossible that the generation can be
                        // different from the previous generation.
                        //
                        // As long as the task runs, the lock is renewed, so
                        // the generation remains the same. If the lock is not
                        // taken, it's because the lease has expired, which
                        // is represented by the
                        // `Ok(None)` value, and the task
                        // must stop.
                    }

                    Ok(None) => {
                        error!("Failed to renew the lock lease: the lock could not be obtained");

                        // Exit the loop.
                        break;
                    }

                    Err(err) => {
                        error!("Error when extending the lock lease: {err:#}");

                        // Exit the loop.
                        break;
                    }
                }
            }
        }));

        self.num_holders.fetch_add(1, Ordering::SeqCst);

        let guard = CrossProcessLockGuard::new(self.num_holders.clone(), self.is_dirty.clone());

        Ok(Ok(if self.is_dirty() {
            CrossProcessLockState::Dirty(guard)
        } else {
            CrossProcessLockState::Clean(guard)
        }))
    }

    /// Attempt to take the lock, with exponential backoff if the lock has
    /// already been taken before.
    ///
    /// The `max_backoff` parameter is the maximum time (in milliseconds) that
    /// should be waited for, between two attempts. When that time is
    /// reached a second time, the lock will stop attempting to get the lock
    /// and will return a timeout error upon locking. If not provided,
    /// will wait for [`MAX_BACKOFF_MS`].
    #[instrument(skip(self), fields(?self.lock_key, ?self.config))]
    pub async fn spin_lock(
        &self,
        max_backoff: Option<u32>,
    ) -> AcquireCrossProcessLockResult<L::LockError> {
        // Fall back to the default maximum backoff when none is provided.
        let max_backoff = max_backoff.unwrap_or(MAX_BACKOFF_MS);

        // Note: reads/writes to the backoff are racy across threads in theory, but the
        // lock in `try_lock_once` should sequentialize it all.

        loop {
            // If the cross-process lock config is not `MultiProcess`, this behaves as a
            // no-op and we just return
            let lock_result = self.try_lock_once().await?;

            if lock_result.is_ok() {
                if matches!(self.config, CrossProcessLockConfig::MultiProcess { .. }) {
                    // Reset backoff before returning, for the next attempt to lock.
                    *self.backoff.lock().await = WaitingTime::Some(INITIAL_BACKOFF_MS);
                }

                return Ok(lock_result);
            }

            // Exponential backoff! Multiply by 2 the time we've waited before, cap it to
            // max_backoff.
            let mut backoff = self.backoff.lock().await;

            let wait = match &mut *backoff {
                WaitingTime::Some(val) => {
                    let wait = *val;
                    *val = val.saturating_mul(2);
                    if *val >= max_backoff {
                        *backoff = WaitingTime::Stop;
                    }
                    wait
                }
                WaitingTime::Stop => {
                    // We've reached the maximum backoff, abandon.
                    return Ok(Err(CrossProcessLockUnobtained::TimedOut));
                }
            };

            // `wait` is in milliseconds.
            debug!("Waiting {wait} before re-attempting to take the lock");
            sleep(Duration::from_millis(wait.into())).await;
        }
    }

    /// Returns the value in the database that represents the holder's
    /// identifier.
    ///
    /// Returns `None` when the config is
    /// [`CrossProcessLockConfig::SingleProcess`].
    pub fn lock_holder(&self) -> Option<&str> {
        self.config.holder_name()
    }
}
524
/// Represent a successful result of a locking attempt, either by
/// [`CrossProcessLock::try_lock_once`] or [`CrossProcessLock::spin_lock`].
#[derive(Debug)]
#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"]
pub enum CrossProcessLockState {
    /// The lock has been obtained successfully, all good.
    Clean(CrossProcessLockGuard),

    /// The lock has been obtained successfully, but the lock is dirty!
    ///
    /// This holder has obtained this cross-process lock once, then another
    /// holder has obtained this cross-process lock _before_ this holder
    /// obtained it again. The lock is marked as dirty. It means the value
    /// protected by the cross-process lock may need to be reloaded if
    /// synchronisation is important.
    ///
    /// Until [`CrossProcessLock::clear_dirty`] is called,
    /// [`CrossProcessLock::is_dirty`], [`CrossProcessLock::try_lock_once`] and
    /// [`CrossProcessLock::spin_lock`] will report the lock as dirty. Put it
    /// differently: dirty once, dirty forever, unless
    /// [`CrossProcessLock::clear_dirty`] is called.
    Dirty(CrossProcessLockGuard),
}
548
549impl CrossProcessLockState {
550    /// Map this value into the inner [`CrossProcessLockGuard`].
551    pub fn into_guard(self) -> CrossProcessLockGuard {
552        match self {
553            Self::Clean(guard) | Self::Dirty(guard) => guard,
554        }
555    }
556
557    /// Map this [`CrossProcessLockState`] into a
558    /// [`MappedCrossProcessLockState`].
559    ///
560    /// This is helpful when one wants to create its own wrapper over
561    /// [`CrossProcessLockGuard`].
562    pub fn map<F, G>(self, mapper: F) -> MappedCrossProcessLockState<G>
563    where
564        F: FnOnce(CrossProcessLockGuard) -> G,
565    {
566        match self {
567            Self::Clean(guard) => MappedCrossProcessLockState::Clean(mapper(guard)),
568            Self::Dirty(guard) => MappedCrossProcessLockState::Dirty(mapper(guard)),
569        }
570    }
571}
572
/// A mapped [`CrossProcessLockState`].
///
/// Obtained through [`CrossProcessLockState::map`], which lets callers wrap
/// the lock guard in their own type while keeping the clean/dirty
/// distinction.
#[derive(Debug)]
#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"]
pub enum MappedCrossProcessLockState<G> {
    /// The equivalent of [`CrossProcessLockState::Clean`].
    Clean(G),

    /// The equivalent of [`CrossProcessLockState::Dirty`].
    Dirty(G),
}

impl<G> MappedCrossProcessLockState<G> {
    /// Borrow the inner value if this is [`Clean`][Self::Clean], `None`
    /// otherwise.
    pub fn as_clean(&self) -> Option<&G> {
        if let Self::Clean(guard) = self { Some(guard) } else { None }
    }
}
595
/// Represent an unsuccessful result of a lock attempt, either by
/// [`CrossProcessLock::try_lock_once`] or [`CrossProcessLock::spin_lock`].
///
/// Note: these are "the lock is taken elsewhere" outcomes, not store errors;
/// store errors are surfaced through [`TryLock::LockError`].
#[derive(Clone, Debug, thiserror::Error)]
pub enum CrossProcessLockUnobtained {
    /// The lock couldn't be obtained immediately because it is busy, i.e. it is
    /// held by another holder.
    #[error(
        "The lock couldn't be obtained immediately because it is busy, i.e. it is held by another holder"
    )]
    Busy,

    /// The lock couldn't be obtained after several attempts: locking has timed
    /// out.
    #[error("The lock couldn't be obtained after several attempts: locking has timed out")]
    TimedOut,
}
612
/// Union of [`CrossProcessLockUnobtained`] and [`TryLock::LockError`].
#[derive(Clone, Debug, thiserror::Error)]
pub enum CrossProcessLockError {
    /// The lock could not be obtained (busy, or timed out).
    #[error(transparent)]
    Unobtained(#[from] CrossProcessLockUnobtained),

    /// The underlying store returned an error while trying to take the lock.
    #[error(transparent)]
    #[cfg(not(target_family = "wasm"))]
    TryLock(#[from] Arc<dyn Error + Send + Sync>),

    /// The underlying store returned an error while trying to take the lock
    /// (WASM flavour, without the `Send + Sync` requirement).
    #[error(transparent)]
    #[cfg(target_family = "wasm")]
    TryLock(#[from] Arc<dyn Error>),
}
627
/// The cross-process lock config to use for the various stores.
#[derive(Clone, Debug)]
pub enum CrossProcessLockConfig {
    /// The stores will be used in multiple processes; the holder name for the
    /// cross-process lock is the associated `String`.
    MultiProcess {
        /// The name of the holder of the cross-process lock.
        holder_name: String,
    },
    /// The stores will be used in a single process, there is no need for a
    /// cross-process lock.
    SingleProcess,
}

impl CrossProcessLockConfig {
    /// Shorthand constructor for
    /// [`CrossProcessLockConfig::MultiProcess`].
    pub fn multi_process(holder_name: impl Into<String>) -> Self {
        let holder_name = holder_name.into();
        Self::MultiProcess { holder_name }
    }

    /// The holder name for the cross-process lock, if any.
    ///
    /// Returns `None` for [`CrossProcessLockConfig::SingleProcess`].
    pub fn holder_name(&self) -> Option<&str> {
        if let Self::MultiProcess { holder_name } = self {
            Some(holder_name)
        } else {
            None
        }
    }
}
658
659#[cfg(test)]
660#[cfg(not(target_family = "wasm"))] // These tests require tokio::time, which is not implemented on wasm.
661mod tests {
662    use std::{
663        collections::HashMap,
664        ops::Not,
665        sync::{Arc, RwLock, atomic},
666    };
667
668    use assert_matches::assert_matches;
669    use matrix_sdk_test_macros::async_test;
670    use tokio::{spawn, task::yield_now};
671
672    use super::{
673        CrossProcessLock, CrossProcessLockConfig, CrossProcessLockError,
674        CrossProcessLockGeneration, CrossProcessLockState, CrossProcessLockUnobtained, TryLock,
675        memory_store_helper::{Lease, try_take_leased_lock},
676    };
677
    /// An in-memory [`TryLock`] backend used to exercise the lock in tests,
    /// backed by the `memory_store_helper` lease map.
    #[derive(Clone, Default)]
    struct TestStore {
        // Map from lock key to its current lease.
        leases: Arc<RwLock<HashMap<String, Lease>>>,
    }

    impl TestStore {
        /// Attempt to take (or extend) the leased lock for `key` on behalf of
        /// `holder`, for `lease_duration_ms` milliseconds.
        ///
        /// Returns the lock generation on success, `None` otherwise (see
        /// [`TryLock::try_lock`] for the semantics).
        fn try_take_leased_lock(
            &self,
            lease_duration_ms: u32,
            key: &str,
            holder: &str,
        ) -> Option<CrossProcessLockGeneration> {
            try_take_leased_lock(&mut self.leases.write().unwrap(), lease_duration_ms, key, holder)
        }
    }
693
    /// Error type for [`TestStore`]; it has no variants because the in-memory
    /// store can never fail.
    #[derive(Debug, thiserror::Error)]
    enum DummyError {}

    // Allows using `?` on `DummyError` results in tests returning
    // `CrossProcessLockError`.
    impl From<DummyError> for CrossProcessLockError {
        fn from(value: DummyError) -> Self {
            Self::TryLock(Arc::new(value))
        }
    }
702
    impl TryLock for TestStore {
        type LockError = DummyError;

        /// Try to take a lock using the given store.
        ///
        /// Delegates to the in-memory lease helper; this never returns an
        /// error.
        async fn try_lock(
            &self,
            lease_duration_ms: u32,
            key: &str,
            holder: &str,
        ) -> Result<Option<CrossProcessLockGeneration>, Self::LockError> {
            Ok(self.try_take_leased_lock(lease_duration_ms, key, holder))
        }
    }
716
    /// Drop a lock guard, then yield to the scheduler so background tasks
    /// (e.g. the lease renew task) can make progress.
    async fn release_lock(lock: CrossProcessLockState) {
        drop(lock);
        yield_now().await;
    }

    /// Shorthand for the result type used by the tests below.
    type TestResult = Result<(), CrossProcessLockError>;
723
    /// A single holder can lock, release, and spin-lock the same lock
    /// repeatedly, without ever observing a dirty state.
    #[async_test]
    async fn test_simple_lock_unlock() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(
            store,
            "key".to_owned(),
            CrossProcessLockConfig::multi_process("first"),
        );

        // The lock plain works when used with a single holder.
        let guard = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // Releasing works.
        release_lock(guard).await;
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        // Spin locking on the same lock always works, assuming no concurrent access.
        let guard = lock.spin_lock(None).await?.expect("spin lock must be obtained successfully");
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // Releasing still works.
        release_lock(guard).await;
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        Ok(())
    }
756
    /// Dropping a `CrossProcessLock` without releasing its guard doesn't
    /// prevent a new instance, with the same key and holder, from obtaining
    /// the lock again (and cleanly, since the holder is the same).
    #[async_test]
    async fn test_self_recovery() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(
            store.clone(),
            "key".to_owned(),
            CrossProcessLockConfig::multi_process("first"),
        );

        // When a lock is obtained…
        let guard = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // But then forgotten… (note: no need to release the guard)
        drop(lock);

        // And when rematerializing the lock with the same key/value…
        let lock = CrossProcessLock::new(
            store.clone(),
            "key".to_owned(),
            CrossProcessLockConfig::multi_process("first"),
        );

        // We still got it.
        let guard =
            lock.try_lock_once().await?.expect("lock (again) must be obtained successfully");
        assert_matches!(guard, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        Ok(())
    }
791
    /// Taking the lock twice in the same process bumps the holder count to
    /// two, and both guards must be released before the count drops to zero.
    #[async_test]
    async fn test_multiple_holders_same_process() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(
            store,
            "key".to_owned(),
            CrossProcessLockConfig::multi_process("first"),
        );

        // Taking the lock twice…
        let guard1 = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard1, CrossProcessLockState::Clean(_));
        let guard2 = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard2, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());

        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 2);

        // … means we can release it twice.
        release_lock(guard1).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        release_lock(guard2).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        assert!(lock.is_dirty().not());

        Ok(())
    }
821
822    #[async_test]
823    async fn test_multiple_processes() -> TestResult {
824        let store = TestStore::default();
825        let lock1 = CrossProcessLock::new(
826            store.clone(),
827            "key".to_owned(),
828            CrossProcessLockConfig::multi_process("first"),
829        );
830        let lock2 = CrossProcessLock::new(
831            store,
832            "key".to_owned(),
833            CrossProcessLockConfig::multi_process("second"),
834        );
835
836        // `lock1` acquires the lock.
837        let guard1 = lock1.try_lock_once().await?.expect("lock must be obtained successfully");
838        assert_matches!(guard1, CrossProcessLockState::Clean(_));
839        assert!(lock1.is_dirty().not());
840
841        // `lock2` cannot acquire the lock.
842        let err = lock2.try_lock_once().await?.expect_err("lock must NOT be obtained");
843        assert_matches!(err, CrossProcessLockUnobtained::Busy);
844
845        // `lock2` is waiting in a task.
846        let lock2_clone = lock2.clone();
847        let task = spawn(async move { lock2_clone.spin_lock(Some(500)).await });
848
849        yield_now().await;
850
851        drop(guard1);
852
853        // Once `lock1` is released, `lock2` managed to obtain it.
854        let guard2 = task
855            .await
856            .expect("join handle is properly awaited")
857            .expect("lock is successfully attempted")
858            .expect("lock must be obtained successfully");
859        assert_matches!(guard2, CrossProcessLockState::Clean(_));
860
861        // `lock1` and `lock2` are both clean!
862        assert!(lock1.is_dirty().not());
863        assert!(lock2.is_dirty().not());
864
865        // Now if `lock1` tries to obtain the lock with a small timeout, it will fail.
866        assert_matches!(
867            lock1.spin_lock(Some(200)).await,
868            Ok(Err(CrossProcessLockUnobtained::TimedOut))
869        );
870
871        Ok(())
872    }
873
874    #[async_test]
875    async fn test_multiple_processes_up_to_dirty() -> TestResult {
876        let store = TestStore::default();
877        let lock1 = CrossProcessLock::new(
878            store.clone(),
879            "key".to_owned(),
880            CrossProcessLockConfig::multi_process("first"),
881        );
882        let lock2 = CrossProcessLock::new(
883            store,
884            "key".to_owned(),
885            CrossProcessLockConfig::multi_process("second"),
886        );
887
888        // Obtain `lock1` once.
889        {
890            let guard = lock1.try_lock_once().await?.expect("lock must be obtained successfully");
891            assert_matches!(guard, CrossProcessLockState::Clean(_));
892            assert!(lock1.is_dirty().not());
893            drop(guard);
894
895            yield_now().await;
896        }
897
898        // Obtain `lock2` once.
899        {
900            let guard = lock2.try_lock_once().await?.expect("lock must be obtained successfully");
901            assert_matches!(guard, CrossProcessLockState::Clean(_));
902            assert!(lock1.is_dirty().not());
903            drop(guard);
904
905            yield_now().await;
906        }
907
908        for _ in 0..3 {
909            // Obtain `lock1` once more. Now it's dirty because `lock2` has acquired the
910            // lock meanwhile.
911            {
912                let guard =
913                    lock1.try_lock_once().await?.expect("lock must be obtained successfully");
914                assert_matches!(guard, CrossProcessLockState::Dirty(_));
915                assert!(lock1.is_dirty());
916
917                drop(guard);
918                yield_now().await;
919            }
920
921            // Obtain `lock1` once more! It still dirty because it has not been marked as
922            // non-dirty.
923            {
924                let guard =
925                    lock1.try_lock_once().await?.expect("lock must be obtained successfully");
926                assert_matches!(guard, CrossProcessLockState::Dirty(_));
927                assert!(lock1.is_dirty());
928                lock1.clear_dirty();
929
930                drop(guard);
931                yield_now().await;
932            }
933
934            // Obtain `lock1` once more. Now it's clear!
935            {
936                let guard =
937                    lock1.try_lock_once().await?.expect("lock must be obtained successfully");
938                assert_matches!(guard, CrossProcessLockState::Clean(_));
939                assert!(lock1.is_dirty().not());
940
941                drop(guard);
942                yield_now().await;
943            }
944
945            // Same dance with `lock2`!
946            {
947                let guard =
948                    lock2.try_lock_once().await?.expect("lock must be obtained successfully");
949                assert_matches!(guard, CrossProcessLockState::Dirty(_));
950                assert!(lock2.is_dirty());
951                lock2.clear_dirty();
952
953                drop(guard);
954                yield_now().await;
955            }
956        }
957
958        Ok(())
959    }
960}
961
962/// Some code that is shared by almost all `MemoryStore` implementations out
963/// there.
964pub mod memory_store_helper {
965    use std::collections::{HashMap, hash_map::Entry};
966
967    use ruma::time::{Duration, Instant};
968
969    use super::{CrossProcessLockGeneration, FIRST_CROSS_PROCESS_LOCK_GENERATION};
970
971    #[derive(Debug)]
972    pub struct Lease {
973        holder: String,
974        expiration: Instant,
975        generation: CrossProcessLockGeneration,
976    }
977
978    pub fn try_take_leased_lock(
979        leases: &mut HashMap<String, Lease>,
980        lease_duration_ms: u32,
981        key: &str,
982        holder: &str,
983    ) -> Option<CrossProcessLockGeneration> {
984        let now = Instant::now();
985        let expiration = now + Duration::from_millis(lease_duration_ms.into());
986
987        match leases.entry(key.to_owned()) {
988            // There is an existing holder.
989            Entry::Occupied(mut entry) => {
990                let Lease {
991                    holder: current_holder,
992                    expiration: current_expiration,
993                    generation: current_generation,
994                } = entry.get_mut();
995
996                if current_holder == holder {
997                    // We had the lease before, extend it.
998                    *current_expiration = expiration;
999
1000                    Some(*current_generation)
1001                } else {
1002                    // We didn't have it.
1003                    if *current_expiration < now {
1004                        // Steal it!
1005                        *current_holder = holder.to_owned();
1006                        *current_expiration = expiration;
1007                        *current_generation += 1;
1008
1009                        Some(*current_generation)
1010                    } else {
1011                        // We tried our best.
1012                        None
1013                    }
1014                }
1015            }
1016
1017            // There is no holder, easy.
1018            Entry::Vacant(entry) => {
1019                entry.insert(Lease {
1020                    holder: holder.to_owned(),
1021                    expiration: Instant::now() + Duration::from_millis(lease_duration_ms.into()),
1022                    generation: FIRST_CROSS_PROCESS_LOCK_GENERATION,
1023                });
1024
1025                Some(FIRST_CROSS_PROCESS_LOCK_GENERATION)
1026            }
1027        }
1028    }
1029}