matrix_sdk_common/cross_process_lock.rs
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A cross-process lock implementation.
//!
//! This lock should be used only for very specific use cases, where multiple
//! processes might concurrently write to the same database; this would
//! invalidate store caches, so it should be done mindfully. Such a lock can
//! be acquired multiple times by the same process, and it remains active as
//! long as there's at least one user in a given process.
//!
//! The lock is implemented using time-based leases. The underlying store
//! maintains the lock identifier (key), the current holder (value), and an
//! expiration timestamp on the side; see also
//! `CryptoStore::try_take_leased_lock` for more details.
//!
//! The lock is initially acquired for a certain period of time (namely, the
//! duration of a lease, aka `LEASE_DURATION_MS`), and then a “heartbeat” task
//! renews the lease to extend its duration, every so often (namely, every
//! `EXTEND_LEASE_EVERY_MS`). Since the Tokio scheduler might be busy, the
//! extension request should happen much more frequently than the lease
//! duration, so that a missed deadline doesn't compromise the lock. The
//! current values have been chosen to reflect that, with a ratio of 1:10 as
//! of 2023-06-23.
//!
//! Releasing the lock happens naturally, by not renewing a lease. It happens
//! automatically after the duration of the last lease, at most.
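//!
//! # Example
//!
//! A minimal usage sketch (`MyLocker` is a hypothetical [`TryLock`]
//! implementation, e.g. one backed by a database; the snippet is not
//! compiled):
//!
//! ```ignore
//! let lock = CrossProcessLock::new(
//!     MyLocker::new(),
//!     "lock_key".to_owned(),
//!     "current_process_id".to_owned(),
//! );
//!
//! // Try to acquire the lock once, without retrying.
//! if let Some(guard) = lock.try_lock_once().await? {
//!     // We hold the lease; a background task extends it every
//!     // `EXTEND_LEASE_EVERY_MS` milliseconds.
//!
//!     // Dropping the last guard stops the renewals; the lock is released
//!     // once the last lease expires.
//!     drop(guard);
//! }
//!
//! // Or retry with exponential backoff until it's acquired, or time out.
//! let guard = lock.spin_lock(Some(2000)).await?;
//! ```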

use std::{
    error::Error,
    future::Future,
    sync::{
        Arc,
        atomic::{self, AtomicU32},
    },
    time::Duration,
};

use tokio::sync::Mutex;
use tracing::{debug, error, instrument, trace};

use crate::{
    SendOutsideWasm,
    executor::{JoinHandle, spawn},
    sleep::sleep,
};

/// Trait used to try to take a lock. Foundation of [`CrossProcessLock`].
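///
/// A sketch of an in-memory implementation, based on the
/// [`memory_store_helper::try_take_leased_lock`] helper at the bottom of this
/// module (the `Locker` type is illustrative only; the snippet is not
/// compiled):
///
/// ```ignore
/// use std::{collections::HashMap, convert::Infallible, sync::{Arc, RwLock}};
/// use ruma::time::Instant;
///
/// #[derive(Clone, Default)]
/// struct Locker {
///     leases: Arc<RwLock<HashMap<String, (String, Instant)>>>,
/// }
///
/// impl TryLock for Locker {
///     type LockError = Infallible;
///
///     async fn try_lock(
///         &self,
///         lease_duration_ms: u32,
///         key: &str,
///         holder: &str,
///     ) -> Result<bool, Self::LockError> {
///         Ok(memory_store_helper::try_take_leased_lock(
///             &mut self.leases.write().unwrap(),
///             lease_duration_ms,
///             key,
///             holder,
///         ))
///     }
/// }
/// ```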
pub trait TryLock {
    #[cfg(not(target_family = "wasm"))]
    type LockError: Error + Send + Sync;

    #[cfg(target_family = "wasm")]
    type LockError: Error;

    /// Try to take a leased lock.
    ///
    /// This attempts to take a lock for the given lease duration.
    ///
    /// - If we already had the lease, this will extend the lease.
    /// - If we didn't, but the previous lease has expired, we will acquire the
    ///   lock.
    /// - If there was no previous lease, we will acquire the lock.
    /// - Otherwise, we don't get the lock.
    ///
    /// Returns whether taking the lock succeeded.
    fn try_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> impl Future<Output = Result<bool, Self::LockError>> + SendOutsideWasm;
}

/// Small state machine to handle wait times.
#[derive(Clone, Debug)]
enum WaitingTime {
    /// Some time to wait, in milliseconds.
    Some(u32),
    /// Stop waiting when seeing this value.
    Stop,
}

/// A guard of a cross-process lock.
///
/// The lock will be automatically released a short period of time after all
/// the guards have been dropped.
#[derive(Debug)]
pub struct CrossProcessLockGuard {
    num_holders: Arc<AtomicU32>,
}

impl Drop for CrossProcessLockGuard {
    fn drop(&mut self) {
        self.num_holders.fetch_sub(1, atomic::Ordering::SeqCst);
    }
}


/// A store-based lock for a `Store`.
///
/// See the doc-comment of this module for more information.
#[derive(Clone, Debug)]
pub struct CrossProcessLock<L>
where
    L: TryLock + Clone + SendOutsideWasm + 'static,
{
    /// The locker implementation.
    ///
    /// `L` is responsible for trying to take the lock, while
    /// [`CrossProcessLock`] is responsible for making it cross-process, adding
    /// the retry mechanism, the guard, and so on.
    locker: L,

    /// Number of holders of the lock in this process.
    ///
    /// If greater than 0, this means we've already acquired this lock, in this
    /// process, and the store lock mustn't be touched.
    ///
    /// When the number of holders is decreased to 0, then the lock must be
    /// released in the store.
    num_holders: Arc<AtomicU32>,

    /// A mutex to control an attempt to take the lock, to avoid making it
    /// reentrant.
    locking_attempt: Arc<Mutex<()>>,

    /// Current renew task spawned by `try_lock_once`.
    renew_task: Arc<Mutex<Option<JoinHandle<()>>>>,

    /// The key used in the key/value mapping for the lock entry.
    lock_key: String,

    /// A specific value to identify the lock's holder.
    lock_holder: String,

    /// Backoff time, in milliseconds.
    backoff: Arc<Mutex<WaitingTime>>,
}

/// Amount of time a lease of the lock should last, in milliseconds.
pub const LEASE_DURATION_MS: u32 = 500;

/// Period of time between two attempts to extend the lease, in milliseconds.
/// We re-request a lease for the full `LEASE_DURATION_MS` every
/// `EXTEND_LEASE_EVERY_MS` milliseconds, so this value must be safely low
/// compared to `LEASE_DURATION_MS`, to make sure we can miss a deadline
/// without compromising the lock.
pub const EXTEND_LEASE_EVERY_MS: u64 = 50;

/// Initial backoff, in milliseconds. This is the time we wait the first
/// time, if taking the lock initially failed.
const INITIAL_BACKOFF_MS: u32 = 10;

/// Maximal backoff, in milliseconds. This is the maximum amount of time
/// we'll wait for the lock, *between two attempts*.
pub const MAX_BACKOFF_MS: u32 = 1000;

impl<L> CrossProcessLock<L>
where
    L: TryLock + Clone + SendOutsideWasm + 'static,
{
    /// Create a new cross-process lock.
    ///
    /// # Parameters
    ///
    /// - `lock_key`: key in the key-value store to store the lock's state.
    /// - `lock_holder`: identify the lock's holder with this given value.
    pub fn new(locker: L, lock_key: String, lock_holder: String) -> Self {
        Self {
            locker,
            lock_key,
            lock_holder,
            backoff: Arc::new(Mutex::new(WaitingTime::Some(INITIAL_BACKOFF_MS))),
            num_holders: Arc::new(0.into()),
            locking_attempt: Arc::new(Mutex::new(())),
            renew_task: Default::default(),
        }
    }

    /// Try to lock once; returns whether the lock was obtained or not.
    #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
    pub async fn try_lock_once(
        &self,
    ) -> Result<Option<CrossProcessLockGuard>, CrossProcessLockError> {
        // Hold onto the locking attempt mutex for the entire lifetime of this
        // function, to avoid multiple reentrant calls.
        let _attempt = self.locking_attempt.lock().await;

        // If another thread obtained the lock, make sure to only superficially increase
        // the number of holders, and carry on.
        if self.num_holders.load(atomic::Ordering::SeqCst) > 0 {
            // Note: between the above load and the fetch_add below, another thread may
            // decrement `num_holders`. That's fine because that means the lock
            // was taken by at least one thread, and after this call it will be
            // taken by at least one thread.
            trace!("We already had the lock, incrementing holder count");
            self.num_holders.fetch_add(1, atomic::Ordering::SeqCst);
            let guard = CrossProcessLockGuard { num_holders: self.num_holders.clone() };
            return Ok(Some(guard));
        }

        let acquired = self
            .locker
            .try_lock(LEASE_DURATION_MS, &self.lock_key, &self.lock_holder)
            .await
            .map_err(|err| CrossProcessLockError::TryLockError(Box::new(err)))?;

        if !acquired {
            trace!("Couldn't acquire the lock immediately.");
            return Ok(None);
        }

        trace!("Acquired the lock, spawning the lease extension task.");

        // This is the first time we've acquired the lock. We're going to spawn the task
        // that will renew the lease.

        // Clone data to be owned by the task.
        let this = (*self).clone();

        let mut renew_task = self.renew_task.lock().await;

        // Cancel the previous task, if any. That's safe to do, because:
        // - either the task was done,
        // - or it was still running, but taking a lock in the db has to be an atomic
        //   operation running in a transaction.

        if let Some(_prev) = renew_task.take() {
            #[cfg(not(target_family = "wasm"))]
            if !_prev.is_finished() {
                trace!("aborting the previous renew task");
                _prev.abort();
            }
        }

        // Restart a new one.
        *renew_task = Some(spawn(async move {
            loop {
                {
                    // First, check if there are still users of this lock.
                    //
                    // This is not racy, because:
                    // - the `locking_attempt` mutex makes sure we don't have unexpected
                    //   interactions with the non-atomic sequence above in `try_lock_once`
                    //   (check > 0, then add 1).
                    // - other entities holding onto the `num_holders` atomic will only
                    //   decrease it over time.

                    let _guard = this.locking_attempt.lock().await;

                    // If there are no more users, we can quit.
                    if this.num_holders.load(atomic::Ordering::SeqCst) == 0 {
                        trace!("exiting the lease extension loop");

                        // Cancel the lease with another 0ms lease.
                        // If we don't get the lock, that's (weird but) fine.
                        let fut = this.locker.try_lock(0, &this.lock_key, &this.lock_holder);
                        let _ = fut.await;

                        // Exit the loop.
                        break;
                    }
                }

                sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;

                let fut =
                    this.locker.try_lock(LEASE_DURATION_MS, &this.lock_key, &this.lock_holder);

                if let Err(err) = fut.await {
                    error!("error when extending lock lease: {err:#}");
                    // Exit the loop.
                    break;
                }
            }
        }));

        self.num_holders.fetch_add(1, atomic::Ordering::SeqCst);

        let guard = CrossProcessLockGuard { num_holders: self.num_holders.clone() };
        Ok(Some(guard))
    }

    /// Attempt to take the lock, with exponential backoff if the lock has
    /// already been taken before.
    ///
    /// The `max_backoff` parameter is the maximum time (in milliseconds) to
    /// wait between two attempts. Once the backoff reaches `max_backoff`, the
    /// next failed attempt stops retrying and returns a
    /// [`CrossProcessLockError::LockTimeout`] error. If not provided,
    /// [`MAX_BACKOFF_MS`] is used.
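    ///
    /// With the defaults (`INITIAL_BACKOFF_MS` = 10, [`MAX_BACKOFF_MS`] =
    /// 1000), the successive waits are 10, 20, 40, 80, 160, 320 and 640
    /// milliseconds; the next doubling reaches the cap, so the following
    /// failed attempt returns a timeout error.
    ///
    /// A usage sketch (the surrounding error handling is hypothetical; the
    /// snippet is not compiled):
    ///
    /// ```ignore
    /// // Retry with a maximum backoff of 2 seconds between attempts.
    /// match lock.spin_lock(Some(2000)).await {
    ///     Ok(guard) => {
    ///         // We hold the lock until `guard` is dropped.
    ///     }
    ///     Err(CrossProcessLockError::LockTimeout) => {
    ///         // Another process kept the lock for too long.
    ///     }
    ///     Err(err) => return Err(err),
    /// }
    /// ```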
    #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
    pub async fn spin_lock(
        &self,
        max_backoff: Option<u32>,
    ) -> Result<CrossProcessLockGuard, CrossProcessLockError> {
        let max_backoff = max_backoff.unwrap_or(MAX_BACKOFF_MS);

        // Note: reads/writes to the backoff are racy across threads in theory, but the
        // lock in `try_lock_once` should sequentialize it all.

        loop {
            if let Some(guard) = self.try_lock_once().await? {
                // Reset backoff before returning, for the next attempt to lock.
                *self.backoff.lock().await = WaitingTime::Some(INITIAL_BACKOFF_MS);
                return Ok(guard);
            }

            // Exponential backoff: double the previous wait time, and stop retrying
            // once it reaches `max_backoff`.
            let mut backoff = self.backoff.lock().await;

            let wait = match &mut *backoff {
                WaitingTime::Some(val) => {
                    let wait = *val;
                    *val = val.saturating_mul(2);
                    if *val >= max_backoff {
                        *backoff = WaitingTime::Stop;
                    }
                    wait
                }
                WaitingTime::Stop => {
                    // We've reached the maximum backoff, abandon.
                    return Err(CrossProcessLockError::LockTimeout);
                }
            };

            debug!("Waiting {wait}ms before re-attempting to take the lock");
            sleep(Duration::from_millis(wait.into())).await;
        }
    }

    /// Returns the value in the database that represents the holder's
    /// identifier.
    pub fn lock_holder(&self) -> &str {
        &self.lock_holder
    }
}

/// Error related to the locking API of the store.
#[derive(Debug, thiserror::Error)]
pub enum CrossProcessLockError {
    /// Spent too long waiting for a database lock.
    #[error("a lock timed out")]
    LockTimeout,

    #[error(transparent)]
    #[cfg(not(target_family = "wasm"))]
    TryLockError(#[from] Box<dyn Error + Send + Sync>),

    #[error(transparent)]
    #[cfg(target_family = "wasm")]
    TryLockError(Box<dyn Error>),
}

#[cfg(test)]
#[cfg(not(target_family = "wasm"))] // These tests require tokio::time, which is not implemented on wasm.
mod tests {
    use std::{
        collections::HashMap,
        sync::{Arc, RwLock, atomic},
        time::Instant,
    };

    use assert_matches::assert_matches;
    use matrix_sdk_test_macros::async_test;
    use tokio::{
        spawn,
        time::{Duration, sleep},
    };

    use super::{
        CrossProcessLock, CrossProcessLockError, CrossProcessLockGuard, EXTEND_LEASE_EVERY_MS,
        TryLock, memory_store_helper::try_take_leased_lock,
    };

    #[derive(Clone, Default)]
    struct TestStore {
        leases: Arc<RwLock<HashMap<String, (String, Instant)>>>,
    }

    impl TestStore {
        fn try_take_leased_lock(&self, lease_duration_ms: u32, key: &str, holder: &str) -> bool {
            try_take_leased_lock(&mut self.leases.write().unwrap(), lease_duration_ms, key, holder)
        }
    }

    #[derive(Debug, thiserror::Error)]
    enum DummyError {}

    impl TryLock for TestStore {
        type LockError = DummyError;

        /// Try to take a lock using the given store.
        async fn try_lock(
            &self,
            lease_duration_ms: u32,
            key: &str,
            holder: &str,
        ) -> Result<bool, Self::LockError> {
            Ok(self.try_take_leased_lock(lease_duration_ms, key, holder))
        }
    }
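    /// Drop the guard, then wait for one lease-extension period, giving the
    /// renew task a chance to notice there are no more holders and release
    /// the lease in the store.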
    async fn release_lock(guard: Option<CrossProcessLockGuard>) {
        drop(guard);
        sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;
    }

    type TestResult = Result<(), CrossProcessLockError>;

    #[async_test]
    async fn test_simple_lock_unlock() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(store, "key".to_owned(), "first".to_owned());

        // The lock plainly works when used with a single holder.
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // Releasing works.
        release_lock(acquired).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        // Spin locking on the same lock always works, assuming no concurrent access.
        let acquired = lock.spin_lock(None).await.unwrap();

        // Releasing still works.
        release_lock(Some(acquired)).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        Ok(())
    }

    #[async_test]
    async fn test_self_recovery() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned());

        // When a lock is acquired...
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // But then forgotten... (note: no need to release the guard)
        drop(lock);

        // And when rematerializing the lock with the same key/value...
        let lock = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned());

        // We still got it.
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        Ok(())
    }

    #[async_test]
    async fn test_multiple_holders_same_process() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessLock::new(store, "key".to_owned(), "first".to_owned());

        // Taking the lock twice...
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());

        let acquired2 = lock.try_lock_once().await?;
        assert!(acquired2.is_some());

        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 2);

        // ...means we can release it twice.
        release_lock(acquired).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        release_lock(acquired2).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        Ok(())
    }

    #[async_test]
    async fn test_multiple_processes() -> TestResult {
        let store = TestStore::default();
        let lock1 = CrossProcessLock::new(store.clone(), "key".to_owned(), "first".to_owned());
        let lock2 = CrossProcessLock::new(store, "key".to_owned(), "second".to_owned());

        // When the first process takes the lock...
        let acquired1 = lock1.try_lock_once().await?;
        assert!(acquired1.is_some());

        // The second can't take it immediately.
        let acquired2 = lock2.try_lock_once().await?;
        assert!(acquired2.is_none());

        let lock2_clone = lock2.clone();
        let handle = spawn(async move { lock2_clone.spin_lock(Some(1000)).await });

        sleep(Duration::from_millis(100)).await;

        drop(acquired1);

        // lock2 in the background manages to get the lock at some point.
        let _acquired2 = handle
            .await
            .expect("join handle is properly awaited")
            .expect("lock was obtained after spin-locking");

        // Now if lock1 tries to get the lock with a small timeout, it will fail.
        assert_matches!(lock1.spin_lock(Some(200)).await, Err(CrossProcessLockError::LockTimeout));

        Ok(())
    }
}

/// Some code that is shared by almost all `MemoryStore` implementations out
/// there.
pub mod memory_store_helper {
    use std::collections::{HashMap, hash_map::Entry};

    use ruma::time::{Duration, Instant};

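    /// Try to take a leased lock from an in-memory `leases` map of
    /// `key → (holder, expiration)` entries.
    ///
    /// - If `holder` already holds the lease, it is extended by
    ///   `lease_duration_ms`.
    /// - If another holder's lease has expired, it is stolen.
    /// - If there is no previous lease, it is acquired.
    /// - Otherwise, the attempt fails.
    ///
    /// Returns whether the lock was taken. For example (not compiled):
    ///
    /// ```ignore
    /// let mut leases = HashMap::new();
    ///
    /// assert!(try_take_leased_lock(&mut leases, 500, "key", "alice")); // acquired
    /// assert!(try_take_leased_lock(&mut leases, 500, "key", "alice")); // extended
    /// assert!(!try_take_leased_lock(&mut leases, 500, "key", "bob")); // still held by alice
    /// ```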
    pub fn try_take_leased_lock(
        leases: &mut HashMap<String, (String, Instant)>,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> bool {
        let now = Instant::now();
        let expiration = now + Duration::from_millis(lease_duration_ms.into());

        match leases.entry(key.to_owned()) {
            // There is an existing holder.
            Entry::Occupied(mut entry) => {
                let (current_holder, current_expiration) = entry.get_mut();

                if current_holder == holder {
                    // We had the lease before, extend it.
                    *current_expiration = expiration;

                    true
                } else {
                    // We didn't have it.
                    if *current_expiration < now {
                        // Steal it!
                        *current_holder = holder.to_owned();
                        *current_expiration = expiration;

                        true
                    } else {
                        // We tried our best.
                        false
                    }
                }
            }

            // There is no holder, easy.
            Entry::Vacant(entry) => {
                entry.insert((holder.to_owned(), expiration));

                true
            }
        }
    }
}