如何使用每个服务 Spring Cloud Stream Kafka 和数据库实现微服务事件驱动架构
How to implement a microservice Event Driven architecture with Spring Cloud Stream Kafka and Database per service
我正在尝试实现一个事件驱动的架构来处理分布式事务。每个服务都有自己的数据库,并使用 Kafka 发送消息通知其他微服务有关操作。
一个例子:
Order service -------> | Kafka |------->Payment Service
| |
Orders MariaDB DB Payment MariaDB Database
订单收到订单请求。它必须将新订单存储在其数据库中并发布一条消息,以便支付服务意识到它必须对商品收费:
private OrderBusiness orderBusiness;
@PostMapping
public Order createOrder(@RequestBody Order order){
logger.debug("createOrder()");
//a.- Save the order in the DB
orderBusiness.createOrder(order);
//b. Publish in the topic so that Payment Service charges for the item.
try{
orderSource.output().send(MessageBuilder.withPayload(order).build());
}catch(Exception e){
logger.error("{}", e);
}
return order;
}
这些是我的疑惑:
- 步骤 a.-(保存在订单数据库中)和 b.-(发布消息)应该在事务中以原子方式执行。我怎样才能做到这一点?
- 这与上一个相关:我发送消息时使用:orderSource.output().send(MessageBuilder.withPayload(order).build());此操作是异步的,并且始终 returns 为真,无论 Kafka 代理是否关闭。我怎么知道消息已经到达 Kafka broker?
Steps a.- (save in Order DB) and b.- (publish the message) should be
performed in a transaction, atomically. How can I achieve that?
Kafka 目前不支持事务(因此也不支持回滚或提交),您需要同步此类事务。简而言之:你不能做你想做的事。这将在 near-ish 未来发生变化,届时您的系统 KIP-98 is merged, but that might take some time yet. Also, even with transactions in Kafka, an atomic transaction across two systems is a very hard thing to do, everything that follows will only be improved upon by transactional support in Kafka, it will still not entirely solve your issue. For that you would need to look into implementing some form of two phase commit。
您可以通过配置生产者属性来接近一些,但最终您将不得不在至少一次或最多一次[=60]之间做出选择=] 用于您的系统之一(MariaDB 或 Kafka)。
让我们从您可以在 Kafka 中做什么来确保消息的传递开始,然后我们将深入探讨您对整个流程的选择以及结果是什么。
保证送达
您可以配置在使用参数 acks 将请求 return 发送给您之前,有多少经纪人必须确认收到您的消息:通过将此设置为all 你告诉代理等到所有副本都确认了你的消息,然后再 return 给你答复。这仍然不能 100% 保证您的消息不会丢失,因为它只是被写入页面缓存,并且在理论上存在代理在将其持久保存到磁盘之前发生故障的情况,消息可能仍然会丢失。但这是您将要得到的最好的保证。
您可以通过降低代理强制 fsync 到磁盘的间隔来进一步降低数据丢失的风险 (emphasized text and/or flush.ms) 但请注意,这些值可能会带来严重的性能损失。
除了这些设置之外,您还需要等待您的 Kafka 生产者 return 对您的请求的响应,并检查是否发生异常。这种关系与您问题的第二部分有关,因此我将进一步探讨。
如果响应是干净的,您可以尽可能确定您的数据已到达 Kafka 并开始担心 MariaDB。
到目前为止我们所涵盖的一切都只解决了如何确保 Kafka 收到你的消息,但你还需要将数据写入 MariaDB,这也可能会失败,这将有必要召回你可能的消息已经发送到 Kafka - 你不能这样做。
所以基本上你需要选择一个你能够更好地处理 duplicates/missing 值的系统(取决于你是否重新发送部分失败)并且这将影响你做事的顺序。
选项 1
在此选项中,您在 MariaDB 中初始化一个事务,然后将消息发送到 Kafka,等待响应,如果发送成功,您将在 MariaDB 中提交事务。如果发送到 Kafka 失败,您可以在 MariaDB 中回滚您的事务,一切都很好。
但是,如果发送到 Kafka 成功,而您对 MariaDB 的提交由于某种原因失败,则无法从 Kafka 取回消息。因此,如果您稍后重新发送所有内容,您将在 MariaDB 中丢失一条消息,或者在 Kafka 中有一条重复消息。
选项 2
这几乎正好相反,但您可能更能删除用 MariaDB 编写的消息,具体取决于您的数据模型。
当然,您可以通过跟踪失败的发送并稍后重试这些方法来缓解这两种方法,但所有这些更像是解决更大问题的创可贴。
我个人会选择方法 1,因为提交失败的可能性应该比发送本身小一些,并在 Kafka 的另一端实施某种欺骗检查。
This is related to the previous one: I send the message with:
orderSource.output().send(MessageBuilder.withPayload(order).build());
This operations is asynchronous and ALWAYS returns true, no matter if
the Kafka broker is down. How can I know that the message has reached
the Kafka broker?
首先,我承认我不熟悉 Spring,因此这可能对您没有用,但以下代码片段说明了一种检查生成响应是否有异常的方法。
通过调用 flush,您将阻塞直到所有发送都完成(并且失败或成功),然后检查结果。
Producer<String, String> producer = new KafkaProducer<>(myConfig);
final ArrayList<Exception> exceptionList = new ArrayList<>();
for(MessageType message : messages){
producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exceptionList.add(exception);
}
}
});
}
producer.flush();
if (!exceptionList.isEmpty()) {
// do stuff
}
我认为实现事件溯源的正确方法是让 Kafka 直接从一个插件推送的事件中填充,该插件从 RDBMS binlog 中读取,例如使用 Confluent BottledWater (https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/) or more active Debezium (http://debezium.io/)。然后消费微服务可以监听这些事件,消费它们并作用于它们各自的数据库,最终与 RDBMS 数据库保持一致。
在这里查看我的完整回答以获取指南:
我正在尝试实现一个事件驱动的架构来处理分布式事务。每个服务都有自己的数据库,并使用 Kafka 发送消息通知其他微服务有关操作。
一个例子:
Order service -------> | Kafka |------->Payment Service
| |
Orders MariaDB DB Payment MariaDB Database
订单收到订单请求。它必须将新订单存储在其数据库中并发布一条消息,以便支付服务意识到它必须对商品收费:
private OrderBusiness orderBusiness;
@PostMapping
public Order createOrder(@RequestBody Order order){
logger.debug("createOrder()");
//a.- Save the order in the DB
orderBusiness.createOrder(order);
//b. Publish in the topic so that Payment Service charges for the item.
try{
orderSource.output().send(MessageBuilder.withPayload(order).build());
}catch(Exception e){
logger.error("{}", e);
}
return order;
}
这些是我的疑惑:
- 步骤 a.-(保存在订单数据库中)和 b.-(发布消息)应该在事务中以原子方式执行。我怎样才能做到这一点?
- 这与上一个相关:我发送消息时使用:orderSource.output().send(MessageBuilder.withPayload(order).build());此操作是异步的,并且始终 returns 为真,无论 Kafka 代理是否关闭。我怎么知道消息已经到达 Kafka broker?
Steps a.- (save in Order DB) and b.- (publish the message) should be performed in a transaction, atomically. How can I achieve that?
Kafka 目前不支持事务(因此也不支持回滚或提交),您需要同步此类事务。简而言之:你不能做你想做的事。这将在 near-ish 未来发生变化,届时您的系统 KIP-98 is merged, but that might take some time yet. Also, even with transactions in Kafka, an atomic transaction across two systems is a very hard thing to do, everything that follows will only be improved upon by transactional support in Kafka, it will still not entirely solve your issue. For that you would need to look into implementing some form of two phase commit。
您可以通过配置生产者属性来接近一些,但最终您将不得不在至少一次或最多一次[=60]之间做出选择=] 用于您的系统之一(MariaDB 或 Kafka)。
让我们从您可以在 Kafka 中做什么来确保消息的传递开始,然后我们将深入探讨您对整个流程的选择以及结果是什么。
保证送达
您可以配置在使用参数 acks 将请求 return 发送给您之前,有多少经纪人必须确认收到您的消息:通过将此设置为all 你告诉代理等到所有副本都确认了你的消息,然后再 return 给你答复。这仍然不能 100% 保证您的消息不会丢失,因为它只是被写入页面缓存,并且在理论上存在代理在将其持久保存到磁盘之前发生故障的情况,消息可能仍然会丢失。但这是您将要得到的最好的保证。 您可以通过降低代理强制 fsync 到磁盘的间隔来进一步降低数据丢失的风险 (emphasized text and/or flush.ms) 但请注意,这些值可能会带来严重的性能损失。
除了这些设置之外,您还需要等待您的 Kafka 生产者 return 对您的请求的响应,并检查是否发生异常。这种关系与您问题的第二部分有关,因此我将进一步探讨。 如果响应是干净的,您可以尽可能确定您的数据已到达 Kafka 并开始担心 MariaDB。
到目前为止我们所涵盖的一切都只解决了如何确保 Kafka 收到你的消息,但你还需要将数据写入 MariaDB,这也可能会失败,这将有必要召回你可能的消息已经发送到 Kafka - 你不能这样做。
所以基本上你需要选择一个你能够更好地处理 duplicates/missing 值的系统(取决于你是否重新发送部分失败)并且这将影响你做事的顺序。
选项 1
在此选项中,您在 MariaDB 中初始化一个事务,然后将消息发送到 Kafka,等待响应,如果发送成功,您将在 MariaDB 中提交事务。如果发送到 Kafka 失败,您可以在 MariaDB 中回滚您的事务,一切都很好。 但是,如果发送到 Kafka 成功,而您对 MariaDB 的提交由于某种原因失败,则无法从 Kafka 取回消息。因此,如果您稍后重新发送所有内容,您将在 MariaDB 中丢失一条消息,或者在 Kafka 中有一条重复消息。
选项 2
这几乎正好相反,但您可能更能删除用 MariaDB 编写的消息,具体取决于您的数据模型。
当然,您可以通过跟踪失败的发送并稍后重试这些方法来缓解这两种方法,但所有这些更像是解决更大问题的创可贴。
我个人会选择方法 1,因为提交失败的可能性应该比发送本身小一些,并在 Kafka 的另一端实施某种欺骗检查。
This is related to the previous one: I send the message with: orderSource.output().send(MessageBuilder.withPayload(order).build()); This operations is asynchronous and ALWAYS returns true, no matter if the Kafka broker is down. How can I know that the message has reached the Kafka broker?
首先,我承认我不熟悉 Spring,因此这可能对您没有用,但以下代码片段说明了一种检查生成响应是否有异常的方法。 通过调用 flush,您将阻塞直到所有发送都完成(并且失败或成功),然后检查结果。
Producer<String, String> producer = new KafkaProducer<>(myConfig);
final ArrayList<Exception> exceptionList = new ArrayList<>();
for(MessageType message : messages){
producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exceptionList.add(exception);
}
}
});
}
producer.flush();
if (!exceptionList.isEmpty()) {
// do stuff
}
我认为实现事件溯源的正确方法是让 Kafka 直接从一个插件推送的事件中填充,该插件从 RDBMS binlog 中读取,例如使用 Confluent BottledWater (https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/) or more active Debezium (http://debezium.io/)。然后消费微服务可以监听这些事件,消费它们并作用于它们各自的数据库,最终与 RDBMS 数据库保持一致。
在这里查看我的完整回答以获取指南: