是否有针对 kafka 流应用程序中的不同输入主题使用不同的 auto.offset.reset 策略?
Is there anyway to use different auto.offset.reset strategy for different input topics in kafka streams app?
用例是:我有一个 kafka 流应用程序,它从输入主题消费,输出到中间主题,然后在相同的流中,另一个拓扑从这个中间主题消费。
每当更新应用程序ID时,这两个主题都会从最早开始到消费者。我想将中间主题的 auto.offset.reset 更改为最新,同时将输入主题的
保留为最早。
是的。您可以通过以下方式为每个主题设置重置策略:
// Processor API
topology.addSource(AutoOffsetReset offsetReset, String name, String... topics);
// DSL
builder.stream(String topic, Consumed.with(AutoOffsetReset offsetReset));
builder.table(String topic, Consumed.with(AutoOffsetReset offsetReset));
所有这些方法都有一些允许设置它的重载。
用例是:我有一个 kafka 流应用程序,它从输入主题消费,输出到中间主题,然后在相同的流中,另一个拓扑从这个中间主题消费。
每当更新应用程序ID时,这两个主题都会从最早开始到消费者。我想将中间主题的 auto.offset.reset 更改为最新,同时将输入主题的
保留为最早。是的。您可以通过以下方式为每个主题设置重置策略:
// Processor API
topology.addSource(AutoOffsetReset offsetReset, String name, String... topics);
// DSL
builder.stream(String topic, Consumed.with(AutoOffsetReset offsetReset));
builder.table(String topic, Consumed.with(AutoOffsetReset offsetReset));
所有这些方法都有一些允许设置它的重载。