使 KnexJS 事务与 async/await 一起工作

Make KnexJS Transactions work with async/await

我正在尝试结合 async/await 在 knexjs 中使用事务,但一直没有成功。

代码(放在代码片段里是为了缩短帖子的篇幅):

/**
 * Express-style handler that applies a status change to an order and records
 * it in the activity log.
 *
 * Flow:
 *   1. Update the `cart` row identified by `trx_id` with the new status.
 *   2. If the stored status is 'Success', stamp the purchase date, load the
 *      order's `cart_items`, look up each item's product row and decrease
 *      the stock by the ordered quantity.
 *   3. Insert a row into `activity_order_logs` (for every status).
 *   4. Reply with `{ isSuccess: true }`, or `{ isSuccess: false }` if any
 *      step throws.
 *
 * NOTE: every statement runs directly on `db`, not inside a transaction, so a
 * failure part-way through leaves the earlier writes in place.
 *
 * @param {object} req - Request; reads `req.body.{status, trx_id, orNumber,
 *   returnReason}` and `req.session.emp_id`.
 * @param {object} res - Response; only `res.json()` is used.
 * @param {Function} db - Knex instance used to build and run the queries.
 * @param {*} logger - Unused by this handler.
 * @returns {void} The result is reported through `res.json`.
 */
const updateOrder = (req, res, db, logger) => {
  const { status, trx_id, orNumber, returnReason } = req.body;

  // Sets the new status (plus OR number / return reason) on the cart row.
  const updateStatus = () =>
    db('cart')
      .returning('*')
      .where('trx_id', '=', trx_id)
      .update({
        status,
        or_num: orNumber,
        return_reason: returnReason,
      });

  // Stamps the cart row with the current time as its purchase date.
  const updateDate = () =>
    db('cart')
      .returning('*')
      .where('trx_id', '=', trx_id)
      .update({ date_purchased: new Date() });

  // Loads the line items that belong to the given cart row.
  const selectItems = (order) =>
    db
      .select('*')
      .from('cart_items')
      .where({ cart_id: order.id, trx_id: order.trx_id });

  // Looks up the product rows of every item; result[i] holds the rows found
  // for items[i], which keeps each item paired with its own product.
  // NOTE(review): this reads from 'product' while the stock update below
  // writes to 'products' — confirm which table name is correct.
  const selectProduct = (items) =>
    Promise.all(
      items.map((item) =>
        db.select('*').from('product').where('item_code', '=', item.item_code)
      )
    );

  // Builds (without running) one stock-decreasing UPDATE per cart item.
  // All quantities are validated before the first query object is returned,
  // so a bad item aborts the whole step before any stock is written.
  const updateQuantity = (productRowsPerItem, items) =>
    items.map((item, index) => {
      const product = productRowsPerItem[index][0];
      if (product === undefined) {
        throw new Error(`No product found for item_code ${item.item_code}`);
      }
      const newStock =
        Number.parseInt(product.stock, 10) - Number.parseInt(item.quantity, 10);
      if (Number.isNaN(newStock)) {
        throw new Error(`Invalid stock or quantity for item_code ${item.item_code}`);
      }
      return db('products')
        .returning('*')
        .where('item_code', '=', item.item_code)
        .update({ stock: newStock });
    });

  // Records the status change in the activity log.
  const updateLogs = () =>
    db('activity_order_logs')
      .returning('*')
      .insert({
        date: new Date(),
        employee_id: req.session.emp_id,
        module: "MONITORING",
        trx_id,
        activity: status,
        or_num: orNumber,
      });

  const sendResponse = (result) => {
    res.json({ isSuccess: Boolean(result) });
  };

  (async () => {
    const updatedOrders = await updateStatus();
    const order = updatedOrders[0];
    if (order === undefined) {
      throw new Error(`No cart found for trx_id ${trx_id}`);
    }

    if (order.status === 'Success') {
      const purchasedOrders = await updateDate();
      const items = await selectItems(purchasedOrders[0]);
      const productRowsPerItem = await selectProduct(items);

      // Run the stock updates one after another, in cart-item order.
      for (const stockUpdate of updateQuantity(productRowsPerItem, items)) {
        await stockUpdate;
      }
    }

    // Every status ('Success', 'Returned' or anything else) is logged.
    const logs = await updateLogs();
    sendResponse(logs);
  })().catch((err) => {
    console.log(err);
    res.json({ isSuccess: false });
  });
};

// Public API of this module: exposes the updateOrder request handler.
module.exports = {
  updateOrder
}

我尝试过的:

第一次尝试 - 出错:返回“事务被拒绝,没有错误”

//knex is initialized as db
// Wraps db.transaction() in a Promise that resolves with whatever knex passes
// to the transaction callback (presumably the transaction object).
// The executor takes no `reject`, so a failure inside db.transaction() never
// rejects the Promise returned here.
const createTransaction = () => {
  return new Promise((resolve) => {
    return db.transaction(resolve);
  });
};

// First attempt: open the transaction, run the order-update steps, then
// commit; roll back in the trailing .catch().
(async() => {
  const trx = await createTransaction();
  // NOTE(review): `trx` is only used for commit below. The helpers
  // (updateStatus, updateDate, ...) are not defined in this snippet; in the
  // updateOrder listing they build their queries from `db`, so presumably
  // none of these statements are bound to the transaction.
  const first = await updateStatus();

  if (first[0].status == 'Success') {
    const second = await updateDate().catch(err => {
      throw err
    });
 
    const third = await selectItems(second[0]).catch(err => {
      throw err
    });

    const fourth = await selectProduct(third).catch(err => {
      throw err
    });
    // Flatten the per-item result arrays into a single array of rows.
    const fourth2 = [].concat(...fourth);

    const fifth = await updateQuantity(fourth2, third)
    // Awaits the entries of `fifth` one at a time and collects the results.
    const decreaseStock = async() => {
      const finalResult = [];
      for (let i = 0; i < fifth.length; i++) {
        const finalQuery = await Promise.resolve(fifth[i]);
        finalResult.push(finalQuery);
      }
      return finalResult;
    };

    const result = await decreaseStock().catch(err => {
      throw err
    });
    const result2 = [].concat(...result);
    const logs = await updateLogs().catch(err => {
      throw err
    });
    const sendRes = await sendResponse(logs);

  } else if (first[0].status == 'Returned') {
    const logs = await updateLogs().catch(err => {
      throw err
    });
    const sendRes = await sendResponse(logs);
  } else {
    const logs = await updateLogs().catch(err => {
      throw err
    });
    const sendRes = await sendResponse(logs);
  }

  trx.commit();

})().catch(err => {
  // NOTE(review): `trx` is declared with `const` inside the async function
  // above, so it is not in scope in this callback unless an outer `trx`
  // exists that this snippet does not show.
  trx.rollback();
  console.log(err);
  res.json({
    isSuccess: false
  })
});

第二次尝试 - 结果:即使故意制造了错误,事务仍然被提交,没有回滚。

//knex is initialized as db
// Second attempt: run the order-update steps inside the callback passed to
// db.transaction() and handle a rejection in the surrounding try/catch.
(async() => {
  try {
    return await db.transaction(async trx => {
      // NOTE(review): `trx` is never referenced inside this callback. The
      // helpers are not defined in this snippet; in the updateOrder listing
      // they build their queries from `db`, so presumably none of the
      // statements below are bound to the transaction.
      const first = await updateStatus();

      if (first[0].status == 'Success') {
        const second = await updateDate().catch(err => {
          throw err
        });

        const third = await selectItems(second[0]).catch(err => {
          throw err
        });

        const fourth = await selectProduct(third).catch(err => {
          throw err
        });
        // Flatten the per-item result arrays into a single array of rows.
        const fourth2 = [].concat(...fourth);

        const fifth = await updateQuantity(fourth2, third)
        // Awaits the entries of `fifth` one at a time and collects the results.
        const decreaseStock = async() => {
          const finalResult = [];
          for (let i = 0; i < fifth.length; i++) {
            const finalQuery = await Promise.resolve(fifth[i]);
            finalResult.push(finalQuery);
          }
          return finalResult;
        };

        const result = await decreaseStock().catch(err => {
          throw err
        });
        const result2 = [].concat(...result);
        const logs = await updateLogs().catch(err => {
          throw err
        });
        // The response is sent from inside the transaction callback.
        const sendRes = await sendResponse(logs);

      } else if (first[0].status == 'Returned') {
        const logs = await updateLogs().catch(err => {
          throw err
        });
        const sendRes = await sendResponse(logs);
      } else {
        const logs = await updateLogs().catch(err => {
          throw err
        });
        const sendRes = await sendResponse(logs);
      }
    })
  } catch (err) {
    console.log(err);
    res.json({
      isSuccess: false
    })
  }
  // NOTE(review): as written, the async function expression is only wrapped in
  // parentheses and never invoked (there is no trailing `()`).
})

看起来你正在创建事务,但你没有向它发送任何查询,而是向 knex 连接池中的其他数据库连接发送查询。

使用 knex 事务的正确方式如下:

// Sketch of the intended pattern: every query that should be part of the
// transaction is issued through `trx` (the object handed to the callback),
// not through `db`. The log messages below state the expected outcome:
// reaching the line after `await db.transaction(...)` means the transaction
// was committed; landing in `catch` means it was rolled back.
async () => {
  try {
    const trxResult = await db.transaction(async (trx) => {
      const queryResult = await trx('table').where(/* ... etc. ... */);
      // do some more queries to trx
    });
    console.log("transaction was committed");
  } catch (e) {
    console.log("transaction was rolled back");
  }
};

此外,在将问题发布到 Stack Overflow 之前,您应该尽量把代码量减少到最少。把大量代码折叠进代码片段里根本没有帮助。