Spring 集成确认消息处理返回到以前的服务
Spring integration acknowledge message processing back to previous service
在 Spring 集成中,我有一个服务链,如下所示:
message -> A -> B -> C -> D -> ... -> output
这很好用。我想让每个服务异步并使它们变得悲观。他们每个人都会收到一条消息,对其进行处理并将其发送到链中的下一个服务。但是,它不会等到整个链完成。它将继续处理下一条消息,依此类推。此处为标准异步。
但是,假设服务 B 比 A 慢,并且它在其入站通道队列中累积了 10k 条消息,这时系统崩溃了。我希望能够通过找出我离开的位置并重新处理消息来恢复系统。出于这个原因,我希望每个服务都知道它处理的哪些消息已被以下服务成功使用。已发送与已处理的区别。
我的想法是像这样(花哨的 ascii):
-> A --> B -> C -> ...
^ |
| ack |
\-----/
也就是说,A 将发送给 B,B 将进行处理,当它成功完成后,它将向 A 发送一个确认。然后 A 将从存储中删除该特定消息,以便下次运行时,它不会重新处理它。我以为我会在 B 之后放置一个拆分器,它将在服务 A 上调用不同的方法(即 ackProcessed
)。
这是在 SI 中应该如何完成的,还是我缺少的另一种方式?我主要是想确认我没有遗漏开箱即用的支持或不会强迫我在每个服务之后创建分离器的东西。
它不会是分离器;更有可能是发布-订阅通道和 ack 可能想要转到 A 中的不同方法(即引用相同 bean、不同方法的不同服务激活器;并且这些方法共享一些状态)。
更简单的解决方案是使用持久消息通道(例如 JMS、RabbitMQ 或消息存储支持的 QueueChannel)。这样框架就会为你处理好一切。
在 Spring 集成中,我有一个服务链,如下所示:
message -> A -> B -> C -> D -> ... -> output
这很好用。我想让每个服务异步并使它们变得悲观。他们每个人都会收到一条消息,对其进行处理并将其发送到链中的下一个服务。但是,它不会等到整个链完成。它将继续处理下一条消息,依此类推。此处为标准异步。
但是,假设服务 B 比 A 慢,并且它在其入站通道队列中累积了 10k 条消息,这时系统崩溃了。我希望能够通过找出我离开的位置并重新处理消息来恢复系统。出于这个原因,我希望每个服务都知道它处理的哪些消息已被以下服务成功使用。已发送与已处理的区别。
我的想法是像这样(花哨的 ascii):
-> A --> B -> C -> ...
^ |
| ack |
\-----/
也就是说,A 将发送给 B,B 将进行处理,当它成功完成后,它将向 A 发送一个确认。然后 A 将从存储中删除该特定消息,以便下次运行时,它不会重新处理它。我以为我会在 B 之后放置一个拆分器,它将在服务 A 上调用不同的方法(即 ackProcessed
)。
这是在 SI 中应该如何完成的,还是我缺少的另一种方式?我主要是想确认我没有遗漏开箱即用的支持或不会强迫我在每个服务之后创建分离器的东西。
它不会是分离器;更有可能是发布-订阅通道和 ack 可能想要转到 A 中的不同方法(即引用相同 bean、不同方法的不同服务激活器;并且这些方法共享一些状态)。
更简单的解决方案是使用持久消息通道(例如 JMS、RabbitMQ 或消息存储支持的 QueueChannel)。这样框架就会为你处理好一切。