重试 KafkaStreams 拓扑中的消息

Retry a message in KafkaStreams Topology

我有一个 kafkaStreams 拓扑,其中有一个处理器 API。在处理器内部,有调用外部API.

的逻辑

Incase API returns 503,已尝试的消息将需要重试。

现在,我正在尝试将此消息推送到不同的 kafka 主题并使用“Punctuate”方法每分钟从失败的主题中提取一批消息,重试。

这个问题有更好的way/approach吗?

一种不同但稳健的方法是使用状态存储。它们由 Kafka 作为压缩的更新日志主题支持。

你可以将失败的消息存储在状态存储中,并通过调用调度(punctuate)处理它们,然后删除所有成功处理的消息。

例如:

public class MyProcessor {

    private final long schedulerIntervalMs = 60000;
    private final String entityStoreName = "failed-message-store";
    private KeyValueStore<String, Object> entityStore;

    @Override
    public void init(ProcessorContext context) {
        this.entityStore = (KeyValueStore) context().getStateStore(entityStoreName);
        context().schedule(Duration.ofMillis(this.schedulerIntervalMs), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> processFailedMessagesStore());
    }

    @Override
    public void process(String key, Object value) {
        boolean apiCallSuccessful = // call API

        if (!apiCallSuccesfull) {
            entityStore.put(key, value);
        }
    }

    private void processFailedMessagesStore() {
        try (KeyValueIterator<String, Object> allItems = entityStore.all()) {
            allItems.forEachRemaining(item -> {
                boolean successfullyProcessed = // re-process
                
                if (successfullyProcessed) {
                    entityStore.delete(item.key);
                }
            });
        }
    }
}