如何将消息保存到数据库并将响应发送到主题最终一致?
How to save message into database and send response into topic eventually consistent?
我有以下 rabbitMq 消费者:
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, MQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
sendNotificationIntoTopic(message);
saveIntoDatabase(message);
}
};
可能会出现以下情况:
- 消息已成功发送到主题
- 与数据库的连接丢失,因此数据库插入失败。
因此我们的数据不一致。
预期结果要么两个动作都成功执行,要么两个都没有执行。
有什么解决办法吗?
P.S.
目前我有以下想法(欢迎评论)
我们可以假设代理不会丢失任何消息。
我们必须订阅要发送的主题。
- 将条目保存到数据库中并将字段
status
设置为值 'pending'
- 尝试向主题发送数据。如果发送成功 - 将字段
status
更新为值 'success'
我们必须有一个计划作业,该作业必须检查处于挂起状态的行。目前可能有2种情况:
3.1 根本没有发送通知
3.2 通知已发送但存入数据库失败(概率很低但有可能)
所以我们必须以某种方式区分这两种情况:我们可以将来自主题的消息存储在集合中,作业可以检查消息是否被接受。因此,如果作业找到与数据库行对应的消息,我们必须将状态更新为 "success"。否则我们必须从数据库中删除条目。
我认为我的想法有一些弱点(例如,如果我们有多节点应用程序,我们必须将消息存储在 hazelcast(或类似物)中,但这是假设失败的额外点)
- 在侦听器中保存带有字段 staus='pending'
的数据库行
- 另一个作业(独立线程)将从数据库中获取所有待处理的行,并为每一行获取以下信息:
2.1 发送数据到topic
2.2 存入数据库
如果我们在第 1 步失败 - 一切正常 - 数据处于一致状态,因为作业不会知道关于该数据的任何信息
如果我们在步骤 2.1 上失败 - 没问题,下一个作业调用将尝试处理它
如果我们在步骤 2.2 上失败了 - 如果我们在这里失败了 - 这意味着下一个作业调用将再次处理相同的数据。乍一看你可以认为这是一个问题。但是您的消费者必须是幂等的——这意味着它必须了解消息已经被处理并跳过处理。此要求是所有消息代理都保证消息将至少传递一次的结果。因此,无论如何,我们的消费者都必须为重复的消息做好准备。没问题了。
如果有足够的时间修改设计,建议使用类似JTA的API来管理2phase commit。甚至 weblogic 和 WebSphere 也支持用于两阶段提交的 XA 资源。
如果timeline比较少,建议按照下面的方式来减少失败间隔。
- 发送数据topic(不提交)(incase topic down, retry to be perform with a interval)
- 将数据写入数据库
- 提交数据库
- 提交主题
这里只有第4步失败才会失败。这将导致再次发送相同的消息。所以接收系统会收到重复的消息。在JMS2.0 结构中,每条消息都有唯一的messageID 和CorrelationID。所以找到重复项有点简单(但这将在接收系统中处理)
这两种情况也适用于集群环境。
严格针对您的情况,认为以下步骤可能有助于解决您的问题
为您的主题订阅一个侦听器 listener-1。
进程 1
- 为消息 msg-1
添加状态为 'to be sent' 的数据库条目
- 向主题发送消息msg-1。在任何主题失败的情况下重试发送
如果在某些重试后步骤 2 失败,process-1 必须在发送任何新消息之前重新发送 msg-1 或要回滚的步骤 1
监听器 1
- 使用订阅的侦听器,从主题中读取引用(meesageID/correlationID),并将数据库状态更新为已发送,并从主题中读取 read/remove 消息。如果 reference-read 成功并且数据库更新失败,主题仍然有消息。所以下一次读取将更新数据库。 Incase 数据库更新成功但消息删除失败。听众将再次阅读并尝试更新已经完成的消息。所以验证后可以忽略。
万一听者自己宕机,topic会有消息,直到听者读到消息。在此之前,SENT 消息的状态为 'to be sent'.
这是我如何做的伪代码:(假设 dao 层具有事务处理能力而您的消息传递层没有)
//Start a transaction
try {
String message = new String(body, "UTF-8");
// Ordering is important here as I'm assuming the database has commit and rollback capabilities, but the messaging system doesnt.
saveIntoDatabase(message);
sendNotificationIntoTopic(message);
} catch (MessageDeliveryException) {
// rollback the transaction
// Throw a domain specific exception
}
//commit the transaction
场景:
1.如果数据库出现故障,消息将不会发送,因为异常会打断代码流程。
2.如果数据库调用成功,消息系统发送失败,捕获异常回滚数据库变更
记录和重放失败所需的所有操作都可以在此方法之外
这里有一个 尝试取消确认 模式 https://servicecomb.apache.org/docs/distributed_saga_3/ 的例子,它应该能够解决你的问题。您应该容忍通过队列重复提交数据的机会。这是一个例子:
- 定义抽象操作并为操作分配 ID 和时间戳。
- 将状态Pending写入数据库(与1相同的步骤即可)
- 编写一个侦听器,轮询数据库中所有状态为未决且早于 "timeout"
的操作
- 对于每个挂起的操作,通过具有分配 ID 的队列发送数据。
- 接收方应该知道 ID,如果 ID 已被处理,则不会发生任何事情。
6A。如果您需要 100% 确认操作已经完成,您需要第二个队列,接收方将在其中 post 一个消息 ID - 完成。如果不需要这种一致性,请跳过此步骤。或者它可以 post ID -Failed 失败原因。
6B。提交方要么等待来自 6A 的消息,要么通过将状态 DONE 写入数据库来完成操作。
- 一旦 sertine 超时已过或某个重试限制已过。您将状态写入操作 FAIL。
- 您可以通过 ID 回滚向收件人发送消息。
请注意,所有这些步骤都不涉及技术事务。您可以使用非事务性数据库执行此操作。
我写的是尝试取消确认模式的变体,其中每个消息接收者都应该知道如何管理自己的数据。
我有以下 rabbitMq 消费者:
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, MQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
sendNotificationIntoTopic(message);
saveIntoDatabase(message);
}
};
可能会出现以下情况:
- 消息已成功发送到主题
- 与数据库的连接丢失,因此数据库插入失败。
因此我们的数据不一致。
预期结果要么两个动作都成功执行,要么两个都没有执行。
有什么解决办法吗?
P.S.
目前我有以下想法(欢迎评论)
我们可以假设代理不会丢失任何消息。
我们必须订阅要发送的主题。
- 将条目保存到数据库中并将字段
status
设置为值 'pending' - 尝试向主题发送数据。如果发送成功 - 将字段
status
更新为值 'success' 我们必须有一个计划作业,该作业必须检查处于挂起状态的行。目前可能有2种情况:
3.1 根本没有发送通知
3.2 通知已发送但存入数据库失败(概率很低但有可能)所以我们必须以某种方式区分这两种情况:我们可以将来自主题的消息存储在集合中,作业可以检查消息是否被接受。因此,如果作业找到与数据库行对应的消息,我们必须将状态更新为 "success"。否则我们必须从数据库中删除条目。
我认为我的想法有一些弱点(例如,如果我们有多节点应用程序,我们必须将消息存储在 hazelcast(或类似物)中,但这是假设失败的额外点)
- 在侦听器中保存带有字段 staus='pending' 的数据库行
- 另一个作业(独立线程)将从数据库中获取所有待处理的行,并为每一行获取以下信息:
2.1 发送数据到topic
2.2 存入数据库
如果我们在第 1 步失败 - 一切正常 - 数据处于一致状态,因为作业不会知道关于该数据的任何信息
如果我们在步骤 2.1 上失败 - 没问题,下一个作业调用将尝试处理它
如果我们在步骤 2.2 上失败了 - 如果我们在这里失败了 - 这意味着下一个作业调用将再次处理相同的数据。乍一看你可以认为这是一个问题。但是您的消费者必须是幂等的——这意味着它必须了解消息已经被处理并跳过处理。此要求是所有消息代理都保证消息将至少传递一次的结果。因此,无论如何,我们的消费者都必须为重复的消息做好准备。没问题了。
如果有足够的时间修改设计,建议使用类似JTA的API来管理2phase commit。甚至 weblogic 和 WebSphere 也支持用于两阶段提交的 XA 资源。
如果timeline比较少,建议按照下面的方式来减少失败间隔。
- 发送数据topic(不提交)(incase topic down, retry to be perform with a interval)
- 将数据写入数据库
- 提交数据库
- 提交主题
这里只有第4步失败才会失败。这将导致再次发送相同的消息。所以接收系统会收到重复的消息。在JMS2.0 结构中,每条消息都有唯一的messageID 和CorrelationID。所以找到重复项有点简单(但这将在接收系统中处理)
这两种情况也适用于集群环境。
严格针对您的情况,认为以下步骤可能有助于解决您的问题
为您的主题订阅一个侦听器 listener-1。
进程 1
- 为消息 msg-1 添加状态为 'to be sent' 的数据库条目
- 向主题发送消息msg-1。在任何主题失败的情况下重试发送 如果在某些重试后步骤 2 失败,process-1 必须在发送任何新消息之前重新发送 msg-1 或要回滚的步骤 1
监听器 1
- 使用订阅的侦听器,从主题中读取引用(meesageID/correlationID),并将数据库状态更新为已发送,并从主题中读取 read/remove 消息。如果 reference-read 成功并且数据库更新失败,主题仍然有消息。所以下一次读取将更新数据库。 Incase 数据库更新成功但消息删除失败。听众将再次阅读并尝试更新已经完成的消息。所以验证后可以忽略。
万一听者自己宕机,topic会有消息,直到听者读到消息。在此之前,SENT 消息的状态为 'to be sent'.
这是我如何做的伪代码:(假设 dao 层具有事务处理能力而您的消息传递层没有)
//Start a transaction
try {
String message = new String(body, "UTF-8");
// Ordering is important here as I'm assuming the database has commit and rollback capabilities, but the messaging system doesnt.
saveIntoDatabase(message);
sendNotificationIntoTopic(message);
} catch (MessageDeliveryException) {
// rollback the transaction
// Throw a domain specific exception
}
//commit the transaction
场景:
1.如果数据库出现故障,消息将不会发送,因为异常会打断代码流程。
2.如果数据库调用成功,消息系统发送失败,捕获异常回滚数据库变更
记录和重放失败所需的所有操作都可以在此方法之外
这里有一个 尝试取消确认 模式 https://servicecomb.apache.org/docs/distributed_saga_3/ 的例子,它应该能够解决你的问题。您应该容忍通过队列重复提交数据的机会。这是一个例子:
- 定义抽象操作并为操作分配 ID 和时间戳。
- 将状态Pending写入数据库(与1相同的步骤即可)
- 编写一个侦听器,轮询数据库中所有状态为未决且早于 "timeout" 的操作
- 对于每个挂起的操作,通过具有分配 ID 的队列发送数据。
- 接收方应该知道 ID,如果 ID 已被处理,则不会发生任何事情。
6A。如果您需要 100% 确认操作已经完成,您需要第二个队列,接收方将在其中 post 一个消息 ID - 完成。如果不需要这种一致性,请跳过此步骤。或者它可以 post ID -Failed 失败原因。
6B。提交方要么等待来自 6A 的消息,要么通过将状态 DONE 写入数据库来完成操作。
- 一旦 sertine 超时已过或某个重试限制已过。您将状态写入操作 FAIL。
- 您可以通过 ID 回滚向收件人发送消息。
请注意,所有这些步骤都不涉及技术事务。您可以使用非事务性数据库执行此操作。
我写的是尝试取消确认模式的变体,其中每个消息接收者都应该知道如何管理自己的数据。