如何将消息保存到数据库并将响应发送到主题最终一致?

How to save message into database and send response into topic eventually consistent?

我有以下 rabbitMq 消费者:

Consumer consumer = new DefaultConsumer(channel) {
    @Override
     public void handleDelivery(String consumerTag, Envelope envelope, MQP.BasicProperties properties, byte[] body) throws IOException {
            String message = new String(body, "UTF-8");
            sendNotificationIntoTopic(message);
            saveIntoDatabase(message);
     }
};

可能会出现以下情况:

  1. 消息已成功发送到主题
  2. 与数据库的连接丢失,因此数据库插入失败。

因此我们的数据不一致。

预期结果要么两个动作都成功执行,要么两个都没有执行。

有什么解决办法吗?

P.S.

目前我有以下想法(欢迎评论)

我们可以假设代理不会丢失任何消息。

我们必须订阅要发送的主题。

  1. 将条目保存到数据库中并将字段 status 设置为值 'pending'
  2. 尝试向主题发送数据。如果发送成功 - 将字段 status 更新为值 'success'
  3. 我们必须有一个计划作业,该作业必须检查处于挂起状态的行。目前可能有2种情况:
    3.1 根本没有发送通知
    3.2 通知已发送但存入数据库失败(概率很低但有可能)

    所以我们必须以某种方式区分这两种情况:我们可以将来自主题的消息存储在集合中,作业可以检查消息是否被接受。因此,如果作业找到与数据库行对应的消息,我们必须将状态更新为 "success"。否则我们必须从数据库中删除条目。

我认为我的想法有一些弱点(例如,如果我们有多节点应用程序,我们必须将消息存储在 hazelcast(或类似物)中,但这是假设失败的额外点)

  1. 在侦听器中保存带有字段 staus='pending'
  2. 的数据库行
  3. 另一个作业(独立线程)将从数据库中获取所有待处理的行,并为每一行获取以下信息:
    2.1 发送数据到topic
    2.2 存入数据库

如果我们在第 1 步失败 - 一切正常 - 数据处于一致状态,因为作业不会知道关于该数据的任何信息

如果我们在步骤 2.1 上失败 - 没问题,下一个作业调用将尝试处理它

如果我们在步骤 2.2 上失败了 - 如果我们在这里失败了 - 这意味着下一个作业调用将再次处理相同的数据。乍一看你可以认为这是一个问题。但是您的消费者必须是幂等的——这意味着它必须了解消息已经被处理并跳过处理。此要求是所有消息代理都保证消息将至少传递一次的结果。因此,无论如何,我们的消费者都必须为重复的消息做好准备。没问题了。

如果有足够的时间修改设计,建议使用类似JTA的API来管理2phase commit。甚至 weblogic 和 WebSphere 也支持用于两阶段提交的 XA 资源。

如果timeline比较少,建议按照下面的方式来减少失败间隔。

  • 发送数据topic(不提交)(incase topic down, retry to be perform with a interval)
  • 将数据写入数据库
  • 提交数据库
  • 提交主题

这里只有第4步失败才会失败。这将导致再次发送相同的消息。所以接收系统会收到重复的消息。在JMS2.0 结构中,每条消息都有唯一的messageID 和CorrelationID。所以找到重复项有点简单(但这将在接收系统中处理)

这两种情况也适用于集群环境。


严格针对您的情况,认为以下步骤可能有助于解决您的问题

为您的主题订阅一个侦听器 listener-1。

进程 1

  • 为消息 msg-1
  • 添加状态为 'to be sent' 的数据库条目
  • 向主题发送消息msg-1。在任何主题失败的情况下重试发送 如果在某些重试后步骤 2 失败,process-1 必须在发送任何新消息之前重新发送 msg-1 或要回滚的步骤 1

监听器 1

  • 使用订阅的侦听器,从主题中读取引用(meesageID/correlationID),并将数据库状态更新为已发送,并从主题中读取 read/remove 消息。如果 reference-read 成功并且数据库更新失败,主题仍然有消息。所以下一次读取将更新数据库。 Incase 数据库更新成功但消息删除失败。听众将再次阅读并尝试更新已经完成的消息。所以验证后可以忽略。

万一听者自己宕机,topic会有消息,直到听者读到消息。在此之前,SENT 消息的状态为 'to be sent'.

这是我如何做的伪代码:(假设 dao 层具有事务处理能力而您的消息传递层没有)

    //Start a transaction
    try {
                String message = new String(body, "UTF-8");
               // Ordering is important here as I'm assuming the database has commit and rollback capabilities, but the messaging system doesnt. 
                saveIntoDatabase(message);
                sendNotificationIntoTopic(message);

    } catch (MessageDeliveryException) {
        // rollback the transaction
        // Throw a domain specific exception
    }
   //commit the transaction

场景:
1.如果数据库出现故障,消息将不会发送,因为异常会打断代码流程。
2.如果数据库调用成功,消息系统发送失败,捕获异常回滚数据库变更

记录和重放失败所需的所有操作都可以在此方法之外

这里有一个 尝试取消确认 模式 https://servicecomb.apache.org/docs/distributed_saga_3/ 的例子,它应该能够解决你的问题。您应该容忍通过队列重复提交数据的机会。这是一个例子:

  1. 定义抽象操作并为操作分配 ID 和时间戳。
  2. 将状态Pending写入数据库(与1相同的步骤即可)
  3. 编写一个侦听器,轮询数据库中所有状态为未决且早于 "timeout"
  4. 的操作
  5. 对于每个挂起的操作,通过具有分配 ID 的队列发送数据。
  6. 接收方应该知道 ID,如果 ID 已被处理,则不会发生任何事情。

6A。如果您需要 100% 确认操作已经完成,您需要第二个队列,接收方将在其中 post 一个消息 ID - 完成。如果不需要这种一致性,请跳过此步骤。或者它可以 post ID -Failed 失败原因。

6B。提交方要么等待来自 6A 的消息,要么通过将状态 DONE 写入数据库来完成操作。

  • 一旦 sertine 超时已过或某个重试限制已过。您将状态写入操作 FAIL。
  • 您可以通过 ID 回滚向收件人发送消息。

请注意,所有这些步骤都不涉及技术事务。您可以使用非事务性数据库执行此操作。

我写的是尝试取消确认模式的变体,其中每个消息接收者都应该知道如何管理自己的数据。