Spring 集成声明检查模式 - 失败时重新发送

Spring integration claim-check pattern - resend on failure

所以,我的设置是:

<int-amqp:inbound-channel-adapter channel="input-channel" queue-names="probni" message-converter="jsonMessageConverter"
                                  channel-transacted="true"
                                  transaction-manager="dataSourceTransactionManager"/>
<int:chain input-channel="input-channel" output-channel="oc2">
    <int:service-activator ref="h1" method="handle" />
    <int:service-activator ref="h2" method="handle" />
    <int:service-activator ref="h3" method="handle" />
    <int:splitter  />
    <int:claim-check-in message-store="messageStore" />
</int:chain>


<int:channel id="oc2">
    <int:dispatcher task-executor="taskExecutor" />
</int:channel>

<int:channel id="oc3" />

<int:claim-check-out message-store="messageStore" input-channel="oc2" output-channel="oc3" auto-startup="true" remove-message="true" />

<int-amqp:outbound-channel-adapter exchange-name="someexc" channel="oc3" amqp-template="rabbitTemplate" />

我希望能够在与 int-amqp:inbound-channel-adapter 相同的事务中将消息存储到 messageStore,然后有单独的线程实际将此消息发送到其他 amqp 交换。此外,如果第一个线程将消息保存在 messageStore 中,然后整个进程被终止,在这种情况下,重新启动 claim-check-out 应该知道它需要加载未发送的消息并将其发送到 oc3 通道。

还有一件事 - 如果我有多个 claim-check-in/out 工作人员共享同一个数据库 table,他们如何知道哪条消息属于哪个索赔检查工作人员(当我们有组件启动时来自不同索赔签入的多条消息)?

谢谢!

首先让我们再看一遍Claim-Check的描述:

In the above configuration the Message that is received on the input-channel will be persisted to the Message Store identified with the message-store attribute and indexed with generated ID. That ID is the Claim Check for that Message. The Claim Check will also become the payload of the new (transformed) Message that will be sent to the output-channel.

因此,在 Claim-Check-In 中进行转换后,我们有一个用于存储消息的 id,Claim-Check-Out 使用它来恢复存储的消息。

为此,您可以使用 QueueChannel 而不是具有持久性 MessageStore 的执行程序。

这是一个想法。

另一个就像是通过常规 <int-jdbc:inbound-channel-adapter> 真正轮询 INT_MESSAGE table MESSAGE_ID 列并将结果发送到 <claim-check-out>.