How to call StepExecutionListener in Spring Batch with Kafka integration?

Below is the job configuration in etl.xml:

<batch:step id="Produce">
    <batch:partition partitioner="partitioner">
        <batch:handler grid-size="${partitioner.limit}"></batch:handler>
        <batch:step>
            <batch:tasklet>
                <batch:chunk reader="Reader" writer="kafkaProducer"
                             commit-interval="20000">
                </batch:chunk>
                <batch:listeners>
                    <batch:listener ref="producingListener" />
                </batch:listeners>
            </batch:tasklet>
        </batch:step>
    </batch:partition>
</batch:step>

Below is the code used to send messages to the topic:

ListenableFuture<SendResult<String, Message>> listenableFuture = kafkaTemplate.send(message);

listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {

    @Override
    public void onSuccess(SendResult<String, Message> result) {
        log.info("marking as SUCCESS");
        manager.updateStatus("someTable", KafkaResponse.SUCCESS);
    }

    @Override
    public void onFailure(Throwable ex) {
        log.info("marking as FAILURE");
        manager.updateKafkaStatus("someTable", KafkaResponse.FAILURE);
    }
});

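As soon as kafkaTemplate.send(message) is executed, the listener is invoked and the job completes. I see onSuccess() and onFailure() being called only after the job has completed. How can I change the job configuration so that the listener is invoked after the acknowledgement from the Kafka topic has been received?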
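The ordering described above can be reproduced without a broker. The following is a minimal sketch, not the actual job: it assumes a plain JDK `CompletableFuture` as a stand-in for the future returned by `kafkaTemplate.send(...)`, and the class name `AckOrderingDemo` and the event labels are hypothetical. It only illustrates that a writer which returns without waiting lets the step finish before the callback runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AckOrderingDemo {

    // Returns the order of events when the writer does not wait for the acknowledgement.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Stand-in (assumption) for kafkaTemplate.send(message): a future that is still pending.
        CompletableFuture<String> pendingAck = new CompletableFuture<>();

        // Stand-in for addCallback(...): runs only once the future completes.
        pendingAck.whenComplete((ack, error) ->
                events.add(error == null ? "onSuccess" : "onFailure"));

        // The writer returns right away, so the step ends and its listener fires.
        events.add("afterStep");

        // The broker acknowledgement arrives later and triggers the callback.
        pendingAck.complete("ack");

        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [afterStep, onSuccess]
    }
}
```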

Could you share a code example of what you suggested, i.e. blocking while waiting for the future? That might help.

I have not tried the following, but the idea is:

ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(message);
try {
    // Block until the broker acknowledges the send (or the send fails)
    SendResult<String, Message> sendResult = future.get();
    // inspect sendResult
    log.info("marking as SUCCESS");
    manager.updateStatus("someTable", KafkaResponse.SUCCESS);
} catch (Exception e) {
    log.info("marking as FAILURE");
    manager.updateKafkaStatus("someTable", KafkaResponse.FAILURE);
    // do something with e
}
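With a commit interval as large as 20000, blocking after every single send may be slow. One possible variation is to start all sends for a chunk first and only then wait for every acknowledgement, with a timeout. The sketch below is untested against a real broker and makes several assumptions: a JDK `CompletableFuture` stands in for the future returned by `kafkaTemplate.send(...)` (spring-kafka 3.x returns a `CompletableFuture`, while 2.x returns a `ListenableFuture`), and the class `BlockingChunkWriterSketch`, its `sender` function, and the timeout value are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

class BlockingChunkWriterSketch {

    // Hypothetical stand-in for kafkaTemplate::send; the future completes on acknowledgement.
    private final Function<String, CompletableFuture<String>> sender;
    private final long timeoutSeconds;

    BlockingChunkWriterSketch(Function<String, CompletableFuture<String>> sender,
                              long timeoutSeconds) {
        this.sender = sender;
        this.timeoutSeconds = timeoutSeconds;
    }

    // Shaped like an item writer's write method: returns only after every send is acknowledged.
    int write(List<String> items) {
        // Start all sends first so they are in flight concurrently.
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String item : items) {
            pending.add(sender.apply(item));
        }
        // Then block on each future; the timeout applies per future, not to the whole chunk.
        int acknowledged = 0;
        try {
            for (CompletableFuture<String> future : pending) {
                future.get(timeoutSeconds, TimeUnit.SECONDS);
                acknowledged++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting for acknowledgement", e);
        } catch (ExecutionException | TimeoutException e) {
            // Propagating the failure makes the step fail instead of completing silently.
            throw new IllegalStateException("Send was not acknowledged", e);
        }
        return acknowledged;
    }

    public static void main(String[] args) {
        BlockingChunkWriterSketch writer = new BlockingChunkWriterSketch(
                item -> CompletableFuture.supplyAsync(() -> "ack:" + item), 5);
        System.out.println(writer.write(List.of("a", "b", "c"))); // prints 3
    }
}
```

Because write() returns only after every future has completed, a step listener would run after the acknowledgements in this arrangement. With a real KafkaTemplate, calling flush() after the sends and before waiting may also be worth considering.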