'SELECT FOR UPDATE' 在 pg-promise 任务中有效吗?

Does 'SELECT FOR UPDATE' work in pg-promise task?

我想 SELECT .. FOR UPDATE 锁定 table 中的一行,以便它可以以原子方式更新,因为可能会发生相同类型的并发请求。

我一直在做一些测试,但不清楚 FOR UPDATEpg-promise.task 中是否有效。我试图避免使用 pg-promise.tx,因为这需要更多的逻辑和可能的递归,我想避免这两种情况,因为用例将是高吞吐量。

更新: 经过更多的研究和测试,我发现将 taskSELECT .. FOR UPDATE 一起使用会给我带来意想不到的结果。下面的代码解释。

submitUserOnline:(pgdb, u_uuid, socketConnectionList, user_websock) =>{
  return new Promise((resolve,reject) =>{
// pgdb.tx({mode} t => {
   pgdb.task( t => {
    return t.one('SELECT * FROM users WHERE user_uuid = ', [u_uuid]
    .then(user =>{
// nothing happens here right now, but may in future, including for completeness. May cancel request based off some comparisons.
     return t.any('SELECT * FROM users_online WHERE user_uuid =  FOR UPDATE',[u_uuid]
     .then(result =>{
      if(result.length > 0){
        console.error('duplicate connection found ' + user_websock.uuid);
        if(socketConnectionList[result[0].web_sock_uuid] !== undefined){
          console.error('drop connection' + result[0].web_sock_uuid);
          socketConnectionList[result[0].web_sock_uuid].wsc.close(4020, 'USER_RECONN');
        }
        console.error('duplicate connection found - update next ' + user_websock.uuid);
        return t.none('UPDATE users_online SET web_sock_uuid =  WHERE user_uuid = ', [user_websock, u_uuid])
        .then(res =>{
          console.error('UPDATE res: ' + res);
        })
        .catch(err =>{
          console.error('UPDATE err: ' + err);});

       }else{
         // not reached in test case
         console.error('no duplicate found ' + user_websock.uuid);
         return t.none(INSERT INTO  users_online.....ect ect
       }
      });
   })
   .then(res =>{
     console.error('>>>task/tx res: ' + res);
     resolve({msg: "OK"});
   })
   .catch(err =>{
     console.error('>>>task/tx err: ' + err);
     if(err.code ==== '40001'){// recursion for when called as 'tx'
       console.error('>>>task/tx err - call recurse');
       module.exports.submitUserOnline(pgdb, u_uuid, socketConnectionList, user_websock)
       .then(res =>{
         console.error('>>>task/tx err - call recurse - res ' + res);
         resolve({msg: "OK"});
        })
        .catch( err =>{
          console.error('>>>task/tx err - call recurse - err: ' + err);
          reject({msg:"FAILED"});
        });
     }
    });
  });
}
const mode = new TransactionMode({
    tiLevel: isolationLevel.serializable,
    readOnly: false,
    deferrable: true
});

submitUserOnline 由 websocket 处理程序调用。在我的测试用例中,我有一个 10 元素数组(相同 user_uuid),它在 for 循环中触发所有客户端连接。基本上它与服务器 websocket 建立连接,该套接字检查特定用户的 users_online table,如果该用户已经在 table 中,则终止陈旧的连接并更新 web_sock_uuid 在 table 中,这就是问题所在(有时它工作有时不工作,10 个并发连接中的一个 运行 显示问题)。当用户行 web_sock_uuidUPDATE 编辑时,其他并发连接似乎在 SELECT .. FOR UPDATE(SLFU) 上正确阻塞,当 UPDATE 执行时,then() 并不总是运行 在下一个 SLFU 发布之前。这似乎以挂起 SLFU 的形式自我呈现,返回先前 UPDATE 之前行的旧 web_sock_uuid。在一个实例中,相同的陈旧 'web_sock_uuid' 连续返回 4 次。

如果我从 task 方法切换到 tx 方法,这需要递归调用,上面的代码会按预期工作,尽管它确实需要很多回调。

pg-promise tasks 只是共享连接。它们不会隐式创建交易。

要在 Postgres 中使用锁,您需要创建事务,因为在事务中创建的所有锁都会在事务结束时释放。如果您没有显式创建事务,则每个查询都将是其自己的单独事务。

当然,您不必使用tx方法。您可以使用 task,并自行管理交易。

我早就想通了,但我当时忘了 post 回来。 我的解决方案是使用 tx 方法,没有提供模式选项。 这给出了所需的操作。会详细解释一下,但确切的细节已经从我的脑海中消失了。