matrix_sdk_common/cross_process_lock.rs
1// Copyright 2023 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A cross-process lock implementation.
16//!
17//! This is a per-process lock that may be used only for very specific use
18//! cases, where multiple processes might concurrently write to the same
19//! database at the same time; this would invalidate store caches, so
20//! that should be done mindfully. Such a lock can be obtained multiple times by
21//! the same process, and it remains active as long as there's at least one user
22//! in a given process.
23//!
24//! The lock is implemented using time-based leases. The lock maintains the lock
25//! identifier (key), who's the current holder (value), and an expiration
26//! timestamp on the side; see also `CryptoStore::try_take_leased_lock` for more
27//! details.
28//!
29//! The lock is initially obtained for a certain period of time (namely, the
30//! duration of a lease, aka `LEASE_DURATION_MS`), and then a “heartbeat” task
31//! renews the lease to extend its duration, every so often (namely, every
32//! `EXTEND_LEASE_EVERY_MS`). Since the Tokio scheduler might be busy, the
33//! extension request should happen way more frequently than the duration of a
34//! lease, in case a deadline is missed. The current values have been chosen to
35//! reflect that, with a ratio of 1:10 as of 2023-06-23.
36//!
37//! Releasing the lock happens naturally, by not renewing a lease. It happens
38//! automatically after the duration of the last lease, at most.
39
40use std::{
41 error::Error,
42 future::Future,
43 sync::{
44 Arc,
45 atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering},
46 },
47 time::Duration,
48};
49
50use tokio::sync::Mutex;
51use tracing::{debug, error, instrument, trace, warn};
52
53use crate::{
54 SendOutsideWasm,
55 executor::{JoinHandle, spawn},
56 sleep::sleep,
57};
58
/// A lock generation is an integer incremented each time the lock is taken by
/// a different holder.
///
/// This is used to know if a lock has been dirtied: a holder that gets back a
/// generation different from the one it previously observed knows that
/// another holder has taken the lock in the meantime.
pub type CrossProcessLockGeneration = u64;
64
/// Trait used to try to take a lock. Foundation of [`CrossProcessLock`].
pub trait TryLock {
    /// The error type returned when the attempt to take the lock itself
    /// failed (as opposed to the lock being held by someone else).
    #[cfg(not(target_family = "wasm"))]
    type LockError: Error + Send + Sync;

    /// The error type returned when the attempt to take the lock itself
    /// failed (as opposed to the lock being held by someone else).
    #[cfg(target_family = "wasm")]
    type LockError: Error;

    /// Try to take a leased lock.
    ///
    /// This attempts to take a lock for the given lease duration.
    ///
    /// - If we already had the lease, this will extend the lease.
    /// - If we didn't, but the previous lease has expired, we will obtain the
    ///   lock.
    /// - If there was no previous lease, we will obtain the lock.
    /// - Otherwise, we don't get the lock.
    ///
    /// [`CrossProcessLock`] also calls this method with a lease duration of
    /// 0 ms to give the lock back once it has no more holders.
    ///
    /// Returns `Some(_)` to indicate the lock succeeded, `None` otherwise. The
    /// cross-process lock generation must be compared to the generation before
    /// the call to see if the lock has been dirtied: a different generation
    /// means the lock has been dirtied, i.e. taken by a different holder in
    /// the meantime.
    fn try_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> impl Future<Output = Result<Option<CrossProcessLockGeneration>, Self::LockError>>
    + SendOutsideWasm;
}
96
/// Small state machine to handle wait times.
///
/// Used by [`CrossProcessLock::spin_lock`] to remember how long to wait
/// before the next locking attempt, and when to give up.
#[derive(Clone, Debug)]
enum WaitingTime {
    /// Some time to wait, in milliseconds.
    Some(u32),
    /// Stop waiting when seeing this value.
    Stop,
}
105
/// A guard of a cross-process lock.
///
/// The lock will be automatically released a short period of time after all the
/// guards have dropped.
///
/// Every live guard accounts for exactly one holder in
/// [`CrossProcessLock::num_holders`]: creating or cloning a guard adds one
/// holder, dropping a guard removes one.
#[derive(Debug)]
#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"]
pub struct CrossProcessLockGuard {
    /// A clone of [`CrossProcessLock::num_holders`].
    num_holders: Arc<AtomicU32>,

    /// A clone of [`CrossProcessLock::is_dirty`].
    is_dirty: Arc<AtomicBool>,
}

impl CrossProcessLockGuard {
    /// Wrap the shared counters into a guard.
    ///
    /// The caller is responsible for having already counted this guard in
    /// `num_holders`; this constructor does not touch the counter.
    fn new(num_holders: Arc<AtomicU32>, is_dirty: Arc<AtomicBool>) -> Self {
        Self { num_holders, is_dirty }
    }

    /// Determine whether the cross-process lock associated to this guard is
    /// dirty.
    ///
    /// See [`CrossProcessLockState::Dirty`] to learn more about the semantics
    /// of _dirty_.
    pub fn is_dirty(&self) -> bool {
        self.is_dirty.load(Ordering::SeqCst)
    }

    /// Clear the dirty state from the cross-process lock associated to this
    /// guard.
    ///
    /// If the cross-process lock is dirtied, it will remain dirtied until
    /// this method is called. This allows recovering from a dirty state and
    /// marking that it has recovered.
    pub fn clear_dirty(&self) {
        self.is_dirty.store(false, Ordering::SeqCst);
    }
}

impl Clone for CrossProcessLockGuard {
    /// Clone the guard, registering the clone as one more holder.
    ///
    /// A derived `Clone` would copy the guard without touching
    /// `num_holders`, while each copy still decrements it on drop: the
    /// counter would then under-count the holders and eventually wrap around.
    fn clone(&self) -> Self {
        // `self` is alive, so the counter is at least 1 here and cannot be
        // concurrently observed as 0 by the lease renewal task.
        self.num_holders.fetch_add(1, Ordering::SeqCst);

        Self { num_holders: Arc::clone(&self.num_holders), is_dirty: Arc::clone(&self.is_dirty) }
    }
}

impl Drop for CrossProcessLockGuard {
    /// Unregister this guard from the holders of the lock.
    fn drop(&mut self) {
        self.num_holders.fetch_sub(1, Ordering::SeqCst);
    }
}
150
/// A cross-process lock implementation.
///
/// See the doc-comment of this module for more information.
///
/// All the mutable state lives behind `Arc`s, so cloning a
/// [`CrossProcessLock`] yields a handle that shares its state with the
/// original.
#[derive(Clone, Debug)]
pub struct CrossProcessLock<L>
where
    L: TryLock + Clone + SendOutsideWasm + 'static,
{
    /// The locker implementation.
    ///
    /// `L` is responsible for trying to take the lock, while
    /// [`CrossProcessLock`] is responsible to make it cross-process, with the
    /// retry mechanism, plus guard and so on.
    locker: L,

    /// Number of holders of the lock in this process.
    ///
    /// If greater than 0, this means we've already obtained this lock, in this
    /// process, and the store lock mustn't be touched.
    ///
    /// When the number of holders is decreased to 0, then the lock must be
    /// released in the store.
    num_holders: Arc<AtomicU32>,

    /// A mutex to control an attempt to take the lock, to avoid making it
    /// reentrant.
    ///
    /// It is also taken by the renew task when it checks whether there are
    /// still holders.
    locking_attempt: Arc<Mutex<()>>,

    /// Current renew task spawned by `try_lock_once`.
    renew_task: Arc<Mutex<Option<JoinHandle<()>>>>,

    /// The key used in the key/value mapping for the lock entry.
    lock_key: String,

    /// A specific value to identify the lock's holder.
    lock_holder: String,

    /// Backoff time, in milliseconds.
    backoff: Arc<Mutex<WaitingTime>>,

    /// This lock generation.
    ///
    /// Holds [`NO_CROSS_PROCESS_LOCK_GENERATION`] until the lock is obtained
    /// for the first time.
    generation: Arc<AtomicU64>,

    /// Whether the lock has been dirtied.
    ///
    /// See [`CrossProcessLockState::Dirty`] to learn more about the semantics
    /// of _dirty_.
    is_dirty: Arc<AtomicBool>,
}
200
/// Amount of time a lease of the lock should last, in milliseconds.
pub const LEASE_DURATION_MS: u32 = 500;

/// Period of time between two attempts to extend the lease, in milliseconds.
///
/// We'll re-request a lease for an entire duration of `LEASE_DURATION_MS`
/// milliseconds, every `EXTEND_LEASE_EVERY_MS`, so this has to
/// be an amount safely low compared to `LEASE_DURATION_MS`, to make sure
/// that we can miss a deadline without compromising the lock.
pub const EXTEND_LEASE_EVERY_MS: u64 = 50;

/// Initial backoff, in milliseconds. This is the time we wait the first
/// time, if taking the lock initially failed.
const INITIAL_BACKOFF_MS: u32 = 10;

/// Maximal backoff, in milliseconds. This is the maximum amount of time
/// we'll wait for the lock, *between two attempts*.
pub const MAX_BACKOFF_MS: u32 = 1000;

/// Sentinel value representing the absence of a lock generation value.
///
/// When the lock is created, it has no generation. Once locked, it receives its
/// first generation from [`TryLock::try_lock`]. Subsequent lockings may
/// produce a new lock generation. The generation is incremented by 1 every
/// time.
///
/// The first generation is defined by [`FIRST_CROSS_PROCESS_LOCK_GENERATION`].
pub const NO_CROSS_PROCESS_LOCK_GENERATION: CrossProcessLockGeneration = 0;

/// Describe the first lock generation value (see
/// [`CrossProcessLockGeneration`]).
pub const FIRST_CROSS_PROCESS_LOCK_GENERATION: CrossProcessLockGeneration = 1;
231
232impl<L> CrossProcessLock<L>
233where
234 L: TryLock + Clone + SendOutsideWasm + 'static,
235{
236 /// Create a new cross-process lock.
237 ///
238 /// # Parameters
239 ///
240 /// - `lock_key`: key in the key-value store to store the lock's state.
241 /// - `lock_holder`: identify the lock's holder with this given value.
242 pub fn new(locker: L, lock_key: String, lock_holder: String) -> Self {
243 Self {
244 locker,
245 lock_key,
246 lock_holder,
247 backoff: Arc::new(Mutex::new(WaitingTime::Some(INITIAL_BACKOFF_MS))),
248 num_holders: Arc::new(0.into()),
249 locking_attempt: Arc::new(Mutex::new(())),
250 renew_task: Default::default(),
251 generation: Arc::new(AtomicU64::new(NO_CROSS_PROCESS_LOCK_GENERATION)),
252 is_dirty: Arc::new(AtomicBool::new(false)),
253 }
254 }
255
256 /// Determine whether the cross-process lock is dirty.
257 ///
258 /// See [`CrossProcessLockState::Dirty`] to learn more about the semantics
259 /// of _dirty_.
260 pub fn is_dirty(&self) -> bool {
261 self.is_dirty.load(Ordering::SeqCst)
262 }
263
264 /// Clear the dirty state from this cross-process lock.
265 ///
266 /// If the cross-process lock is dirtied, it will remain dirtied until
267 /// this method is called. This allows recovering from a dirty state and
268 /// marking that it has recovered.
269 pub fn clear_dirty(&self) {
270 self.is_dirty.store(false, Ordering::SeqCst);
271 }
272
273 /// Try to lock once, returns whether the lock was obtained or not.
274 ///
275 /// The lock can be obtained but it can be dirty. In all cases, the renew
276 /// task will run in the background.
277 #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
278 pub async fn try_lock_once(
279 &self,
280 ) -> Result<Result<CrossProcessLockState, CrossProcessLockUnobtained>, L::LockError> {
281 // Hold onto the locking attempt mutex for the entire lifetime of this
282 // function, to avoid multiple reentrant calls.
283 let mut _attempt = self.locking_attempt.lock().await;
284
285 // If another thread obtained the lock, make sure to only superficially increase
286 // the number of holders, and carry on.
287 if self.num_holders.load(Ordering::SeqCst) > 0 {
288 // Note: between the above load and the fetch_add below, another thread may
289 // decrement `num_holders`. That's fine because that means the lock
290 // was taken by at least one thread, and after this call it will be
291 // taken by at least one thread.
292 trace!("We already had the lock, incrementing holder count");
293
294 self.num_holders.fetch_add(1, Ordering::SeqCst);
295
296 return Ok(Ok(CrossProcessLockState::Clean(CrossProcessLockGuard::new(
297 self.num_holders.clone(),
298 self.is_dirty.clone(),
299 ))));
300 }
301
302 if let Some(new_generation) =
303 self.locker.try_lock(LEASE_DURATION_MS, &self.lock_key, &self.lock_holder).await?
304 {
305 match self.generation.swap(new_generation, Ordering::SeqCst) {
306 // If there was no lock generation, it means this is the first time the lock is
307 // obtained. It cannot be dirty.
308 NO_CROSS_PROCESS_LOCK_GENERATION => {
309 trace!(?new_generation, "Setting the lock generation for the first time");
310 }
311
312 // This was NOT the same generation, the lock has been dirtied!
313 previous_generation if previous_generation != new_generation => {
314 warn!(
315 ?previous_generation,
316 ?new_generation,
317 "The lock has been obtained, but it's been dirtied!"
318 );
319 self.is_dirty.store(true, Ordering::SeqCst);
320 }
321
322 // This was the same generation, no problem.
323 _ => {
324 trace!("Same lock generation; no problem");
325 }
326 }
327
328 trace!("Lock obtained!");
329 } else {
330 trace!("Couldn't obtain the lock immediately.");
331 return Ok(Err(CrossProcessLockUnobtained::Busy));
332 }
333
334 trace!("Obtained the lock, spawning the lease extension task.");
335
336 // This is the first time we've obtaind the lock. We're going to spawn the task
337 // that will renew the lease.
338
339 // Clone data to be owned by the task.
340 let this = (*self).clone();
341
342 let mut renew_task = self.renew_task.lock().await;
343
344 // Cancel the previous task, if any. That's safe to do, because:
345 // - either the task was done,
346 // - or it was still running, but taking a lock in the db has to be an atomic
347 // operation running in a transaction.
348
349 if let Some(_prev) = renew_task.take() {
350 #[cfg(not(target_family = "wasm"))]
351 if !_prev.is_finished() {
352 trace!("aborting the previous renew task");
353 _prev.abort();
354 }
355 }
356
357 // Restart a new one.
358 *renew_task = Some(spawn(async move {
359 loop {
360 {
361 // First, check if there are still users of this lock.
362 //
363 // This is not racy, because:
364 // - the `locking_attempt` mutex makes sure we don't have unexpected
365 // interactions with the non-atomic sequence above in `try_lock_once`
366 // (check > 0, then add 1).
367 // - other entities holding onto the `num_holders` atomic will only
368 // decrease it over time.
369
370 let _guard = this.locking_attempt.lock().await;
371
372 // If there are no more users, we can quit.
373 if this.num_holders.load(Ordering::SeqCst) == 0 {
374 trace!("exiting the lease extension loop");
375
376 // Cancel the lease with another 0ms lease.
377 // If we don't get the lock, that's (weird but) fine.
378 let fut = this.locker.try_lock(0, &this.lock_key, &this.lock_holder);
379 let _ = fut.await;
380
381 // Exit the loop.
382 break;
383 }
384 }
385
386 sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;
387
388 match this
389 .locker
390 .try_lock(LEASE_DURATION_MS, &this.lock_key, &this.lock_holder)
391 .await
392 {
393 Ok(Some(_generation)) => {
394 // It's impossible that the generation can be different
395 // from the previous generation.
396 //
397 // As long as the task runs, the lock is renewed, so the
398 // generation remains the same. If the lock is not
399 // taken, it's because the lease has expired, which is
400 // represented by the `Ok(None)` value, and the task
401 // must stop.
402 }
403
404 Ok(None) => {
405 error!("Failed to renew the lock lease: the lock could not be obtained");
406
407 // Exit the loop.
408 break;
409 }
410
411 Err(err) => {
412 error!("Error when extending the lock lease: {err:#}");
413
414 // Exit the loop.
415 break;
416 }
417 }
418 }
419 }));
420
421 self.num_holders.fetch_add(1, Ordering::SeqCst);
422
423 let guard = CrossProcessLockGuard::new(self.num_holders.clone(), self.is_dirty.clone());
424
425 Ok(Ok(if self.is_dirty() {
426 CrossProcessLockState::Dirty(guard)
427 } else {
428 CrossProcessLockState::Clean(guard)
429 }))
430 }
431
432 /// Attempt to take the lock, with exponential backoff if the lock has
433 /// already been taken before.
434 ///
435 /// The `max_backoff` parameter is the maximum time (in milliseconds) that
436 /// should be waited for, between two attempts. When that time is
437 /// reached a second time, the lock will stop attempting to get the lock
438 /// and will return a timeout error upon locking. If not provided,
439 /// will wait for [`MAX_BACKOFF_MS`].
440 #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
441 pub async fn spin_lock(
442 &self,
443 max_backoff: Option<u32>,
444 ) -> Result<Result<CrossProcessLockState, CrossProcessLockUnobtained>, L::LockError> {
445 let max_backoff = max_backoff.unwrap_or(MAX_BACKOFF_MS);
446
447 // Note: reads/writes to the backoff are racy across threads in theory, but the
448 // lock in `try_lock_once` should sequentialize it all.
449
450 loop {
451 let lock_result = self.try_lock_once().await?;
452
453 if lock_result.is_ok() {
454 // Reset backoff before returning, for the next attempt to lock.
455 *self.backoff.lock().await = WaitingTime::Some(INITIAL_BACKOFF_MS);
456
457 return Ok(lock_result);
458 }
459
460 // Exponential backoff! Multiply by 2 the time we've waited before, cap it to
461 // max_backoff.
462 let mut backoff = self.backoff.lock().await;
463
464 let wait = match &mut *backoff {
465 WaitingTime::Some(val) => {
466 let wait = *val;
467 *val = val.saturating_mul(2);
468 if *val >= max_backoff {
469 *backoff = WaitingTime::Stop;
470 }
471 wait
472 }
473 WaitingTime::Stop => {
474 // We've reached the maximum backoff, abandon.
475 return Ok(Err(CrossProcessLockUnobtained::TimedOut));
476 }
477 };
478
479 debug!("Waiting {wait} before re-attempting to take the lock");
480 sleep(Duration::from_millis(wait.into())).await;
481 }
482 }
483
484 /// Returns the value in the database that represents the holder's
485 /// identifier.
486 pub fn lock_holder(&self) -> &str {
487 &self.lock_holder
488 }
489}
490
491/// Represent a successful result of a locking attempt, either by
492/// [`CrossProcessLock::try_lock_once`] or [`CrossProcessLock::spin_lock`].
493#[derive(Debug)]
494#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"]
495pub enum CrossProcessLockState {
496 /// The lock has been obtained successfully, all good.
497 Clean(CrossProcessLockGuard),
498
499 /// The lock has been obtained successfully, but the lock is dirty!
500 ///
501 /// This holder has obtained this cross-process lock once, then another
502 /// holder has obtained this cross-process lock _before_ this holder
503 /// obtained it again. The lock is marked as dirty. It means the value
504 /// protected by the cross-process lock may need to be reloaded if
505 /// synchronisation is important.
506 ///
507 /// Until [`CrossProcessLock::clear_dirty`] is called,
508 /// [`CrossProcessLock::is_dirty`], [`CrossProcessLock::try_lock_once`] and
509 /// [`CrossProcessLock::spin_lock`] will report the lock as dirty. Put it
510 /// differently: dirty once, dirty forever, unless
511 /// [`CrossProcessLock::clear_dirty`] is called.
512 Dirty(CrossProcessLockGuard),
513}
514
515impl CrossProcessLockState {
516 /// Map this value into the inner [`CrossProcessLockGuard`].
517 pub fn into_guard(self) -> CrossProcessLockGuard {
518 match self {
519 Self::Clean(guard) | Self::Dirty(guard) => guard,
520 }
521 }
522
523 /// Map this [`CrossProcessLockState`] into a
524 /// [`MappedCrossProcessLockState`].
525 ///
526 /// This is helpful when one wants to create its own wrapper over
527 /// [`CrossProcessLockGuard`].
528 pub fn map<F, G>(self, mapper: F) -> MappedCrossProcessLockState<G>
529 where
530 F: FnOnce(CrossProcessLockGuard) -> G,
531 {
532 match self {
533 Self::Clean(guard) => MappedCrossProcessLockState::Clean(mapper(guard)),
534 Self::Dirty(guard) => MappedCrossProcessLockState::Dirty(mapper(guard)),
535 }
536 }
537}
538
/// A [`CrossProcessLockState`] whose guard has been replaced by a value of
/// type `G`.
///
/// Created by [`CrossProcessLockState::map`].
#[derive(Debug)]
#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"]
pub enum MappedCrossProcessLockState<G> {
    /// The equivalent of [`CrossProcessLockState::Clean`].
    Clean(G),

    /// The equivalent of [`CrossProcessLockState::Dirty`].
    Dirty(G),
}

impl<G> MappedCrossProcessLockState<G> {
    /// Borrow the inner value if `Self` is [`Clean`][Self::Clean]; return
    /// `None` if it is [`Dirty`][Self::Dirty].
    pub fn as_clean(&self) -> Option<&G> {
        if let Self::Clean(inner) = self { Some(inner) } else { None }
    }
}
561
/// Represent an unsuccessful result of a lock attempt, either by
/// [`CrossProcessLock::try_lock_once`] or [`CrossProcessLock::spin_lock`].
#[derive(Debug, thiserror::Error)]
pub enum CrossProcessLockUnobtained {
    /// The lock couldn't be obtained immediately because it is busy, i.e. it is
    /// held by another holder.
    ///
    /// Returned by [`CrossProcessLock::try_lock_once`].
    #[error(
        "The lock couldn't be obtained immediately because it is busy, i.e. it is held by another holder"
    )]
    Busy,

    /// The lock couldn't be obtained after several attempts: locking has timed
    /// out.
    ///
    /// Returned by [`CrossProcessLock::spin_lock`].
    #[error("The lock couldn't be obtained after several attempts: locking has timed out")]
    TimedOut,
}
578
/// Union of [`CrossProcessLockUnobtained`] and [`TryLock::LockError`].
#[derive(Debug, thiserror::Error)]
pub enum CrossProcessLockError {
    /// The lock could not be obtained; see [`CrossProcessLockUnobtained`].
    #[error(transparent)]
    Unobtained(#[from] CrossProcessLockUnobtained),

    /// A boxed error, meant to carry a [`TryLock::LockError`].
    #[error(transparent)]
    #[cfg(not(target_family = "wasm"))]
    TryLock(#[from] Box<dyn Error + Send + Sync>),

    /// A boxed error, meant to carry a [`TryLock::LockError`].
    #[error(transparent)]
    #[cfg(target_family = "wasm")]
    TryLock(#[from] Box<dyn Error>),
}
593
#[cfg(test)]
#[cfg(not(target_family = "wasm"))] // These tests require tokio::time, which is not implemented on wasm.
mod tests {
    use std::{
        collections::HashMap,
        ops::Not,
        sync::{Arc, RwLock, atomic},
    };

    use assert_matches::assert_matches;
    use matrix_sdk_test_macros::async_test;
    use tokio::{spawn, task::yield_now};

    use super::{
        CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockState,
        CrossProcessLockUnobtained, TryLock,
        memory_store_helper::{Lease, try_take_leased_lock},
    };

    /// An in-memory store of leases, shared between all its clones.
    #[derive(Clone, Default)]
    struct TestStore {
        leases: Arc<RwLock<HashMap<String, Lease>>>,
    }

    impl TestStore {
        /// Synchronous locking primitive, backed by the shared in-memory
        /// helper.
        fn try_take_leased_lock(
            &self,
            lease_duration_ms: u32,
            key: &str,
            holder: &str,
        ) -> Option<CrossProcessLockGeneration> {
            try_take_leased_lock(&mut self.leases.write().unwrap(), lease_duration_ms, key, holder)
        }
    }

    /// An error that can never be constructed: the test store cannot fail.
    #[derive(Debug, thiserror::Error)]
    enum DummyError {}

    impl From<DummyError> for CrossProcessLockError {
        fn from(value: DummyError) -> Self {
            Self::TryLock(Box::new(value))
        }
    }

    impl TryLock for TestStore {
        type LockError = DummyError;

        /// Try to take a lock using the given store.
        async fn try_lock(
            &self,
            lease_duration_ms: u32,
            key: &str,
            holder: &str,
        ) -> Result<Option<CrossProcessLockGeneration>, Self::LockError> {
            Ok(self.try_take_leased_lock(lease_duration_ms, key, holder))
        }
    }

    /// Drop the guard, then yield to the scheduler once.
    async fn release_lock(lock: CrossProcessLockState) {
        drop(lock);
        yield_now().await;
    }

    type TestResult = Result<(), CrossProcessLockError>;

    #[async_test]
    async fn test_simple_lock_unlock() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(store, "key".to_owned(), "first".to_owned());

        // The lock plain works when used with a single holder.
        let guard = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // Releasing works.
        release_lock(guard).await;
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        // Spin locking on the same lock always works, assuming no concurrent access.
        let guard = lock.spin_lock(None).await?.expect("spin lock must be obtained successfully");
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // Releasing still works.
        release_lock(guard).await;
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        Ok(())
    }

    #[async_test]
    async fn test_self_recovery() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned());

        // When a lock is obtained…
        let guard = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // But then forgotten… (note: no need to release the guard)
        drop(lock);

        // And when rematerializing the lock with the same key/value…
        let lock = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned());

        // We still got it.
        let guard =
            lock.try_lock_once().await?.expect("lock (again) must be obtained successfully");
        assert_matches!(guard, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        Ok(())
    }

    #[async_test]
    async fn test_multiple_holders_same_process() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(store, "key".to_owned(), "first".to_owned());

        // Taking the lock twice…
        let guard1 = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard1, CrossProcessLockState::Clean(_));
        let guard2 = lock.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard2, CrossProcessLockState::Clean(_));
        assert!(lock.is_dirty().not());

        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 2);

        // … means we can release it twice.
        release_lock(guard1).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        release_lock(guard2).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        assert!(lock.is_dirty().not());

        Ok(())
    }

    #[async_test]
    async fn test_multiple_processes() -> TestResult {
        let store = TestStore::default();
        let lock1 = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned());
        let lock2 = CrossProcessLock::new(store, "key".to_owned(), "second".to_owned());

        // `lock1` acquires the lock.
        let guard1 = lock1.try_lock_once().await?.expect("lock must be obtained successfully");
        assert_matches!(guard1, CrossProcessLockState::Clean(_));
        assert!(lock1.is_dirty().not());

        // `lock2` cannot acquire the lock.
        let err = lock2.try_lock_once().await?.expect_err("lock must NOT be obtained");
        assert_matches!(err, CrossProcessLockUnobtained::Busy);

        // `lock2` is waiting in a task.
        let lock2_clone = lock2.clone();
        let task = spawn(async move { lock2_clone.spin_lock(Some(500)).await });

        yield_now().await;

        drop(guard1);

        // Once `lock1` is released, `lock2` managed to obtain it.
        let guard2 = task
            .await
            .expect("join handle is properly awaited")
            .expect("lock is successfully attempted")
            .expect("lock must be obtained successfully");
        assert_matches!(guard2, CrossProcessLockState::Clean(_));

        // `lock1` and `lock2` are both clean!
        assert!(lock1.is_dirty().not());
        assert!(lock2.is_dirty().not());

        // Now if `lock1` tries to obtain the lock with a small timeout, it will fail.
        assert_matches!(
            lock1.spin_lock(Some(200)).await,
            Ok(Err(CrossProcessLockUnobtained::TimedOut))
        );

        Ok(())
    }

    #[async_test]
    async fn test_multiple_processes_up_to_dirty() -> TestResult {
        let store = TestStore::default();
        let lock1 = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned());
        let lock2 = CrossProcessLock::new(store, "key".to_owned(), "second".to_owned());

        // Obtain `lock1` once.
        {
            let guard = lock1.try_lock_once().await?.expect("lock must be obtained successfully");
            assert_matches!(guard, CrossProcessLockState::Clean(_));
            assert!(lock1.is_dirty().not());
            drop(guard);

            yield_now().await;
        }

        // Obtain `lock2` once.
        {
            let guard = lock2.try_lock_once().await?.expect("lock must be obtained successfully");
            assert_matches!(guard, CrossProcessLockState::Clean(_));
            // Neither lock is dirty at this point: this is the first time `lock2`
            // gets a generation, and `lock1` has not re-obtained the lock yet.
            assert!(lock1.is_dirty().not());
            assert!(lock2.is_dirty().not());
            drop(guard);

            yield_now().await;
        }

        for _ in 0..3 {
            // Obtain `lock1` once more. Now it's dirty because `lock2` has acquired the
            // lock meanwhile.
            {
                let guard =
                    lock1.try_lock_once().await?.expect("lock must be obtained successfully");
                assert_matches!(guard, CrossProcessLockState::Dirty(_));
                assert!(lock1.is_dirty());

                drop(guard);
                yield_now().await;
            }

            // Obtain `lock1` once more! It's still dirty because it has not been marked
            // as non-dirty.
            {
                let guard =
                    lock1.try_lock_once().await?.expect("lock must be obtained successfully");
                assert_matches!(guard, CrossProcessLockState::Dirty(_));
                assert!(lock1.is_dirty());
                lock1.clear_dirty();

                drop(guard);
                yield_now().await;
            }

            // Obtain `lock1` once more. Now it's clear!
            {
                let guard =
                    lock1.try_lock_once().await?.expect("lock must be obtained successfully");
                assert_matches!(guard, CrossProcessLockState::Clean(_));
                assert!(lock1.is_dirty().not());

                drop(guard);
                yield_now().await;
            }

            // Same dance with `lock2`!
            {
                let guard =
                    lock2.try_lock_once().await?.expect("lock must be obtained successfully");
                assert_matches!(guard, CrossProcessLockState::Dirty(_));
                assert!(lock2.is_dirty());
                lock2.clear_dirty();

                drop(guard);
                yield_now().await;
            }
        }

        Ok(())
    }
}
864
865/// Some code that is shared by almost all `MemoryStore` implementations out
866/// there.
867pub mod memory_store_helper {
868 use std::collections::{HashMap, hash_map::Entry};
869
870 use ruma::time::{Duration, Instant};
871
872 use super::{CrossProcessLockGeneration, FIRST_CROSS_PROCESS_LOCK_GENERATION};
873
874 #[derive(Debug)]
875 pub struct Lease {
876 holder: String,
877 expiration: Instant,
878 generation: CrossProcessLockGeneration,
879 }
880
881 pub fn try_take_leased_lock(
882 leases: &mut HashMap<String, Lease>,
883 lease_duration_ms: u32,
884 key: &str,
885 holder: &str,
886 ) -> Option<CrossProcessLockGeneration> {
887 let now = Instant::now();
888 let expiration = now + Duration::from_millis(lease_duration_ms.into());
889
890 match leases.entry(key.to_owned()) {
891 // There is an existing holder.
892 Entry::Occupied(mut entry) => {
893 let Lease {
894 holder: current_holder,
895 expiration: current_expiration,
896 generation: current_generation,
897 } = entry.get_mut();
898
899 if current_holder == holder {
900 // We had the lease before, extend it.
901 *current_expiration = expiration;
902
903 Some(*current_generation)
904 } else {
905 // We didn't have it.
906 if *current_expiration < now {
907 // Steal it!
908 *current_holder = holder.to_owned();
909 *current_expiration = expiration;
910 *current_generation += 1;
911
912 Some(*current_generation)
913 } else {
914 // We tried our best.
915 None
916 }
917 }
918 }
919
920 // There is no holder, easy.
921 Entry::Vacant(entry) => {
922 entry.insert(Lease {
923 holder: holder.to_owned(),
924 expiration: Instant::now() + Duration::from_millis(lease_duration_ms.into()),
925 generation: FIRST_CROSS_PROCESS_LOCK_GENERATION,
926 });
927
928 Some(FIRST_CROSS_PROCESS_LOCK_GENERATION)
929 }
930 }
931 }
932}