matrix_sdk_common/store_locks.rs
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Collection of small helpers that implement store-based locks.
//!
//! This is a per-process lock that may be used only for very specific use
//! cases, where multiple processes might concurrently write to the same
//! database; this would invalidate store caches, so that should be done
//! mindfully. Such a lock can be acquired multiple times by the same process,
//! and it remains active as long as there's at least one user in a given
//! process.
//!
//! The lock is implemented using time-based leases to values inserted in a
//! store. The store maintains the lock identifier (key), who's the
//! current holder (value), and an expiration timestamp on the side; see also
//! `CryptoStore::try_take_leased_lock` for more details.
//!
//! The lock is initially acquired for a certain period of time (namely, the
//! duration of a lease, aka `LEASE_DURATION_MS`), and then a "heartbeat" task
//! renews the lease to extend its duration, every so often (namely, every
//! `EXTEND_LEASE_EVERY_MS`). Since the tokio scheduler might be busy, the
//! extension request should happen way more frequently than the duration of a
//! lease, in case a deadline is missed. The current values have been chosen to
//! reflect that, with a ratio of 1:10 as of 2023-06-23.
//!
//! Releasing the lock happens naturally, by not renewing a lease. It happens
//! automatically after the duration of the last lease, at most.
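//!
//! A minimal usage sketch (marked `ignore` since it needs a concrete store;
//! the `store` value and the crate path are assumptions here, and any
//! [`BackingStore`] implementation works the same way):
//!
//! ```ignore
//! use matrix_sdk_common::store_locks::CrossProcessStoreLock;
//!
//! let lock =
//!     CrossProcessStoreLock::new(store, "lock_key".to_owned(), "my_process_id".to_owned());
//!
//! // Fail fast if another process currently holds the lock…
//! if let Some(guard) = lock.try_lock_once().await? {
//!     // …do the protected work while the guard is alive…
//!     drop(guard);
//! }
//!
//! // …or wait for the lock with exponential backoff.
//! let guard = lock.spin_lock(None).await?;
//! drop(guard);
//! ```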

use std::{
    error::Error,
    sync::{
        atomic::{self, AtomicU32},
        Arc,
    },
    time::Duration,
};

use tokio::sync::Mutex;
use tracing::{debug, error, info, instrument, trace};

use crate::{
    executor::{spawn, JoinHandle},
    sleep::sleep,
    SendOutsideWasm,
};

/// Backing store for a cross-process lock.
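///
/// The [`memory_store_helper`] module below can do most of the work for an
/// in-memory implementation; the following is a rough sketch (marked `ignore`;
/// the `InMemoryStore` type and the `async_trait` setup are assumptions, not
/// part of this module):
///
/// ```ignore
/// use std::{
///     collections::HashMap,
///     sync::{Arc, RwLock},
/// };
///
/// use ruma::time::Instant;
///
/// #[derive(Clone, Default)]
/// struct InMemoryStore {
///     leases: Arc<RwLock<HashMap<String, (String, Instant)>>>,
/// }
///
/// #[async_trait::async_trait]
/// impl BackingStore for InMemoryStore {
///     type LockError = std::convert::Infallible;
///
///     async fn try_lock(
///         &self,
///         lease_duration_ms: u32,
///         key: &str,
///         holder: &str,
///     ) -> Result<bool, Self::LockError> {
///         Ok(memory_store_helper::try_take_leased_lock(
///             &mut self.leases.write().unwrap(),
///             lease_duration_ms,
///             key,
///             holder,
///         ))
///     }
/// }
/// ```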
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait BackingStore {
    type LockError: Error + Send + Sync;

    /// Try to take a lock using the given store.
    async fn try_lock(
        &self,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> Result<bool, Self::LockError>;
}

/// Small state machine to handle wait times.
#[derive(Clone, Debug)]
enum WaitingTime {
    /// Some time to wait, in milliseconds.
    Some(u32),
    /// Stop waiting when seeing this value.
    Stop,
}

/// A guard on the store lock.
///
/// The lock will be automatically released a short period of time after all
/// the guards have been dropped.
#[derive(Debug)]
pub struct CrossProcessStoreLockGuard {
    num_holders: Arc<AtomicU32>,
}

impl Drop for CrossProcessStoreLockGuard {
    fn drop(&mut self) {
        self.num_holders.fetch_sub(1, atomic::Ordering::SeqCst);
    }
}

/// A store-based lock for a `Store`.
///
/// See the doc-comment of this module for more information.
#[derive(Clone, Debug)]
pub struct CrossProcessStoreLock<S: BackingStore + Clone + SendOutsideWasm + 'static> {
    /// The store we're using to lock.
    store: S,

    /// Number of holders of the lock in this process.
    ///
    /// If greater than 0, this means we've already acquired this lock, in this
    /// process, and the store lock mustn't be touched.
    ///
    /// When the number of holders is decreased to 0, then the lock must be
    /// released in the store.
    num_holders: Arc<AtomicU32>,

    /// A mutex to control an attempt to take the lock, to avoid making it
    /// reentrant.
    locking_attempt: Arc<Mutex<()>>,

    /// Current renew task spawned by `try_lock_once`.
    renew_task: Arc<Mutex<Option<JoinHandle<()>>>>,

    /// The key used in the key/value mapping for the lock entry.
    lock_key: String,

    /// A specific value to identify the lock's holder.
    lock_holder: String,

    /// Backoff time, in milliseconds.
    backoff: Arc<Mutex<WaitingTime>>,
}

/// Amount of time a lease of the lock should last, in milliseconds.
pub const LEASE_DURATION_MS: u32 = 500;

/// Period of time between two attempts to extend the lease. We'll
/// re-request a lease for an entire duration of `LEASE_DURATION_MS`
/// milliseconds, every `EXTEND_LEASE_EVERY_MS`, so this has to
/// be an amount safely low compared to `LEASE_DURATION_MS`, to make sure
/// that we can miss a deadline without compromising the lock.
pub const EXTEND_LEASE_EVERY_MS: u64 = 50;

/// Initial backoff, in milliseconds. This is the time we wait the first
/// time, if taking the lock initially failed.
const INITIAL_BACKOFF_MS: u32 = 10;

/// Maximal backoff, in milliseconds. This is the maximum amount of time
/// we'll wait for the lock, *between two attempts*.
pub const MAX_BACKOFF_MS: u32 = 1000;

impl<S: BackingStore + Clone + SendOutsideWasm + 'static> CrossProcessStoreLock<S> {
    /// Create a new store-based lock implemented as a value in the store.
    ///
    /// # Parameters
    ///
    /// - `lock_key`: key in the key-value store to store the lock's state.
    /// - `lock_holder`: identify the lock's holder with this given value.
    pub fn new(store: S, lock_key: String, lock_holder: String) -> Self {
        Self {
            store,
            lock_key,
            lock_holder,
            backoff: Arc::new(Mutex::new(WaitingTime::Some(INITIAL_BACKOFF_MS))),
            num_holders: Arc::new(0.into()),
            locking_attempt: Arc::new(Mutex::new(())),
            renew_task: Default::default(),
        }
    }

    /// Try to lock once, returns whether the lock was obtained or not.
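    ///
    /// The lock is reentrant within a process: if it's already held through
    /// this instance (or a clone of it), this only bumps the holder count. A
    /// small sketch (marked `ignore`; the `lock` value is assumed to exist):
    ///
    /// ```ignore
    /// let guard1 = lock.try_lock_once().await?.expect("first attempt succeeds");
    /// // A second attempt from the same process also succeeds…
    /// let guard2 = lock.try_lock_once().await?.expect("reentrant attempt succeeds");
    /// // …and the lease stops being renewed only once both guards are gone.
    /// drop(guard1);
    /// drop(guard2);
    /// ```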
    #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
    pub async fn try_lock_once(
        &self,
    ) -> Result<Option<CrossProcessStoreLockGuard>, LockStoreError> {
        // Hold onto the locking attempt mutex for the entire lifetime of this
        // function, to avoid multiple reentrant calls.
        let _attempt = self.locking_attempt.lock().await;

        // If another thread obtained the lock, make sure to only superficially increase
        // the number of holders, and carry on.
        if self.num_holders.load(atomic::Ordering::SeqCst) > 0 {
            // Note: between the above load and the fetch_add below, another thread may
            // decrement `num_holders`. That's fine because that means the lock
            // was taken by at least one thread, and after this call it will be
            // taken by at least one thread.
            trace!("We already had the lock, incrementing holder count");
            self.num_holders.fetch_add(1, atomic::Ordering::SeqCst);
            let guard = CrossProcessStoreLockGuard { num_holders: self.num_holders.clone() };
            return Ok(Some(guard));
        }

        let acquired = self
            .store
            .try_lock(LEASE_DURATION_MS, &self.lock_key, &self.lock_holder)
            .await
            .map_err(|err| LockStoreError::BackingStoreError(Box::new(err)))?;

        if !acquired {
            trace!("Couldn't acquire the lock immediately.");
            return Ok(None);
        }

        trace!("Acquired the lock, spawning the lease extension task.");

        // This is the first time we've acquired the lock. We're going to spawn the task
        // that will renew the lease.

        // Clone data to be owned by the task.
        let this = (*self).clone();

        let mut renew_task = self.renew_task.lock().await;

        // Cancel the previous task, if any. That's safe to do, because:
        // - either the task was done,
        // - or it was still running, but taking a lock in the db has to be an atomic
        //   operation running in a transaction.

        if let Some(_prev) = renew_task.take() {
            #[cfg(not(target_arch = "wasm32"))]
            if !_prev.is_finished() {
                info!("aborting the previous renew task");
                _prev.abort();
            }
        }

        // Restart a new one.
        *renew_task = Some(spawn(async move {
            loop {
                {
                    // First, check if there are still users of this lock.
                    //
                    // This is not racy, because:
                    // - the `locking_attempt` mutex makes sure we don't have unexpected
                    //   interactions with the non-atomic sequence above in `try_lock_once`
                    //   (check > 0, then add 1).
                    // - other entities holding onto the `num_holders` atomic will only
                    //   decrease it over time.

                    let _guard = this.locking_attempt.lock().await;

                    // If there are no more users, we can quit.
                    if this.num_holders.load(atomic::Ordering::SeqCst) == 0 {
                        info!("exiting the lease extension loop");

                        // Cancel the lease with another 0ms lease.
                        // If we don't get the lock, that's (weird but) fine.
                        let fut = this.store.try_lock(0, &this.lock_key, &this.lock_holder);
                        let _ = fut.await;

                        // Exit the loop.
                        break;
                    }
                }

                sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;

                let fut =
                    this.store.try_lock(LEASE_DURATION_MS, &this.lock_key, &this.lock_holder);
                if let Err(err) = fut.await {
                    error!("error when extending lock lease: {err:#}");
                    // Exit the loop.
                    break;
                }
            }
        }));

        self.num_holders.fetch_add(1, atomic::Ordering::SeqCst);

        let guard = CrossProcessStoreLockGuard { num_holders: self.num_holders.clone() };
        Ok(Some(guard))
    }

    /// Attempt to take the lock, with exponential backoff if the lock has
    /// already been taken before.
    ///
    /// The `max_backoff` parameter is the maximum time (in milliseconds) to
    /// wait between two attempts; once the backoff reaches that cap, the next
    /// failed attempt makes this method give up and return a timeout error.
    /// If not provided, it defaults to [`MAX_BACKOFF_MS`].
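    ///
    /// With the default values, attempts are spaced out by roughly 10, 20, 40,
    /// … milliseconds, doubling until the cap is reached. A sketch of how the
    /// result is typically handled (marked `ignore`; the `lock` value is
    /// assumed to exist):
    ///
    /// ```ignore
    /// match lock.spin_lock(Some(200)).await {
    ///     Ok(guard) => {
    ///         // Acquired within the backoff budget; do the work, then drop the guard.
    ///         drop(guard);
    ///     }
    ///     Err(LockStoreError::LockTimeout) => {
    ///         // Another holder kept the lock for too long; give up or retry later.
    ///     }
    ///     Err(other) => return Err(other),
    /// }
    /// ```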
    #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
    pub async fn spin_lock(
        &self,
        max_backoff: Option<u32>,
    ) -> Result<CrossProcessStoreLockGuard, LockStoreError> {
        let max_backoff = max_backoff.unwrap_or(MAX_BACKOFF_MS);

        // Note: reads/writes to the backoff are racy across threads in theory, but the
        // lock in `try_lock_once` should sequentialize it all.

        loop {
            if let Some(guard) = self.try_lock_once().await? {
                // Reset backoff before returning, for the next attempt to lock.
                *self.backoff.lock().await = WaitingTime::Some(INITIAL_BACKOFF_MS);
                return Ok(guard);
            }

            // Exponential backoff! Multiply by 2 the time we've waited before, cap it to
            // max_backoff.
            let mut backoff = self.backoff.lock().await;

            let wait = match &mut *backoff {
                WaitingTime::Some(ref mut val) => {
                    let wait = *val;
                    *val = val.saturating_mul(2);
                    if *val >= max_backoff {
                        *backoff = WaitingTime::Stop;
                    }
                    wait
                }
                WaitingTime::Stop => {
                    // We've reached the maximum backoff, abandon.
                    return Err(LockStoreError::LockTimeout);
                }
            };

            debug!("Waiting {wait}ms before re-attempting to take the lock");
            sleep(Duration::from_millis(wait.into())).await;
        }
    }

    /// Returns the value in the database that represents the holder's
    /// identifier.
    pub fn lock_holder(&self) -> &str {
        &self.lock_holder
    }
}

/// Error related to the locking API of the store.
#[derive(Debug, thiserror::Error)]
pub enum LockStoreError {
    /// Spent too long waiting for a database lock.
    #[error("a lock timed out")]
    LockTimeout,

    #[error(transparent)]
    BackingStoreError(#[from] Box<dyn Error + Send + Sync>),
}

#[cfg(test)]
#[cfg(not(target_arch = "wasm32"))] // These tests require tokio::time, which is not implemented on wasm.
mod tests {
    use std::{
        collections::HashMap,
        sync::{atomic, Arc, RwLock},
        time::Instant,
    };

    use assert_matches::assert_matches;
    use matrix_sdk_test_macros::async_test;
    use tokio::{
        spawn,
        time::{sleep, Duration},
    };

    use super::{
        memory_store_helper::try_take_leased_lock, BackingStore, CrossProcessStoreLock,
        CrossProcessStoreLockGuard, LockStoreError, EXTEND_LEASE_EVERY_MS,
    };

    #[derive(Clone, Default)]
    struct TestStore {
        leases: Arc<RwLock<HashMap<String, (String, Instant)>>>,
    }

    impl TestStore {
        fn try_take_leased_lock(&self, lease_duration_ms: u32, key: &str, holder: &str) -> bool {
            try_take_leased_lock(&mut self.leases.write().unwrap(), lease_duration_ms, key, holder)
        }
    }

    #[derive(Debug, thiserror::Error)]
    enum DummyError {}

    #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
    #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
    impl BackingStore for TestStore {
        type LockError = DummyError;

        /// Try to take a lock using the given store.
        async fn try_lock(
            &self,
            lease_duration_ms: u32,
            key: &str,
            holder: &str,
        ) -> Result<bool, Self::LockError> {
            Ok(self.try_take_leased_lock(lease_duration_ms, key, holder))
        }
    }

    async fn release_lock(guard: Option<CrossProcessStoreLockGuard>) {
        drop(guard);
        sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;
    }

    type TestResult = Result<(), LockStoreError>;

    #[async_test]
    async fn test_simple_lock_unlock() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessStoreLock::new(store, "key".to_owned(), "first".to_owned());

        // The lock plain works when used with a single holder.
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // Releasing works.
        release_lock(acquired).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        // Spin locking on the same lock always works, assuming no concurrent access.
        let acquired = lock.spin_lock(None).await.unwrap();

        // Releasing still works.
        release_lock(Some(acquired)).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        Ok(())
    }

    #[async_test]
    async fn test_self_recovery() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessStoreLock::new(store.clone(), "key".to_owned(), "first".to_owned());

        // When a lock is acquired...
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        // But then forgotten... (note: no need to release the guard)
        drop(lock);

        // And when rematerializing the lock with the same key/value...
        let lock = CrossProcessStoreLock::new(store.clone(), "key".to_owned(), "first".to_owned());

        // We still got it.
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        Ok(())
    }

    #[async_test]
    async fn test_multiple_holders_same_process() -> TestResult {
        let store = TestStore::default();
        let lock = CrossProcessStoreLock::new(store, "key".to_owned(), "first".to_owned());

        // Taking the lock twice...
        let acquired = lock.try_lock_once().await?;
        assert!(acquired.is_some());

        let acquired2 = lock.try_lock_once().await?;
        assert!(acquired2.is_some());

        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 2);

        // ...means we can release it twice.
        release_lock(acquired).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 1);

        release_lock(acquired2).await;
        assert_eq!(lock.num_holders.load(atomic::Ordering::SeqCst), 0);

        Ok(())
    }

    #[async_test]
    async fn test_multiple_processes() -> TestResult {
        let store = TestStore::default();
        let lock1 = CrossProcessStoreLock::new(store.clone(), "key".to_owned(), "first".to_owned());
        let lock2 = CrossProcessStoreLock::new(store, "key".to_owned(), "second".to_owned());

        // When the first process takes the lock...
        let acquired1 = lock1.try_lock_once().await?;
        assert!(acquired1.is_some());

        // The second can't take it immediately.
        let acquired2 = lock2.try_lock_once().await?;
        assert!(acquired2.is_none());

        let lock2_clone = lock2.clone();
        let handle = spawn(async move { lock2_clone.spin_lock(Some(1000)).await });

        sleep(Duration::from_millis(100)).await;

        drop(acquired1);

        // lock2 in the background manages to get the lock at some point.
        let _acquired2 = handle
            .await
            .expect("join handle is properly awaited")
            .expect("lock was obtained after spin-locking");

        // Now if lock1 tries to get the lock with a small timeout, it will fail.
        assert_matches!(lock1.spin_lock(Some(200)).await, Err(LockStoreError::LockTimeout));

        Ok(())
    }
}

/// Some code that is shared by almost all `MemoryStore` implementations out
/// there.
pub mod memory_store_helper {
    use std::collections::{hash_map::Entry, HashMap};

    use ruma::time::{Duration, Instant};

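    /// Try to acquire or extend a lease on `key` for `holder`, valid for
    /// `lease_duration_ms` milliseconds, in the given `leases` map.
    ///
    /// Returns `true` if the lease was vacant, already owned by `holder` (in
    /// which case it is extended), or expired (in which case it is stolen);
    /// returns `false` if another holder still has a valid lease.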
    pub fn try_take_leased_lock(
        leases: &mut HashMap<String, (String, Instant)>,
        lease_duration_ms: u32,
        key: &str,
        holder: &str,
    ) -> bool {
        let now = Instant::now();
        let expiration = now + Duration::from_millis(lease_duration_ms.into());

        match leases.entry(key.to_owned()) {
            // There is an existing holder.
            Entry::Occupied(mut entry) => {
                let (current_holder, current_expiration) = entry.get_mut();

                if current_holder == holder {
                    // We had the lease before, extend it.
                    *current_expiration = expiration;

                    true
                } else {
                    // We didn't have it.
                    if *current_expiration < now {
                        // Steal it!
                        *current_holder = holder.to_owned();
                        *current_expiration = expiration;

                        true
                    } else {
                        // We tried our best.
                        false
                    }
                }
            }

            // There is no holder, easy.
            Entry::Vacant(entry) => {
                entry.insert((holder.to_owned(), expiration));

                true
            }
        }
    }
}