重试 KafkaStreams 拓扑中的消息
Retry a message in KafkaStreams Topology
我有一个 kafkaStreams 拓扑,其中有一个处理器 API。在处理器内部,有调用外部API.
的逻辑
Incase API returns 503,已尝试的消息将需要重试。
现在,我正在尝试将此消息推送到不同的 kafka 主题并使用“Punctuate”方法每分钟从失败的主题中提取一批消息,重试。
这个问题有更好的way/approach吗?
一种不同但稳健的方法是使用状态存储。它们由 Kafka 作为压缩的更新日志主题支持。
你可以将失败的消息存储在状态存储中,并通过调用调度(punctuate)处理它们,然后删除所有成功处理的消息。
例如:
public class MyProcessor {
private final long schedulerIntervalMs = 60000;
private final String entityStoreName = "failed-message-store";
private KeyValueStore<String, Object> entityStore;
@Override
public void init(ProcessorContext context) {
this.entityStore = (KeyValueStore) context().getStateStore(entityStoreName);
context().schedule(Duration.ofMillis(this.schedulerIntervalMs), PunctuationType.WALL_CLOCK_TIME,
timestamp -> processFailedMessagesStore());
}
@Override
public void process(String key, Object value) {
boolean apiCallSuccessful = // call API
if (!apiCallSuccesfull) {
entityStore.put(key, value);
}
}
private void processFailedMessagesStore() {
try (KeyValueIterator<String, Object> allItems = entityStore.all()) {
allItems.forEachRemaining(item -> {
boolean successfullyProcessed = // re-process
if (successfullyProcessed) {
entityStore.delete(item.key);
}
});
}
}
}
我有一个 kafkaStreams 拓扑,其中有一个处理器 API。在处理器内部,有调用外部API.
的逻辑Incase API returns 503,已尝试的消息将需要重试。
现在,我正在尝试将此消息推送到不同的 kafka 主题并使用“Punctuate”方法每分钟从失败的主题中提取一批消息,重试。
这个问题有更好的way/approach吗?
一种不同但稳健的方法是使用状态存储。它们由 Kafka 作为压缩的更新日志主题支持。
你可以将失败的消息存储在状态存储中,并通过调用调度(punctuate)处理它们,然后删除所有成功处理的消息。
例如:
public class MyProcessor {
private final long schedulerIntervalMs = 60000;
private final String entityStoreName = "failed-message-store";
private KeyValueStore<String, Object> entityStore;
@Override
public void init(ProcessorContext context) {
this.entityStore = (KeyValueStore) context().getStateStore(entityStoreName);
context().schedule(Duration.ofMillis(this.schedulerIntervalMs), PunctuationType.WALL_CLOCK_TIME,
timestamp -> processFailedMessagesStore());
}
@Override
public void process(String key, Object value) {
boolean apiCallSuccessful = // call API
if (!apiCallSuccesfull) {
entityStore.put(key, value);
}
}
private void processFailedMessagesStore() {
try (KeyValueIterator<String, Object> allItems = entityStore.all()) {
allItems.forEachRemaining(item -> {
boolean successfullyProcessed = // re-process
if (successfullyProcessed) {
entityStore.delete(item.key);
}
});
}
}
}