是否可以使用 Firebase 实时数据库来实现分布式互斥锁?

Is it possible to use Firebase Realtime Database to implement a distributed mutex?

我正在考虑使用这样的事务来实现一种分布式锁:

const lockId = 'myLock';
const lockRef = firebaseAdmin.database().ref(`/locks/${lockId}`);
lockRef.transaction(function(current) {
  if (current === null) {
    return '1';
  }
}, function(error, committed) {
  if (committed) {
    // .... Do the synchronized work I need ...
    lockRef.remove();
  }
});

我的问题是:update函数只有在数据不存在的情况下才会调用null吗?

一般来说,这是实现分布式锁的有效方式吗?

最初将调用一个交易,并使用客户对当前值的最佳猜测。如果客户端在内存中没有当前值,它最好的猜测是没有当前值。

这意味着如果您得到 null.

,则无法保证数据库中实际不存在任何值

另见:

  • Firebase transaction api call current data is null

由于@FrankvanPuffelen 在他们的回答中陈述的原因,您最初的尝试不会奏效。

但是有可能(虽然不是那么简单)实现这一点。我与不同的边缘情况进行了长时间的斗争,最终想出了这个解决方案,它通过了无数不同的测试,验证了这可以防止所有可能的竞争条件和死锁:

import crypto from 'crypto';
import { promisify } from 'util';

import * as firebaseAdmin from 'firebase-admin';

const randomBytes = promisify(crypto.randomBytes);

// A string which is stored in the place of the value to signal that the mutex holder has
// encountered an error. This must be unique value for each mutex so that we can distinguish old,
// stale rejection states from the failures of the mutex that we are currently waiting for.
const rejectionSignal = (mutexId: string): string => `rejected${mutexId}`;

const isValidValue = (value: unknown): boolean => {
  // `value` could be string in the form `rejected...` which signals failure,
  // using this function makes sure we don't return that as "valid" value.
  return !!value && (typeof value !== 'string' || !value.startsWith('rejected'));
};

export const getOrSetValueWithLocking = async <T>(id: string, value: T): Promise<T> => {
  const ref = firebaseAdmin.database().ref(`/myValues/${id}`);

  const mutexRef = firebaseAdmin.database().ref(`/mutexes/myValues/${id}`);

  const attemptingMutexId = (await randomBytes(16)).toString('hex');

  const mutexTransaction = await mutexRef.transaction((data) => {
    if (data === null) {
      return attemptingMutexId;
    }
  });

  const owningMutexId = mutexTransaction.snapshot.val();

  if (mutexTransaction.committed) {
    // We own the mutex (meaning that `attemptingMutexId` equals `owningMutexId`).
    try {
      const existing = (await ref.once('value')).val();
      if (isValidValue(existing)) {
        return existing;
      }
      /*
        --- YOU CAN DO ANYTHING HERE ---
        E.g. create `value` here instead of passing it as an argument.
      */
      await ref.set(value);
      return value;
    } catch (e) {
      await ref.set(rejectionSignal(owningMutexId));
      throw e;
    } finally {
      // Since we own the mutex, we MUST make sure to release it, no matter what happens.
      await mutexRef.remove();
    }
  } else {
    // Some other caller owns the mutex -> if the value is not yet
    // available, wait for them to insert it or to signal a failure.
    return new Promise((resolve, reject) => {
      ref.on('value', (snapshot) => {
        const val = snapshot.val();
        if (isValidValue(val)) {
          resolve(val);
        } else if (val === rejectionSignal(owningMutexId)) {
          reject(new Error('Mutex holder encountered an error and was not able to set a value.'));
        } // else: Wait for a new value.
      });
    });
  }
};

我的用例是我在 Vercel 中有 Next.js API 路由 运行,其中并行执行的无服务器函数的唯一共享状态是 Firebase 实时数据库。