Firestore 并发交易冻结

Firestore Concurrent Transactions Freezing

我遇到了 Firestore 事务在 运行 同时冻结的问题。作为背景,我有一个 Firestore 集合,其中每个文档都有一个金额和一个时间。我正在创建一个函数,从最旧的文档开始,从该集合中释放所需的美元金额。

例如,释放 $150 的函数调用将遍历集合,从集合中删除美元金额,直到总共删除 $150。我使用递归函数执行此操作,该函数 1) 找到最早的美元金额,2) 从该数字中删除输入的金额(即 150 美元),或者如果输入的金额大于该数字,则删除该数字,以及 3) 如果有则重复仍然是要删除的剩余金额。我在步骤 (2) 中使用 Firestore 事务,因为此集合有可能同时被多个用户更改(请注意,如果我将 (1) 和 (2) 结合起来以将查询包含在事务中,代码行为未更改)。

下面的代码正确地更新了集合。但是,如果在较早的实例已经 运行ning 时调用它会花费很长时间:如果我调用它一次,然后在第一次调用完成之前再次调用它,它会冻结并有可能占用 20 -30 分钟而不是通常的 1-5 秒。虽然并发事务之间的争用很明显导致冻结(当我删除事务的写入部分时,没有冻结),但未知的问题是争用具体是什么导致冻结以及如何解决它。

补充:看来这次冻结可能与https://github.com/firebase/firebase-tools/issues/2452有关。与 post 一致,我面临每笔交易 30 秒的冻结,考虑到单个版本有多个交易,这会变成很多分钟。

 function releaseAmountFromStack(amount) {
  return new Promise((resolve, reject) => {
    let db = admin.firestore();
    let stackRef = db.collection("stack");

    stackRef.orderBy("expirationTime", "asc").limit(1)
      .get().then((querySnapshot) => {
        if(querySnapshot.empty) {
          return reject("None left in stack");
        }

        let itemToRelease = querySnapshot.docs[0];

        releaseItem(itemToRelease.ref, amount)
        .then((actualReleaseAmount) => {
          // If there is still more to release, trigger the next recursion
          // If the full amount has been released, return it
          if (amount > actualReleaseAmount) {
            releaseAmountFromStack(amount-actualReleaseAmount)
            .then((nextActualReleaseAmount) => {
              return resolve(actualReleaseAmount + nextActualReleaseAmount);
            })
            .catch(() => {
              return resolve(actualReleaseAmount);
            });
          } else {
            return resolve(actualReleaseAmount);
          }
        });
    });
  });
}

function releaseItem(itemRef, amountToRelease) {
  let db = admin.firestore();
  return db.runTransaction((transaction) => {
    return transaction.get(itemRef).then((itemDoc) => {
      let itemAmount = itemDoc.data().amount;
      let actualReleaseAmount = Math.min(amountToRelease, itemAmount);

      // If item is exhausted, delete it. Else, update amount
      if (actualReleaseAmount >= itemAmount) {
        transaction.delete(itemDoc.ref);
      } else {
        transaction.set(itemDoc.ref, {
          amount: admin.firestore.FieldValue.increment(-1*Number(actualReleaseAmount)),
        }, {merge: true});
      }
      return actualReleaseAmount;
      });
  });
}

以下是迄今为止调试过程中的一些有用信息。非常感谢。

首先,让我们修复您的 releaseAmountFromStack 函数,这样您就不会将 Promise 构造函数用于 Promise-returning API(称为 Explicit Promise Construction Antipattern ).如果您的 stackRef 查询或 releaseItem 函数抛出错误,您的代码将遇到 UnhandledPromiseRejection,因为这两个 Promise 链都没有 catch 处理程序。

function releaseAmountFromStack(amount) {
  const db = admin.firestore();
  const stackRef = db.collection("stack");

  return stackRef
    .orderBy("expirationTime", "asc")
    .limit(1)
    .get()
    .then((querySnapshot) => {
      if(querySnapshot.empty) {
        return Promise.reject("Out of stock");
      }

      const itemToRelease = querySnapshot.docs[0];

      return releaseItem(itemToRelease.ref, amount);
    })
    .then((releasedAmount) => {
       const amountLeft = amount - releasedAmount;

       if (amountLeft <= 0)
         return releasedAmount;

       return releaseAmountFromStack(amountLeft)
         .then((nextReleasedAmount) => nextReleasedAmount + releasedAmount)
         .catch((err) => {
           if (err === "Out of stock") {
             return releasedAmount;
           } else {
             // rethrow unexpected errors
             throw err;
           }
         });
     });
}

因为该函数涉及嵌套的 Promise 链,切换到 async/await 语法可以将其扁平化为:

async function releaseAmountFromStack(amount) {
  const db = admin.firestore();
  const stackRef = db.collection("stack");

  const querySnapshot = await stackRef
    .orderBy("expirationTime", "asc")
    .limit(1)
    .get();
  
  if (querySnapshot.empty)
    return 0; // out of stock

  const itemToRelease = querySnapshot.docs[0];

  const releasedAmount = await releaseItem(itemToRelease.ref, amount);

  const amountLeft = amount - releasedAmount;
  
  if (amountLeft <= 0) {
    // nothing left to release, return released amount
    return releasedAmount; 
  }

  // If here, there is more to release, trigger the next recursion
  const nextReleasedAmount = await releaseAmountFromStack(amountLeft);

  return nextReleasedAmount + releasedAmount;
}

注意: 在上面的扁平化版本中,我们不需要处理 catch,因为我们可以直接 return 0 代替。这意味着任何不相关的错误都会被正常抛出。

接下来我们可以继续 releaseItem,这是您问题的实际原因。在这里,您没有处理另一个实例正在删除您正在阅读的项目的情况,您只是假设它存在然后最终处理 NaN and/or 负余额。作为最终结果的示例,您可以得到以下事件序列(它比这更复杂,因为您使用的是 FieldValue.increment() - 添加服务器客户端也做交易):

 Client A                      Server                      Client B
Release 50                                                Release 80
    ┌┐         Get Doc #1        ┌┐                           ┌┐
    ││     ────────────────►     ││                           ││
    ││                           ││                           ││
    ││                           ││                           ││
    ││      Doc #1 Snapshot      ││         Get Doc #1        ││
    ││       { amount: 10 }      ││     ◄────────────────     ││
    ││     ◄────────────────     ││                           ││
    ││                           ││                           ││
    ││                           ││      Doc #1 Snapshot      ││
    ││                           ││       { amount: 10 }      ││
    ││   Result: Delete Doc #1   ││     ────────────────►     ││
    ││     ────────────────►     ││                           ││
    ││          ACCEPTED         ││                           ││
    ││     ◄────────────────     ││                           ││
    ││                           ││   Result: Delete Doc #1   ││
    ││         Get Doc #2        ││     ◄────────────────     ││
    ││     ────────────────►     ││                           ││
    ││                           ││         REJECTED          ││
    ││                           ││    New Doc #1 Snapshot    ││
    ││                           ││          <null>           ││
    ││      Doc #2 Snapshot      ││     ────────────────►     ││
    ││      { amount: 15 }       ││                           ││
    ││     ◄────────────────     ││                           ││
    ││                           ││                           ││
    ││                           ││ Result: Set Doc #1 to -10 ││
    ││                           ││     ◄────────────────     ││
    ││   Result: Delete Doc #2   ││             ▲             ││
    ││     ────────────────►     ││             │             ││
    ││          ACCEPTED         ││           ERROR           ││
    ││     ◄────────────────     ││                           ││
    └┘                           └┘                           └┘

因为您正在使用交易,所以您已经知道 amount 的当前值,因此您可以在客户端上进行计算并写入新金额,而不是使用 FieldValue.increment()。该运算符更适合在不需要知道当前值的情况下对值进行简单更新。

function releaseItem(itemRef, amountToRelease) {
  const db = admin.firestore();
  return db.runTransaction((transaction) => {
    return transaction.get(itemRef).then((itemDoc) => {
      if (!itemDoc.exists) {
        // target has been deleted, do nothing & return
        // amount that was released (0)
        return 0;
      }
  
      const itemAmount = itemDoc.get("amount");
      const actualReleaseAmount = Math.min(amountToRelease, itemAmount);

      if (actualReleaseAmount >= itemAmount) {
        // exhausted supply. delete item
        transaction.delete(itemDoc.ref);
      } else {
        // have leftover supply. update amount
        transaction.set(itemDoc.ref, {
          amount: itemAmount - actualReleaseAmount, 
        }, {merge: true});
      }

      // return amount that was released
      return actualReleaseAmount;
    });
  });
}

这不会完全解决您的问题,因为如果两个或多个客户端同时尝试从同一堆栈中取出项目,您仍然会遇到数据争用问题。作为这方面的一个例子,请看下面的事件流程(再一次,时机将是事情如何发展的主要因素):

 Client A                      Server                      Client B
Release 50                                                Release 80
    ┌┐         Get Doc #1        ┌┐                           ┌┐
    ││     ────────────────►     ││                           ││
    ││                           ││                           ││
    ││                           ││                           ││
    ││      Doc #1 Snapshot      ││         Get Doc #1        ││
    ││       { amount: 10 }      ││     ◄────────────────     ││
    ││     ◄────────────────     ││                           ││
    ││                           ││                           ││
    ││                           ││      Doc #1 Snapshot      ││
    ││                           ││       { amount: 10 }      ││
    ││   Result: Delete Doc #1   ││     ────────────────►     ││
    ││     ────────────────►     ││                           ││
    ││          ACCEPTED         ││                           ││
    ││     ◄────────────────     ││                           ││
    ││                           ││   Result: Delete Doc #1   ││
    ││         Get Doc #2        ││     ◄────────────────     ││
    ││     ────────────────►     ││                           ││
    ││                           ││         REJECTED          ││
    ││                           ││    New Doc #1 Snapshot    ││
    ││                           ││          <null>           ││
    ││      Doc #2 Snapshot      ││     ────────────────►     ││
    ││      { amount: 15 }       ││                           ││
    ││     ◄────────────────     ││     Result: Cancelled     ││
    ││                           ││     ◄────────────────     ││
    ││                           ││                           ││
    ││                           ││                           ││
    ││   Result: Delete Doc #2   ││         Get Doc #2        ││
    ││     ────────────────►     ││     ◄────────────────     ││
    ││          ACCEPTED         ││                           ││
    ││     ◄────────────────     ││                           ││
    ││                           ││      Doc #2 Snapshot      ││
    ││         Get Doc #3        ││          <null>           ││
    ││     ────────────────►     ││     ────────────────►     ││
    ││                           ││                           ││
    ││                           ││     Result: Cancelled     ││
    ││                           ││     ◄────────────────     ││
    ││      Doc #3 Snapshot      ││                           ││
    ││      { amount: 10 }       ││                           ││
    ││     ◄────────────────     ││         Get Doc #3        ││
    ││                           ││     ◄────────────────     ││
    ││                           ││                           ││
    ││                           ││                           ││
    ││   Result: Delete Doc #3   ││      Doc #3 Snapshot      ││
    ││     ────────────────►     ││          <null>           ││
    ││          ACCEPTED         ││     ────────────────►     ││
    ││     ◄────────────────     ││                           ││
    ││                           ││     Result: Cancelled     ││
    ││         Get Doc #4        ││     ◄────────────────     ││
    └┘     ────────────────►     └┘                           └┘

解决这个问题的一种方法是一次拉下多个项目,适当地使用它们并将更改写回,所有这些都在一个事务中。再说一次,不是解决问题,只是降低客户一遍又一遍地争夺相同文件的可能性。

async function releaseAmountFromStack(amount) {
  const db = admin.firestore();
  const stackQuery = db
    .collection("stack")
    .orderBy("expirationTime", "asc")
    .limit(1);

  const releasedAmount = await _releaseAmountFromStack(stackQuery, amount);

  const amountLeft = amount - releasedAmount;

  if (amountLeft <= 0) {
    // nothing left to release, return released amount
    return releasedAmount; 
  }

  // If here, there is more to release, trigger the next recursion
  const nextReleasedAmount = await releaseAmountFromStack(amountLeft)
    .catch((err) => {
      if (err !== "Empty stack") throw err;
      return 0;
    });

  return nextReleasedAmount + releasedAmount;
}

function _releaseAmountFromStack(query, amountToRelease) => {
  return db.runTransaction((transaction) => {
    return transaction.get(query).then((querySnapshot) => {
      if (!querySnapshot.empty) {
        // nothing in stack, return released amount (0)
        return Promise.reject("Empty stack");
      }

      let remainingAmountToRelease = amountToRelease;

      for (const doc of querySnapshot.docs) {
        const itemAmount = doc.get("amount");
        const amountChange = Math.min(itemAmount, remainingAmountToRelease);

        if (amountChange >= itemAmount) {
          transaction.delete(doc.ref);
        } else {
          remainingAmountToRelease -= amountChange;
          transaction.set(doc.ref, {
            amount: itemAmount - amountChange
          }, { merge: true });
        }

        if (remainingAmountToRelease <= 0) break; // stop iterating early
      }
  
      // return amount that was released
      return /* totalAmountReleased = */ amountToRelease - remainingAmountToRelease;
    });
  });
}

我弄清楚了导致难以捉摸的冻结的原因。我注意到每笔交易的冻结时间一直是 30 秒,这促使我搜索有关 30 秒交易冻结的讨论。我发现 (https://github.com/firebase/firebase-tools/issues/2452) 表明这是 Firebase 模拟器如何处理并发事务的问题。确实,当我部署代码而不是使用模拟器时,卡顿现象就没有了!

TLDR:Firebase 模拟器对并发事务有不自然的延迟。