matrix_sdk_common/store_locks.rs
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Collection of small helpers that implement store-based locks.
//!
//! This is a cross-process lock that may be used only for very specific use
//! cases, where multiple processes might concurrently write to the same
//! database; since this invalidates store caches, it should be done
//! mindfully. Such a lock can be acquired multiple times by the same process,
//! and it remains active as long as there's at least one user in a given
//! process.
//!
//! The lock is implemented using time-based leases to values inserted in a
//! store. The store maintains the lock identifier (key), who the current
//! holder is (value), and an expiration timestamp on the side; see also
//! `CryptoStore::try_take_leased_lock` for more details.
//!
//! The lock is initially acquired for a certain period of time (namely, the
//! duration of a lease, aka `LEASE_DURATION_MS`), and then a "heartbeat" task
//! renews the lease to extend its duration, every so often (namely, every
//! `EXTEND_LEASE_EVERY_MS`). Since the tokio scheduler might be busy, the
//! extension request should happen much more frequently than the duration of
//! a lease, so that a missed deadline doesn't compromise the lock. The current
//! values have been chosen to reflect that, with a ratio of 1:10 as of
//! 2023-06-23.
//!
//! Releasing the lock happens naturally, by not renewing a lease. It happens
//! automatically after the duration of the last lease, at most.
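//!
//! # Example
//!
//! A minimal usage sketch (illustrative only, hence `ignore`; `store` stands
//! for some [`BackingStore`] implementation):
//!
//! ```ignore
//! let lock = CrossProcessStoreLock::new(store, "lock_key".to_owned(), "process_id".to_owned());
//!
//! // Try to take the lock once; `None` means another holder currently has it.
//! if let Some(guard) = lock.try_lock_once().await? {
//!     // The lease is renewed in the background for as long as a guard is alive.
//!     drop(guard);
//! }
//!
//! // Or retry with exponential backoff, waiting at most 2000ms between two
//! // attempts before giving up with a `LockTimeout` error.
//! let guard = lock.spin_lock(Some(2000)).await?;
//! ```
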
use std::{
    error::Error,
    future::Future,
    sync::{
        atomic::{self, AtomicU32},
        Arc,
    },
    time::Duration,
};

use tokio::sync::Mutex;
use tracing::{debug, error, instrument, trace};

use crate::{
    executor::{spawn, JoinHandle},
    sleep::sleep,
    SendOutsideWasm,
};

/// Backing store for a cross-process lock.
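///
/// A sketch of a possible in-memory implementation, delegating to
/// [`memory_store_helper::try_take_leased_lock`] (illustrative only, hence
/// `ignore`; the `MemoryStore` type is hypothetical):
///
/// ```ignore
/// use std::{collections::HashMap, sync::{Arc, RwLock}, time::Instant};
///
/// #[derive(Clone, Default)]
/// struct MemoryStore {
///     leases: Arc<RwLock<HashMap<String, (String, Instant)>>>,
/// }
///
/// impl BackingStore for MemoryStore {
///     // `Infallible` implements `Error + Send + Sync`; this store can't fail.
///     type LockError = std::convert::Infallible;
///
///     async fn try_lock(
///         &self,
///         lease_duration_ms: u32,
///         key: &str,
///         holder: &str,
///     ) -> Result<bool, Self::LockError> {
///         Ok(memory_store_helper::try_take_leased_lock(
///             &mut self.leases.write().unwrap(),
///             lease_duration_ms,
///             key,
///             holder,
///         ))
///     }
/// }
/// ```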
pub trait BackingStore {
    /// The error type returned by the store when taking the lock fails.
    #[cfg(not(target_family = "wasm"))]
    type LockError: Error + Send + Sync + 'static;

    /// The error type returned by the store when taking the lock fails.
    #[cfg(target_family = "wasm")]
    type LockError: Error + 'static;

    /// Try to take a lock using the given store.
    fn try_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> impl Future<Output = Result<bool, Self::LockError>> + SendOutsideWasm;
}

/// Small state machine to handle wait times.
#[derive(Clone, Debug)]
enum WaitingTime {
    /// Some time to wait, in milliseconds.
    Some(u32),
    /// Stop waiting when seeing this value.
    Stop,
}

/// A guard on the store lock.
///
/// The lock will be automatically released a short period of time after all
/// the guards have been dropped.
#[derive(Debug)]
pub struct CrossProcessStoreLockGuard {
    num_holders: Arc<AtomicU32>,
}

impl Drop for CrossProcessStoreLockGuard {
    fn drop(&mut self) {
        self.num_holders.fetch_sub(1, atomic::Ordering::SeqCst);
    }
}

/// A store-based lock for a `Store`.
///
/// See the doc-comment of this module for more information.
#[derive(Clone, Debug)]
pub struct CrossProcessStoreLock<S: BackingStore + Clone + SendOutsideWasm + 'static> {
    /// The store we're using to lock.
    store: S,

    /// Number of holders of the lock in this process.
    ///
    /// If greater than 0, this means we've already acquired this lock, in this
    /// process, and the store lock mustn't be touched.
    ///
    /// When the number of holders is decreased to 0, then the lock must be
    /// released in the store.
    num_holders: Arc<AtomicU32>,

    /// A mutex to control an attempt to take the lock, to avoid making it
    /// reentrant.
    locking_attempt: Arc<Mutex<()>>,

    /// Current renew task spawned by `try_lock_once`.
    renew_task: Arc<Mutex<Option<JoinHandle<()>>>>,

    /// The key used in the key/value mapping for the lock entry.
    lock_key: String,

    /// A specific value to identify the lock's holder.
    lock_holder: String,

    /// Backoff time, in milliseconds.
    backoff: Arc<Mutex<WaitingTime>>,
}

/// Amount of time a lease of the lock should last, in milliseconds.
pub const LEASE_DURATION_MS: u32 = 500;

/// Period of time between two attempts to extend the lease, in milliseconds.
///
/// Every `EXTEND_LEASE_EVERY_MS` milliseconds, we re-request a lease for a
/// full `LEASE_DURATION_MS` milliseconds, so this period has to be safely
/// lower than `LEASE_DURATION_MS`, to make sure we can miss a deadline
/// without compromising the lock.
pub const EXTEND_LEASE_EVERY_MS: u64 = 50;
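
// A possible compile-time sanity check for the ratio described above: the
// extension period must stay well below a single lease duration, so that a
// missed deadline doesn't let the lease expire.
const _: () = assert!((EXTEND_LEASE_EVERY_MS as u32) < LEASE_DURATION_MS / 2);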

/// Initial backoff, in milliseconds. This is the time we wait the first
/// time, if taking the lock initially failed.
const INITIAL_BACKOFF_MS: u32 = 10;

/// Maximal backoff, in milliseconds. This is the maximum amount of time
/// we'll wait for the lock, *between two attempts*.
pub const MAX_BACKOFF_MS: u32 = 1000;

impl<S: BackingStore + Clone + SendOutsideWasm + 'static> CrossProcessStoreLock<S> {
    /// Create a new store-based lock implemented as a value in the store.
    ///
    /// # Parameters
    ///
    /// - `lock_key`: key in the key-value store to store the lock's state.
    /// - `lock_holder`: identify the lock's holder with this given value.
    pub fn new(store: S, lock_key: String, lock_holder: String) -> Self {
        Self {
            store,
            lock_key,
            lock_holder,
            backoff: Arc::new(Mutex::new(WaitingTime::Some(INITIAL_BACKOFF_MS))),
            num_holders: Arc::new(0.into()),
            locking_attempt: Arc::new(Mutex::new(())),
            renew_task: Default::default(),
        }
    }

    /// Try to lock once, returns whether the lock was obtained or not.
    #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
    pub async fn try_lock_once(
        &self,
    ) -> Result<Option<CrossProcessStoreLockGuard>, LockStoreError> {
        // Hold onto the locking attempt mutex for the entire lifetime of this
        // function, to avoid multiple reentrant calls.
        let _attempt = self.locking_attempt.lock().await;

        // If another thread obtained the lock, make sure to only superficially increase
        // the number of holders, and carry on.
        if self.num_holders.load(atomic::Ordering::SeqCst) > 0 {
            // Note: between the above load and the fetch_add below, another thread may
            // decrement `num_holders`. That's fine, because it means the lock was held
            // by at least one thread before this call, and it will be held by at least
            // one thread (us) after it.
            trace!("We already had the lock, incrementing holder count");
            self.num_holders.fetch_add(1, atomic::Ordering::SeqCst);
            let guard = CrossProcessStoreLockGuard { num_holders: self.num_holders.clone() };
            return Ok(Some(guard));
        }

        let acquired = self
            .store
            .try_lock(LEASE_DURATION_MS, &self.lock_key, &self.lock_holder)
            .await
            .map_err(|err| LockStoreError::BackingStoreError(Box::new(err)))?;

        if !acquired {
            trace!("Couldn't acquire the lock immediately.");
            return Ok(None);
        }

        trace!("Acquired the lock, spawning the lease extension task.");

        // This is the first time we've acquired the lock. We're going to spawn the task
        // that will renew the lease.

        // Clone data to be owned by the task.
        let this = (*self).clone();

        let mut renew_task = self.renew_task.lock().await;

        // Cancel the previous task, if any. That's safe to do, because:
        // - either the task was done,
        // - or it was still running, but taking a lock in the db has to be an atomic
        //   operation running in a transaction.

        if let Some(_prev) = renew_task.take() {
            #[cfg(not(target_family = "wasm"))]
            if !_prev.is_finished() {
                trace!("aborting the previous renew task");
                _prev.abort();
            }
        }

        // Restart a new one.
        *renew_task = Some(spawn(async move {
            loop {
                {
                    // First, check if there are still users of this lock.
                    //
                    // This is not racy, because:
                    // - the `locking_attempt` mutex makes sure we don't have unexpected
                    //   interactions with the non-atomic sequence above in `try_lock_once`
                    //   (check > 0, then add 1).
                    // - other entities holding onto the `num_holders` atomic will only
                    //   decrease it over time.

                    let _guard = this.locking_attempt.lock().await;

                    // If there are no more users, we can quit.
                    if this.num_holders.load(atomic::Ordering::SeqCst) == 0 {
                        trace!("exiting the lease extension loop");

                        // Cancel the lease with another 0ms lease.
                        // If we don't get the lock, that's (weird but) fine.
                        let fut = this.store.try_lock(0, &this.lock_key, &this.lock_holder);
                        let _ = fut.await;

                        // Exit the loop.
                        break;
                    }
                }

                sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;

                let fut =
                    this.store.try_lock(LEASE_DURATION_MS, &this.lock_key, &this.lock_holder);
                if let Err(err) = fut.await {
                    error!("error when extending lock lease: {err:#}");
                    // Exit the loop.
                    break;
                }
            }
        }));

        self.num_holders.fetch_add(1, atomic::Ordering::SeqCst);

        let guard = CrossProcessStoreLockGuard { num_holders: self.num_holders.clone() };
        Ok(Some(guard))
    }

    /// Attempt to take the lock, with exponential backoff if the lock has
    /// already been taken before.
    ///
    /// The `max_backoff` parameter is the maximum time (in milliseconds) to
    /// wait between two attempts. Once the backoff reaches that value, the
    /// next failed attempt stops retrying and returns a
    /// [`LockStoreError::LockTimeout`] error. If not provided, defaults to
    /// [`MAX_BACKOFF_MS`].
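    ///
    /// For instance, with the initial backoff of 10ms and a `max_backoff` of
    /// 1000ms, the successive waits between two failed attempts are roughly
    /// 10, 20, 40, 80, 160, 320 and 640 milliseconds, after which the next
    /// failed attempt gives up with a [`LockStoreError::LockTimeout`].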
    #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
    pub async fn spin_lock(
        &self,
        max_backoff: Option<u32>,
    ) -> Result<CrossProcessStoreLockGuard, LockStoreError> {
        let max_backoff = max_backoff.unwrap_or(MAX_BACKOFF_MS);

        // Note: reads/writes to the backoff are racy across threads in theory, but the
        // lock in `try_lock_once` should sequentialize it all.

        loop {
            if let Some(guard) = self.try_lock_once().await? {
                // Reset the backoff before returning, for the next attempt to lock.
                *self.backoff.lock().await = WaitingTime::Some(INITIAL_BACKOFF_MS);
                return Ok(guard);
            }

            // Exponential backoff! Multiply by 2 the time we've waited before, and cap it
            // to max_backoff.
            let mut backoff = self.backoff.lock().await;

            let wait = match &mut *backoff {
                WaitingTime::Some(ref mut val) => {
                    let wait = *val;
                    *val = val.saturating_mul(2);
                    if *val >= max_backoff {
                        *backoff = WaitingTime::Stop;
                    }
                    wait
                }
                WaitingTime::Stop => {
                    // We've reached the maximum backoff, abandon.
                    return Err(LockStoreError::LockTimeout);
                }
            };

            debug!("Waiting {wait}ms before re-attempting to take the lock");
            sleep(Duration::from_millis(wait.into())).await;
        }
    }

    /// Returns the value in the database that represents the holder's
    /// identifier.
    pub fn lock_holder(&self) -> &str {
        &self.lock_holder
    }
}

/// Error related to the locking API of the store.
#[derive(Debug, thiserror::Error)]
pub enum LockStoreError {
    /// Spent too long waiting for a database lock.
    #[error("a lock timed out")]
    LockTimeout,

    /// An error happened in the underlying backing store.
    #[error(transparent)]
    #[cfg(not(target_family = "wasm"))]
    BackingStoreError(#[from] Box<dyn Error + Send + Sync>),

    /// An error happened in the underlying backing store.
    #[error(transparent)]
    #[cfg(target_family = "wasm")]
    BackingStoreError(Box<dyn Error>),
}

#[cfg(test)]
#[cfg(not(target_family = "wasm"))] // These tests require tokio::time, which is not implemented on wasm.
mod tests {
    use std::{
        collections::HashMap,
        sync::{atomic, Arc, RwLock},
        time::Instant,
    };

    use assert_matches::assert_matches;
    use matrix_sdk_test_macros::async_test;
    use tokio::{
        spawn,
        time::{sleep, Duration},
    };

    use super::{
        memory_store_helper::try_take_leased_lock, BackingStore, CrossProcessStoreLock,
        CrossProcessStoreLockGuard, LockStoreError, EXTEND_LEASE_EVERY_MS,
    };

    #[derive(Clone, Default)]
    struct TestStore {
        leases: Arc<RwLock<HashMap<String, (String, Instant)>>>,
    }

    impl TestStore {
        fn try_take_leased_lock(&self, lease_duration_ms: u32, key: &str, holder: &str) -> bool {
            try_take_leased_lock(&mut self.leases.write().unwrap(), lease_duration_ms, key, holder)
        }
    }

    #[derive(Debug, thiserror::Error)]
    enum DummyError {}

    impl BackingStore for TestStore {
        type LockError = DummyError;

        /// Try to take a lock using the given store.
        async fn try_lock(
            &self,
            lease_duration_ms: u32,
            key: &str,
            holder: &str,
        ) -> Result<bool, Self::LockError> {
            Ok(self.try_take_leased_lock(lease_duration_ms, key, holder))
        }
    }

    async fn release_lock(guard: Option<CrossProcessStoreLockGuard>) {
        drop(guard);
        sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;
    }

    type TestResult = Result<(), LockStoreError>;

    #[async_test]
    async fn test_simple_lock_unlock() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessStoreLock::new(store, "key".to_owned(), "first".to_owned());

        // The lock plainly works when used with a single holder.
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // Releasing works.
        release_lock(acquired).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        // Spin locking on the same lock always works, assuming no concurrent access.
        let acquired = lock.spin_lock(None).await.unwrap();

        // Releasing still works.
        release_lock(Some(acquired)).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        Ok(())
    }

    #[async_test]
    async fn test_self_recovery() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessStoreLock::new(store.clone(), "key".to_owned(), "first".to_owned());

        // When a lock is acquired...
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // But then forgotten... (note: no need to release the guard)
        drop(lock);

        // And when rematerializing the lock with the same key/value...
        let lock = CrossProcessStoreLock::new(store.clone(), "key".to_owned(), "first".to_owned());

        // We still get it.
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        Ok(())
    }

    #[async_test]
    async fn test_multiple_holders_same_process() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessStoreLock::new(store, "key".to_owned(), "first".to_owned());

        // Taking the lock twice...
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());

        let acquired2 = lock.try_lock_once().await?;
        assert!(acquired2.is_some());

        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 2);

        // ...means we can release it twice.
        release_lock(acquired).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        release_lock(acquired2).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        Ok(())
    }

    #[async_test]
    async fn test_multiple_processes() -> TestResult {
        let store = TestStore::default();
        let lock1 = CrossProcessStoreLock::new(store.clone(), "key".to_owned(), "first".to_owned());
        let lock2 = CrossProcessStoreLock::new(store, "key".to_owned(), "second".to_owned());

        // When the first process takes the lock...
        let acquired1 = lock1.try_lock_once().await?;
        assert!(acquired1.is_some());

        // The second can't take it immediately.
        let acquired2 = lock2.try_lock_once().await?;
        assert!(acquired2.is_none());

        let lock2_clone = lock2.clone();
        let handle = spawn(async move { lock2_clone.spin_lock(Some(1000)).await });

        sleep(Duration::from_millis(100)).await;

        drop(acquired1);

        // lock2 in the background manages to get the lock at some point.
        let _acquired2 = handle
            .await
            .expect("join handle is properly awaited")
            .expect("lock was obtained after spin-locking");

        // Now if lock1 tries to get the lock with a small timeout, it will fail.
        assert_matches!(lock1.spin_lock(Some(200)).await, Err(LockStoreError::LockTimeout));

        Ok(())
    }
}

/// Some code that is shared by almost all `MemoryStore` implementations out
/// there.
pub mod memory_store_helper {
    use std::collections::{hash_map::Entry, HashMap};

    use ruma::time::{Duration, Instant};

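    /// Try to acquire or extend a lease over `key` on behalf of `holder`,
    /// using the given map of leases.
    ///
    /// Returns `true` if the lease was acquired or extended, and `false` if
    /// another holder currently owns a non-expired lease.
    ///
    /// A minimal usage sketch (illustrative only, hence `ignore`):
    ///
    /// ```ignore
    /// use std::collections::HashMap;
    ///
    /// let mut leases = HashMap::new();
    ///
    /// // "first" takes a 500ms lease...
    /// assert!(try_take_leased_lock(&mut leases, 500, "key", "first"));
    ///
    /// // ...so "second" can't steal it while it's still valid...
    /// assert!(!try_take_leased_lock(&mut leases, 500, "key", "second"));
    ///
    /// // ...but "first" can extend its own lease.
    /// assert!(try_take_leased_lock(&mut leases, 500, "key", "first"));
    /// ```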
    pub fn try_take_leased_lock(
        leases: &mut HashMap<String, (String, Instant)>,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> bool {
        let now = Instant::now();
        let expiration = now + Duration::from_millis(lease_duration_ms.into());

        match leases.entry(key.to_owned()) {
            // There is an existing holder.
            Entry::Occupied(mut entry) => {
                let (current_holder, current_expiration) = entry.get_mut();

                if current_holder == holder {
                    // We had the lease before, extend it.
                    *current_expiration = expiration;

                    true
                } else {
                    // We didn't have it.
                    if *current_expiration < now {
                        // Steal it!
                        *current_holder = holder.to_owned();
                        *current_expiration = expiration;

                        true
                    } else {
                        // We tried our best.
                        false
                    }
                }
            }

            // There is no holder, easy.
            Entry::Vacant(entry) => {
                entry.insert((holder.to_owned(), expiration));

                true
            }
        }
    }
}