matrix_sdk_common/cross_process_lock.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A cross-process lock implementation.
16//!
17//! This is a per-process lock that may be used only for very specific use
18//! cases, where multiple processes might concurrently write to the same
19//! database at the same time; this would invalidate store caches, so
20//! that should be done mindfully. Such a lock can be obtained multiple times by
21//! the same process, and it remains active as long as there's at least one user
22//! in a given process.
23//!
//! The lock is implemented using time-based leases. The lock maintains the lock
//! identifier (key), who the current holder is (value), and an expiration
//! timestamp on the side; see also `CryptoStore::try_take_leased_lock` for more
//! details.
28//!
29//! The lock is initially obtained for a certain period of time (namely, the
30//! duration of a lease, aka `LEASE_DURATION_MS`), and then a “heartbeat” task
31//! renews the lease to extend its duration, every so often (namely, every
32//! `EXTEND_LEASE_EVERY_MS`). Since the Tokio scheduler might be busy, the
33//! extension request should happen way more frequently than the duration of a
34//! lease, in case a deadline is missed. The current values have been chosen to
35//! reflect that, with a ratio of 1:10 as of 2023-06-23.
36//!
37//! Releasing the lock happens naturally, by not renewing a lease. It happens
38//! automatically after the duration of the last lease, at most.
39
40use std::{
41 error::Error,
42 future::Future,
43 sync::{
44 Arc,
45 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
46 },
47 time::Duration,
48};
49
50use tokio::sync::Mutex;
51use tracing::{debug, error, instrument, trace, warn};
52
53use crate::{
54 SendOutsideWasm,
55 executor::{JoinHandle, spawn},
56 sleep::sleep,
57};
58
/// A lock generation is an integer incremented each time the lock is taken by
/// a different holder.
///
/// This is used to know if a lock has been dirtied: a generation change
/// between two acquisitions by the same holder means another holder took the
/// lock in the meantime.
pub type CrossProcessLockGeneration = u64;
64
/// A trait that represents any function which can be used to
/// acquire the underlying lock of a [`CrossProcessLock`].
///
/// For example, this can be useful when writing a function which
/// is parameterized to acquire the underlying lock through either
/// [`CrossProcessLock::spin_lock`] or [`CrossProcessLock::try_lock_once`].
///
/// This is an alias-style trait: it declares no items of its own and is
/// implemented automatically for every `AsyncFn` with the matching signature.
pub trait AcquireCrossProcessLockFn<L>
where
    Self: AsyncFn(&CrossProcessLock<L>) -> AcquireCrossProcessLockResult<L::LockError>,
    L: TryLock + Clone + SendOutsideWasm + 'static,
{
}
77
// Blanket implementation: any `AsyncFn` with the right signature automatically
// implements `AcquireCrossProcessLockFn`, so callers never need to implement
// the trait manually.
impl<L, T> AcquireCrossProcessLockFn<L> for T
where
    T: AsyncFn(&CrossProcessLock<L>) -> AcquireCrossProcessLockResult<L::LockError>,
    L: TryLock + Clone + SendOutsideWasm + 'static,
{
}
84
/// A convenience type for the [`Result`] returned from calling either
/// [`CrossProcessLock::try_lock_once`] or [`CrossProcessLock::spin_lock`].
///
/// The outer `Result` carries errors from the underlying [`TryLock`]
/// implementation (`E`); the inner `Result` says whether the lock was
/// obtained ([`CrossProcessLockState`]) or not
/// ([`CrossProcessLockUnobtained`]).
pub type AcquireCrossProcessLockResult<E> =
    Result<Result<CrossProcessLockState, CrossProcessLockUnobtained>, E>;
89
/// Trait used to try to take a lock. Foundation of [`CrossProcessLock`].
pub trait TryLock {
    /// The error type returned by [`TryLock::try_lock`] when the underlying
    /// store fails.
    #[cfg(not(target_family = "wasm"))]
    type LockError: Error + Send + Sync;

    /// The error type returned by [`TryLock::try_lock`] when the underlying
    /// store fails (no `Send`/`Sync` bounds on WASM targets).
    #[cfg(target_family = "wasm")]
    type LockError: Error;

    /// Try to take a leased lock.
    ///
    /// This attempts to take a lock for the given lease duration.
    ///
    /// - If we already had the lease, this will extend the lease.
    /// - If we didn't, but the previous lease has expired, we will obtain the
    ///   lock.
    /// - If there was no previous lease, we will obtain the lock.
    /// - Otherwise, we don't get the lock.
    ///
    /// Returns `Some(_)` to indicate the lock succeeded, `None` otherwise. The
    /// cross-process lock generation must be compared to the generation before
    /// the call to see if the lock has been dirtied: a different generation
    /// means the lock has been dirtied, i.e. taken by a different holder in
    /// the meantime.
    fn try_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> impl Future<Output = Result<Option<CrossProcessLockGeneration>, Self::LockError>>
    + SendOutsideWasm;
}
121
/// Small state machine to handle wait times.
#[derive(Clone, Debug)]
enum WaitingTime {
    /// Some time to wait, in milliseconds.
    ///
    /// Doubled (and capped) after each failed locking attempt; see
    /// [`CrossProcessLock::spin_lock`].
    Some(u32),
    /// Stop waiting when seeing this value.
    Stop,
}
130
/// A guard of a cross-process lock.
///
/// The lock will be automatically released a short period of time after all the
/// guards have dropped.
#[derive(Clone, Debug)]
#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"]
pub struct CrossProcessLockGuard {
    /// A clone of [`CrossProcessLock::num_holders`].
    ///
    /// Decremented when this guard is dropped; the lease renewal task exits
    /// once the count reaches 0, letting the lease expire.
    num_holders: Arc<AtomicU32>,

    /// A clone of [`CrossProcessLock::is_dirty`].
    is_dirty: Arc<AtomicBool>,
}
144
145impl CrossProcessLockGuard {
146 fn new(num_holders: Arc<AtomicU32>, is_dirty: Arc<AtomicBool>) -> Self {
147 Self { num_holders, is_dirty }
148 }
149
150 /// Determine whether the cross-process lock associated to this guard is
151 /// dirty.
152 ///
153 /// See [`CrossProcessLockState::Dirty`] to learn more about the semantics
154 /// of _dirty_.
155 pub fn is_dirty(&self) -> bool {
156 self.is_dirty.load(Ordering::SeqCst)
157 }
158
159 /// Clear the dirty state from the cross-process lock associated to this
160 /// guard.
161 ///
162 /// If the cross-process lock is dirtied, it will remain dirtied until
163 /// this method is called. This allows recovering from a dirty state and
164 /// marking that it has recovered.
165 pub fn clear_dirty(&self) {
166 self.is_dirty.store(false, Ordering::SeqCst);
167 }
168}
169
impl Drop for CrossProcessLockGuard {
    fn drop(&mut self) {
        // One holder fewer in this process. When the count reaches 0, the
        // renew task spawned in `CrossProcessLock::try_lock_once` notices it,
        // cancels the lease and exits, which releases the lock.
        self.num_holders.fetch_sub(1, Ordering::SeqCst);
    }
}
175
/// A cross-process lock implementation.
///
/// See the doc-comment of this module for more information.
#[derive(Clone, Debug)]
pub struct CrossProcessLock<L>
where
    L: TryLock + Clone + SendOutsideWasm + 'static,
{
    /// The locker implementation.
    ///
    /// `L` is responsible for trying to take the lock, while
    /// [`CrossProcessLock`] is responsible to make it cross-process, with the
    /// retry mechanism, plus guard and so on.
    locker: L,

    /// Number of holders of the lock in this process.
    ///
    /// If greater than 0, this means we've already obtained this lock, in this
    /// process, and the store lock mustn't be touched.
    ///
    /// When the number of holders is decreased to 0, then the lock must be
    /// released in the store.
    num_holders: Arc<AtomicU32>,

    /// A mutex to control an attempt to take the lock, to avoid making it
    /// reentrant.
    locking_attempt: Arc<Mutex<()>>,

    /// Current renew task spawned by `try_lock_once`.
    renew_task: Arc<Mutex<Option<JoinHandle<()>>>>,

    /// The key used in the key/value mapping for the lock entry.
    lock_key: String,

    /// The cross-process lock configuration.
    config: CrossProcessLockConfig,

    /// Backoff time, in milliseconds.
    ///
    /// Only used by [`CrossProcessLock::spin_lock`].
    backoff: Arc<Mutex<WaitingTime>>,

    /// This lock generation.
    ///
    /// Starts at [`NO_CROSS_PROCESS_LOCK_GENERATION`], then holds the latest
    /// value returned by [`TryLock::try_lock`].
    generation: Arc<AtomicU64>,

    /// Whether the lock has been dirtied.
    ///
    /// See [`CrossProcessLockState::Dirty`] to learn more about the semantics
    /// of _dirty_.
    is_dirty: Arc<AtomicBool>,
}
225
/// Amount of time a lease of the lock should last, in milliseconds.
pub const LEASE_DURATION_MS: u32 = 500;

/// Period of time between two attempts to extend the lease. We'll
/// re-request a lease for an entire duration of `LEASE_DURATION_MS`
/// milliseconds, every `EXTEND_LEASE_EVERY_MS`, so this has to
/// be an amount safely low compared to `LEASE_DURATION_MS`, to make sure
/// that we can miss a deadline without compromising the lock.
pub const EXTEND_LEASE_EVERY_MS: u64 = 50;

/// Initial backoff, in milliseconds. This is the time we wait the first
/// time, if taking the lock initially failed.
const INITIAL_BACKOFF_MS: u32 = 10;

/// Maximal backoff, in milliseconds. This is the maximum amount of time
/// we'll wait for the lock, *between two attempts*.
pub const MAX_BACKOFF_MS: u32 = 1000;

/// Sentinel value representing the absence of a lock generation value.
///
/// When the lock is created, it has no generation. Once locked, it receives its
/// first generation from [`TryLock::try_lock`]. Subsequent lockings may
/// generate new lock generation. The generation is incremented by 1 every time.
///
/// The first generation is defined by [`FIRST_CROSS_PROCESS_LOCK_GENERATION`].
pub const NO_CROSS_PROCESS_LOCK_GENERATION: CrossProcessLockGeneration = 0;

/// Describe the first lock generation value (see
/// [`CrossProcessLockGeneration`]).
pub const FIRST_CROSS_PROCESS_LOCK_GENERATION: CrossProcessLockGeneration = 1;
256
impl<L> CrossProcessLock<L>
where
    L: TryLock + Clone + SendOutsideWasm + 'static,
{
    /// Create a new cross-process lock.
    ///
    /// # Parameters
    ///
    /// - `lock_key`: key in the key-value store to store the lock's state.
    /// - `config`: the cross-process lock configuration to use, if it's
    ///   [`CrossProcessLockConfig::SingleProcess`], no actual lock will be
    ///   taken.
    pub fn new(locker: L, lock_key: String, config: CrossProcessLockConfig) -> Self {
        Self {
            locker,
            lock_key,
            config,
            backoff: Arc::new(Mutex::new(WaitingTime::Some(INITIAL_BACKOFF_MS))),
            num_holders: Arc::new(0.into()),
            locking_attempt: Arc::new(Mutex::new(())),
            renew_task: Default::default(),
            generation: Arc::new(AtomicU64::new(NO_CROSS_PROCESS_LOCK_GENERATION)),
            is_dirty: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Determine whether the cross-process lock is dirty.
    ///
    /// See [`CrossProcessLockState::Dirty`] to learn more about the semantics
    /// of _dirty_.
    pub fn is_dirty(&self) -> bool {
        self.is_dirty.load(Ordering::SeqCst)
    }

    /// Clear the dirty state from this cross-process lock.
    ///
    /// If the cross-process lock is dirtied, it will remain dirtied until
    /// this method is called. This allows recovering from a dirty state and
    /// marking that it has recovered.
    pub fn clear_dirty(&self) {
        self.is_dirty.store(false, Ordering::SeqCst);
    }

    /// Try to lock once, returns whether the lock was obtained or not.
    ///
    /// The lock can be obtained but it can be dirty. In all cases, the renew
    /// task will run in the background.
    #[instrument(skip(self), fields(?self.lock_key, ?self.config, ?self.generation))]
    pub async fn try_lock_once(&self) -> AcquireCrossProcessLockResult<L::LockError> {
        // If it's not `MultiProcess`, this behaves as a no-op: hand out a
        // guard immediately without touching the store.
        let CrossProcessLockConfig::MultiProcess { holder_name } = &self.config else {
            let guard = CrossProcessLockGuard::new(self.num_holders.clone(), self.is_dirty.clone());
            return Ok(Ok(CrossProcessLockState::Clean(guard)));
        };

        // Hold onto the locking attempt mutex for the entire lifetime of this
        // function, to avoid multiple reentrant calls.
        let mut _attempt = self.locking_attempt.lock().await;

        // If another thread obtained the lock, make sure to only superficially increase
        // the number of holders, and carry on.
        if self.num_holders.load(Ordering::SeqCst) > 0 {
            // Note: between the above load and the fetch_add below, another thread may
            // decrement `num_holders`. That's fine because that means the lock
            // was taken by at least one thread, and after this call it will be
            // taken by at least one thread.
            trace!("We already had the lock, incrementing holder count");

            self.num_holders.fetch_add(1, Ordering::SeqCst);

            return Ok(Ok(CrossProcessLockState::Clean(CrossProcessLockGuard::new(
                self.num_holders.clone(),
                self.is_dirty.clone(),
            ))));
        }

        if let Some(new_generation) =
            self.locker.try_lock(LEASE_DURATION_MS, &self.lock_key, holder_name).await?
        {
            // Compare the generation returned by the store with the one we
            // last saw, and store the new one.
            match self.generation.swap(new_generation, Ordering::SeqCst) {
                // If there was no lock generation, it means this is the first time the lock is
                // obtained. It cannot be dirty.
                NO_CROSS_PROCESS_LOCK_GENERATION => {
                    trace!(?new_generation, "Setting the lock generation for the first time");
                }

                // This was NOT the same generation: another holder took the lock since we last
                // had it — the lock has been dirtied!
                previous_generation if previous_generation != new_generation => {
                    warn!(
                        ?previous_generation,
                        ?new_generation,
                        "The lock has been obtained, but it's been dirtied!"
                    );
                    self.is_dirty.store(true, Ordering::SeqCst);
                }

                // This was the same generation, no problem.
                _ => {
                    trace!("Same lock generation; no problem");
                }
            }

            trace!("Lock obtained!");
        } else {
            trace!("Couldn't obtain the lock immediately.");
            return Ok(Err(CrossProcessLockUnobtained::Busy));
        }

        trace!("Obtained the lock, spawning the lease extension task.");

        // This is the first time we've obtained the lock. We're going to spawn the task
        // that will renew the lease.

        // Clone data to be owned by the task.
        let this = (*self).clone();

        let mut renew_task = self.renew_task.lock().await;

        // Cancel the previous task, if any. That's safe to do, because:
        // - either the task was done,
        // - or it was still running, but taking a lock in the db has to be an atomic
        //   operation running in a transaction.

        if let Some(_prev) = renew_task.take() {
            // `abort` is not available on WASM, hence the cfg; on WASM the
            // previous task is simply dropped from our bookkeeping.
            #[cfg(not(target_family = "wasm"))]
            if !_prev.is_finished() {
                trace!("aborting the previous renew task");
                _prev.abort();
            }
        }

        // Restart a new one.
        *renew_task = Some(spawn(async move {
            let CrossProcessLockConfig::MultiProcess { holder_name } = this.config else { return };
            loop {
                {
                    // First, check if there are still users of this lock.
                    //
                    // This is not racy, because:
                    // - the `locking_attempt` mutex makes sure we don't have unexpected
                    //   interactions with the non-atomic sequence above in `try_lock_once`
                    //   (check > 0, then add 1).
                    // - other entities holding onto the `num_holders` atomic will only
                    //   decrease it over time.

                    let _guard = this.locking_attempt.lock().await;

                    // If there are no more users, we can quit.
                    if this.num_holders.load(Ordering::SeqCst) == 0 {
                        trace!("exiting the lease extension loop");

                        // Cancel the lease with another 0ms lease.
                        // If we don't get the lock, that's (weird but) fine.
                        let fut = this.locker.try_lock(0, &this.lock_key, &holder_name);
                        let _ = fut.await;

                        // Exit the loop.
                        break;
                    }
                }

                sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;

                match this.locker.try_lock(LEASE_DURATION_MS, &this.lock_key, &holder_name).await {
                    Ok(Some(_generation)) => {
                        // It's impossible that the generation can be
                        // different from the previous generation.
                        //
                        // As long as the task runs, the lock is renewed, so
                        // the generation remains the same. If the lock is not
                        // taken, it's because the lease has expired, which
                        // is represented by the
                        // `Ok(None)` value, and the task
                        // must stop.
                    }

                    Ok(None) => {
                        error!("Failed to renew the lock lease: the lock could not be obtained");

                        // Exit the loop.
                        break;
                    }

                    Err(err) => {
                        error!("Error when extending the lock lease: {err:#}");

                        // Exit the loop.
                        break;
                    }
                }
            }
        }));

        self.num_holders.fetch_add(1, Ordering::SeqCst);

        let guard = CrossProcessLockGuard::new(self.num_holders.clone(), self.is_dirty.clone());

        Ok(Ok(if self.is_dirty() {
            CrossProcessLockState::Dirty(guard)
        } else {
            CrossProcessLockState::Clean(guard)
        }))
    }

    /// Attempt to take the lock, with exponential backoff if the lock has
    /// already been taken before.
    ///
    /// The `max_backoff` parameter is the maximum time (in milliseconds) that
    /// should be waited for, between two attempts. When that time is
    /// reached a second time, the lock will stop attempting to get the lock
    /// and will return a timeout error upon locking. If not provided,
    /// will wait for [`MAX_BACKOFF_MS`].
    #[instrument(skip(self), fields(?self.lock_key, ?self.config))]
    pub async fn spin_lock(
        &self,
        max_backoff: Option<u32>,
    ) -> AcquireCrossProcessLockResult<L::LockError> {
        // Fall back to the default maximum backoff when none was provided.
        let max_backoff = max_backoff.unwrap_or(MAX_BACKOFF_MS);

        // Note: reads/writes to the backoff are racy across threads in theory, but the
        // lock in `try_lock_once` should sequentialize it all.

        loop {
            // If the cross-process lock config is not `MultiProcess`, this behaves as a
            // no-op and we just return
            let lock_result = self.try_lock_once().await?;

            if lock_result.is_ok() {
                if matches!(self.config, CrossProcessLockConfig::MultiProcess { .. }) {
                    // Reset backoff before returning, for the next attempt to lock.
                    *self.backoff.lock().await = WaitingTime::Some(INITIAL_BACKOFF_MS);
                }

                return Ok(lock_result);
            }

            // Exponential backoff! Multiply by 2 the time we've waited before, cap it to
            // max_backoff.
            let mut backoff = self.backoff.lock().await;

            let wait = match &mut *backoff {
                WaitingTime::Some(val) => {
                    let wait = *val;
                    *val = val.saturating_mul(2);
                    if *val >= max_backoff {
                        *backoff = WaitingTime::Stop;
                    }
                    wait
                }
                WaitingTime::Stop => {
                    // We've reached the maximum backoff, abandon.
                    return Ok(Err(CrossProcessLockUnobtained::TimedOut));
                }
            };

            debug!("Waiting {wait} before re-attempting to take the lock");
            sleep(Duration::from_millis(wait.into())).await;
        }
    }

    /// Returns the value in the database that represents the holder's
    /// identifier.
    pub fn lock_holder(&self) -> Option<&str> {
        self.config.holder_name()
    }
}
524
/// Represent a successful result of a locking attempt, either by
/// [`CrossProcessLock::try_lock_once`] or [`CrossProcessLock::spin_lock`].
#[derive(Debug)]
#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"]
pub enum CrossProcessLockState {
    /// The lock has been obtained successfully, all good.
    Clean(CrossProcessLockGuard),

    /// The lock has been obtained successfully, but the lock is dirty!
    ///
    /// This holder has obtained this cross-process lock once, then another
    /// holder has obtained this cross-process lock _before_ this holder
    /// obtained it again. The lock is marked as dirty. It means the value
    /// protected by the cross-process lock may need to be reloaded if
    /// synchronisation is important.
    ///
    /// Until [`CrossProcessLock::clear_dirty`] is called,
    /// [`CrossProcessLock::is_dirty`], [`CrossProcessLock::try_lock_once`] and
    /// [`CrossProcessLock::spin_lock`] will report the lock as dirty. Put it
    /// differently: dirty once, dirty forever, unless
    /// [`CrossProcessLock::clear_dirty`] is called.
    Dirty(CrossProcessLockGuard),
}
548
549impl CrossProcessLockState {
550 /// Map this value into the inner [`CrossProcessLockGuard`].
551 pub fn into_guard(self) -> CrossProcessLockGuard {
552 match self {
553 Self::Clean(guard) | Self::Dirty(guard) => guard,
554 }
555 }
556
557 /// Map this [`CrossProcessLockState`] into a
558 /// [`MappedCrossProcessLockState`].
559 ///
560 /// This is helpful when one wants to create its own wrapper over
561 /// [`CrossProcessLockGuard`].
562 pub fn map<F, G>(self, mapper: F) -> MappedCrossProcessLockState<G>
563 where
564 F: FnOnce(CrossProcessLockGuard) -> G,
565 {
566 match self {
567 Self::Clean(guard) => MappedCrossProcessLockState::Clean(mapper(guard)),
568 Self::Dirty(guard) => MappedCrossProcessLockState::Dirty(mapper(guard)),
569 }
570 }
571}
572
/// A mapped [`CrossProcessLockState`].
///
/// Created by [`CrossProcessLockState::map`]; `G` is the caller's own wrapper
/// around [`CrossProcessLockGuard`].
#[derive(Debug)]
#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"]
pub enum MappedCrossProcessLockState<G> {
    /// The equivalent of [`CrossProcessLockState::Clean`].
    Clean(G),

    /// The equivalent of [`CrossProcessLockState::Dirty`].
    Dirty(G),
}
585
586impl<G> MappedCrossProcessLockState<G> {
587 /// Return `Some(G)` if `Self` is [`Clean`][Self::Clean].
588 pub fn as_clean(&self) -> Option<&G> {
589 match self {
590 Self::Clean(guard) => Some(guard),
591 Self::Dirty(_) => None,
592 }
593 }
594}
595
/// Represent an unsuccessful result of a lock attempt, either by
/// [`CrossProcessLock::try_lock_once`] or [`CrossProcessLock::spin_lock`].
#[derive(Clone, Debug, thiserror::Error)]
pub enum CrossProcessLockUnobtained {
    /// The lock couldn't be obtained immediately because it is busy, i.e. it is
    /// held by another holder.
    #[error(
        "The lock couldn't be obtained immediately because it is busy, i.e. it is held by another holder"
    )]
    Busy,

    /// The lock couldn't be obtained after several attempts: locking has timed
    /// out.
    #[error("The lock couldn't be obtained after several attempts: locking has timed out")]
    TimedOut,
}
612
/// Union of [`CrossProcessLockUnobtained`] and [`TryLock::LockError`].
#[derive(Clone, Debug, thiserror::Error)]
pub enum CrossProcessLockError {
    /// The lock could not be obtained (busy, or locking timed out).
    #[error(transparent)]
    Unobtained(#[from] CrossProcessLockUnobtained),

    /// The underlying [`TryLock`] implementation returned an error.
    #[error(transparent)]
    #[cfg(not(target_family = "wasm"))]
    TryLock(#[from] Arc<dyn Error + Send + Sync>),

    /// The underlying [`TryLock`] implementation returned an error
    /// (no `Send`/`Sync` bounds on WASM targets).
    #[error(transparent)]
    #[cfg(target_family = "wasm")]
    TryLock(#[from] Arc<dyn Error>),
}
627
/// The cross-process lock config to use for the various stores.
#[derive(Clone, Debug)]
pub enum CrossProcessLockConfig {
    /// The stores will be used in multiple processes, the holder name for the
    /// cross-process lock is the associated `String`.
    MultiProcess {
        /// The name of the holder of the cross-process lock.
        holder_name: String,
    },
    /// The stores will be used in a single process, there is no need for a
    /// cross-process lock.
    SingleProcess,
}
641
642impl CrossProcessLockConfig {
643 /// Helper for quickly creating a [`CrossProcessLockConfig::MultiProcess`]
644 /// variant.
645 pub fn multi_process(holder_name: impl Into<String>) -> Self {
646 Self::MultiProcess { holder_name: holder_name.into() }
647 }
648
649 /// The holder name for the cross-process lock. This is only relevant for
650 /// [`CrossProcessLockConfig::MultiProcess`] variants.
651 pub fn holder_name(&self) -> Option<&str> {
652 match self {
653 Self::MultiProcess { holder_name } => Some(holder_name),
654 Self::SingleProcess => None,
655 }
656 }
657}
658
#[cfg(test)]
#[cfg(not(target_family = "wasm"))] // These tests require tokio::time, which is not implemented on wasm.
mod tests {
    use std::{
        collections::HashMap,
        ops::Not,
        sync::{Arc, RwLock, atomic},
    };

    use assert_matches::assert_matches;
    use matrix_sdk_test_macros::async_test;
    use tokio::{spawn, task::yield_now};

    use super::{
        CrossProcessLock, CrossProcessLockConfig, CrossProcessLockError,
        CrossProcessLockGeneration, CrossProcessLockState, CrossProcessLockUnobtained, TryLock,
        memory_store_helper::{Lease, try_take_leased_lock},
    };

    /// An in-memory lease store, standing in for a real database.
    #[derive(Clone, Default)]
    struct TestStore {
        leases: Arc<RwLock<HashMap<String, Lease>>>,
    }

    impl TestStore {
        fn try_take_leased_lock(
            &self,
            lease_duration_ms: u32,
            key: &str,
            holder: &str,
        ) -> Option<CrossProcessLockGeneration> {
            try_take_leased_lock(&mut self.leases.write().unwrap(), lease_duration_ms, key, holder)
        }
    }

    /// An error type that can never be constructed: `TestStore` is infallible.
    #[derive(Debug, thiserror::Error)]
    enum DummyError {}

    impl From<DummyError> for CrossProcessLockError {
        fn from(value: DummyError) -> Self {
            Self::TryLock(Arc::new(value))
        }
    }

    impl TryLock for TestStore {
        type LockError = DummyError;

        /// Try to take a lock using the given store.
        async fn try_lock(
            &self,
            lease_duration_ms: u32,
            key: &str,
            holder: &str,
        ) -> Result<Option<CrossProcessLockGeneration>, Self::LockError> {
            Ok(self.try_take_leased_lock(lease_duration_ms, key, holder))
        }
    }

    /// Drop the guard, then yield so the renew task can observe the release.
    async fn release_lock(lock: CrossProcessLockState) {
        drop(lock);
        yield_now().await;
    }

    type TestResult = Result<(), CrossProcessLockError>;

    #[async_test]
    async fn test_simple_lock_unlock() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(
            store,
            "key".to_owned(),
            CrossProcessLockConfig::multi_process("first"),
        );

        // The lock plain works when used with a single holder.
        let guard = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // Releasing works.
        release_lock(guard).await;
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        // Spin locking on the same lock always works, assuming no concurrent access.
        let guard = lock.spin_lock(None).await?.expect("spin lock must be obtained successfully");
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // Releasing still works.
        release_lock(guard).await;
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        Ok(())
    }

    #[async_test]
    async fn test_self_recovery() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(
            store.clone(),
            "key".to_owned(),
            CrossProcessLockConfig::multi_process("first"),
        );

        // When a lock is obtained…
        let guard = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // But then forgotten… (note: no need to release the guard)
        drop(lock);

        // And when rematerializing the lock with the same key/value…
        let lock = CrossProcessLock::new(
            store.clone(),
            "key".to_owned(),
            CrossProcessLockConfig::multi_process("first"),
        );

        // We still got it.
        let guard =
            lock.try_lock_once().await?.expect("lock (again) must be obtained successfully");
        assert_matches!(guard, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        Ok(())
    }

    #[async_test]
    async fn test_multiple_holders_same_process() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(
            store,
            "key".to_owned(),
            CrossProcessLockConfig::multi_process("first"),
        );

        // Taking the lock twice…
        let guard1 = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard1, CrossProcessLockState::Clean(_));
        let guard2 = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard2, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());

        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 2);

        // … means we can release it twice.
        release_lock(guard1).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        release_lock(guard2).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        assert!(lock.is_dirty().not());

        Ok(())
    }

    #[async_test]
    async fn test_multiple_processes() -> TestResult {
        let store = TestStore::default();
        let lock1 = CrossProcessLock::new(
            store.clone(),
            "key".to_owned(),
            CrossProcessLockConfig::multi_process("first"),
        );
        let lock2 = CrossProcessLock::new(
            store,
            "key".to_owned(),
            CrossProcessLockConfig::multi_process("second"),
        );

        // `lock1` acquires the lock.
        let guard1 = lock1.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard1, CrossProcessLockState::Clean(_));
        assert!(lock1.is_dirty().not());

        // `lock2` cannot acquire the lock.
        let err = lock2.try_lock_once().await?.expect_err("lock must NOT be obtained");
        assert_matches!(err, CrossProcessLockUnobtained::Busy);

        // `lock2` is waiting in a task.
        let lock2_clone = lock2.clone();
        let task = spawn(async move { lock2_clone.spin_lock(Some(500)).await });

        yield_now().await;

        drop(guard1);

        // Once `lock1` is released, `lock2` managed to obtain it.
        let guard2 = task
            .await
            .expect("join handle is properly awaited")
            .expect("lock is successfully attempted")
            .expect("lock must be obtained successfully");
        assert_matches!(guard2, CrossProcessLockState::Clean(_));

        // `lock1` and `lock2` are both clean!
        assert!(lock1.is_dirty().not());
        assert!(lock2.is_dirty().not());

        // Now if `lock1` tries to obtain the lock with a small timeout, it will fail.
        assert_matches!(
            lock1.spin_lock(Some(200)).await,
            Ok(Err(CrossProcessLockUnobtained::TimedOut))
        );

        Ok(())
    }

    #[async_test]
    async fn test_multiple_processes_up_to_dirty() -> TestResult {
        let store = TestStore::default();
        let lock1 = CrossProcessLock::new(
            store.clone(),
            "key".to_owned(),
            CrossProcessLockConfig::multi_process("first"),
        );
        let lock2 = CrossProcessLock::new(
            store,
            "key".to_owned(),
            CrossProcessLockConfig::multi_process("second"),
        );

        // Obtain `lock1` once.
        {
            let guard = lock1.try_lock_once().await?.expect("lock must be obtained successfully");
            assert_matches!(guard, CrossProcessLockState::Clean(_));
            assert!(lock1.is_dirty().not());
            drop(guard);

            yield_now().await;
        }

        // Obtain `lock2` once.
        {
            let guard = lock2.try_lock_once().await?.expect("lock must be obtained successfully");
            assert_matches!(guard, CrossProcessLockState::Clean(_));
            // Fixed: this used to check `lock1` (copy-paste from the block
            // above); the lock under test here is `lock2`, which has just
            // obtained the lock for the first time and must be clean.
            assert!(lock2.is_dirty().not());
            drop(guard);

            yield_now().await;
        }

        for _ in 0..3 {
            // Obtain `lock1` once more. Now it's dirty because `lock2` has acquired the
            // lock meanwhile.
            {
                let guard =
                    lock1.try_lock_once().await?.expect("lock must be obtained successfully");
                assert_matches!(guard, CrossProcessLockState::Dirty(_));
                assert!(lock1.is_dirty());

                drop(guard);
                yield_now().await;
            }

            // Obtain `lock1` once more! It still dirty because it has not been marked as
            // non-dirty.
            {
                let guard =
                    lock1.try_lock_once().await?.expect("lock must be obtained successfully");
                assert_matches!(guard, CrossProcessLockState::Dirty(_));
                assert!(lock1.is_dirty());
                lock1.clear_dirty();

                drop(guard);
                yield_now().await;
            }

            // Obtain `lock1` once more. Now it's clear!
            {
                let guard =
                    lock1.try_lock_once().await?.expect("lock must be obtained successfully");
                assert_matches!(guard, CrossProcessLockState::Clean(_));
                assert!(lock1.is_dirty().not());

                drop(guard);
                yield_now().await;
            }

            // Same dance with `lock2`!
            {
                let guard =
                    lock2.try_lock_once().await?.expect("lock must be obtained successfully");
                assert_matches!(guard, CrossProcessLockState::Dirty(_));
                assert!(lock2.is_dirty());
                lock2.clear_dirty();

                drop(guard);
                yield_now().await;
            }
        }

        Ok(())
    }
}
961
962/// Some code that is shared by almost all `MemoryStore` implementations out
963/// there.
964pub mod memory_store_helper {
965 use std::collections::{HashMap, hash_map::Entry};
966
967 use ruma::time::{Duration, Instant};
968
969 use super::{CrossProcessLockGeneration, FIRST_CROSS_PROCESS_LOCK_GENERATION};
970
971 #[derive(Debug)]
972 pub struct Lease {
973 holder: String,
974 expiration: Instant,
975 generation: CrossProcessLockGeneration,
976 }
977
978 pub fn try_take_leased_lock(
979 leases: &mut HashMap<String, Lease>,
980 lease_duration_ms: u32,
981 key: &str,
982 holder: &str,
983 ) -> Option<CrossProcessLockGeneration> {
984 let now = Instant::now();
985 let expiration = now + Duration::from_millis(lease_duration_ms.into());
986
987 match leases.entry(key.to_owned()) {
988 // There is an existing holder.
989 Entry::Occupied(mut entry) => {
990 let Lease {
991 holder: current_holder,
992 expiration: current_expiration,
993 generation: current_generation,
994 } = entry.get_mut();
995
996 if current_holder == holder {
997 // We had the lease before, extend it.
998 *current_expiration = expiration;
999
1000 Some(*current_generation)
1001 } else {
1002 // We didn't have it.
1003 if *current_expiration < now {
1004 // Steal it!
1005 *current_holder = holder.to_owned();
1006 *current_expiration = expiration;
1007 *current_generation += 1;
1008
1009 Some(*current_generation)
1010 } else {
1011 // We tried our best.
1012 None
1013 }
1014 }
1015 }
1016
1017 // There is no holder, easy.
1018 Entry::Vacant(entry) => {
1019 entry.insert(Lease {
1020 holder: holder.to_owned(),
1021 expiration: Instant::now() + Duration::from_millis(lease_duration_ms.into()),
1022 generation: FIRST_CROSS_PROCESS_LOCK_GENERATION,
1023 });
1024
1025 Some(FIRST_CROSS_PROCESS_LOCK_GENERATION)
1026 }
1027 }
1028 }
1029}