Spring 批量集成消费 RabbitMq 消息
Spring Batch Integration consuming RabbitMq Message
我目前正在使用 IntegrationFlow 在 RabbitMq 消息到达队列时触发作业执行。 IntegrationFlow 的 AmqpInboundChannelAdapter 和作业第一步的 ItemReader 都配置为从同一队列读取消息。
我遇到的问题是 IntegrationFlow 的 AmqpInboundChannelAdapter 读取 RabbitMQ 消息,然后 ItemReader 无法再找到该消息。可能是因为 IntegrationFlow 在作业启动之前确认了消息。
有没有办法让 IntegrationFlow 远离 consuming/acknowledging 消息,将其留在队列中,以便 ItemReader 可以作为 intendend 工作?我尝试配置 AmqpInboundChannelAdapter 以重新排队消息,但这只会导致适配器无限循环重新读取它自己的消息。
这个问题在某种程度上描述了我的问题,除了我没有做任何处理,我只是试图将 IntegrationFlow 用作 JobLaunching 触发器。所以解决方案似乎是一种反模式。
如有任何帮助,我们将不胜感激
如果批处理作业只需要来自一条消息的信息,我建议绑定第二个队列,使用相同的路由键;一个队列用于触发器,一个队列用于项目 reader.
如果第一条消息是触发器,然后项目 reader 读取多条消息,您可以将消息内容添加到 JobParameters
。您还需要将适配器的预取设置为 1,以便在处理此消息时不会发送任何其他消息。
我目前正在使用 IntegrationFlow 在 RabbitMq 消息到达队列时触发作业执行。 IntegrationFlow 的 AmqpInboundChannelAdapter 和作业第一步的 ItemReader 都配置为从同一队列读取消息。
我遇到的问题是 IntegrationFlow 的 AmqpInboundChannelAdapter 读取 RabbitMQ 消息,然后 ItemReader 无法再找到该消息。可能是因为 IntegrationFlow 在作业启动之前确认了消息。
有没有办法让 IntegrationFlow 远离 consuming/acknowledging 消息,将其留在队列中,以便 ItemReader 可以作为 intendend 工作?我尝试配置 AmqpInboundChannelAdapter 以重新排队消息,但这只会导致适配器无限循环重新读取它自己的消息。
这个问题在某种程度上描述了我的问题,除了我没有做任何处理,我只是试图将 IntegrationFlow 用作 JobLaunching 触发器。所以解决方案似乎是一种反模式。
如有任何帮助,我们将不胜感激
如果批处理作业只需要来自一条消息的信息,我建议绑定第二个队列,使用相同的路由键;一个队列用于触发器,一个队列用于项目 reader.
如果第一条消息是触发器,然后项目 reader 读取多条消息,您可以将消息内容添加到 JobParameters
。您还需要将适配器的预取设置为 1,以便在处理此消息时不会发送任何其他消息。