在编写器步骤中使用异步调用时如何确保 Spring 批处理作业完成

how to ensure Spring batch job completion when using async call in writer step

我的 spring 批处理在读取步骤中从数据库中读取数百万行, 并在写入步骤中将其发布到 kafka 主题。

在我工作的公司中,我们在 kafka 上使用包装器,我目前正在调用一个 sendAsync() 方法,该方法接收 eventSupplier 消息和回调:org.apache.kafka.clients.producer.Callback

我的问题是,因为它是一个批处理,所以我担心作业执行会在我获得关于以异步模式发送的所有消息的指示之前结束。 (可能在最后一条消息中抛出了异常)

是否有 spring 批处理平台支持的方法来确保步骤完成? 或者我应该只在写入步骤中实现 Future.isDone() 吗? 我在 spring 批处理文档中看到有一个 KafkaItemWriter 我应该在我的案例中使用它吗?我目前正在实施 ItemWriter 谢谢

Spring 提供的 KafkaItemWriter 批处理等待对 return 的异步调用(直到可配置的超时)。这是为了确保写入操作成功或失败。

如果您正在编写自定义 ItemWriter,您可以从中获得灵感。