Kafka SpringBoot StreamListener - how to consume multiple topics in order?

I have multiple StreamListener-annotated methods consuming from different topics. Some of these topics need to be read from the "earliest" offset to populate an in-memory map (something like a state machine), and only then should the application consume from the other topics, which may contain commands that should be executed against the "latest" state of that state machine.

The current code looks something like this:

@Component
@AllArgsConstructor
@EnableBinding({InputChannel.class, OutputChannel.class})
@Slf4j
public class KafkaListener {

    @StreamListener(target = InputChannel.EVENTS)
    public void event(Event event) {
        // do something with the event
    }

    @StreamListener(target = InputChannel.COMMANDS)
    public void command(Command command) {
        // do something with the command only after all events have been processed
    }

}

I tried adding some horrible code that grabs the kafka topic offset metadata from the incoming event messages and then uses a semaphore to block the commands until the events reach a certain percentage of the total offset. It sort of works, but it makes me sad, and it will be awful to maintain once we have 20 or so interdependent topics.

Does SpringBoot / Spring Streams have any built-in mechanism for doing this, or is there some common pattern that people use which I'm not aware of?

TL;DR: How do I process all messages from topic A before consuming any messages from topic B, without doing something dirty like sticking a Thread.sleep(60000) in topic B's consumer?

Have a look at the kafka consumer binding property resetOffsets:

resetOffsets

Whether to reset offsets on the consumer to the value provided by startOffset. Must be false if a KafkaRebalanceListener is provided; see Using a KafkaRebalanceListener.

Default: false.

startOffset

The starting offset for new groups. Allowed values: earliest and latest. If the consumer group is set explicitly for the consumer 'binding' (through spring.cloud.stream.bindings.&lt;channelName&gt;.group), 'startOffset' is set to earliest. Otherwise, it is set to latest for the anonymous consumer group. Also see resetOffsets (earlier in this list).

Default: null (equivalent to earliest).
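For example, to make a binding re-read its topic from the beginning on every startup, the Kafka binder consumer properties could be set like this (a sketch; the binding name events is an assumption based on the question's InputChannel.EVENTS):

```properties
# Kafka binder consumer properties live under spring.cloud.stream.kafka.bindings
# (note the extra ".kafka." segment, unlike the core binding properties).
# "events" is the assumed binding name for InputChannel.EVENTS.
spring.cloud.stream.kafka.bindings.events.consumer.resetOffsets=true
spring.cloud.stream.kafka.bindings.events.consumer.startOffset=earliest
```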

You can also add a KafkaBindingRebalanceListener and perform seeks on the consumer.
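A minimal sketch of such a listener, assuming the binding to be replayed is named events (the KafkaBindingRebalanceListener interface provides default no-op methods, so only onPartitionsAssigned needs overriding; remember that resetOffsets must be false when such a listener is provided):

```java
import java.util.Collection;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RebalanceConfig {

    @Bean
    public KafkaBindingRebalanceListener rebalanceListener() {
        return new KafkaBindingRebalanceListener() {

            @Override
            public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
                    Collection<TopicPartition> partitions, boolean initial) {
                // Rewind only the "events" binding, and only on the initial
                // assignment after startup, so that a later rebalance does not
                // trigger another full replay.
                if ("events".equals(bindingName) && initial) {
                    consumer.seekToBeginning(partitions);
                }
            }

        };
    }

}
```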

EDIT

You can also set autoStartup to false on the second listener, and start the binding when you are ready. Here is an example:

@SpringBootApplication
@EnableBinding(Sink.class)
public class Gitter55Application {

    public static void main(String[] args) {
        SpringApplication.run(Gitter55Application.class, args);
    }

    @Bean
    public ConsumerEndpointCustomizer<KafkaMessageDrivenChannelAdapter<?, ?>> customizer() {
        return (endpoint, dest, group) -> {
            endpoint.setOnPartitionsAssignedSeekCallback((assignments, callback) -> {
                assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
            });
        };
    }

    @StreamListener(Sink.INPUT)
    public void listen(String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key) {
        System.out.println(new String(key) + ":" + value);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template,
            BindingsEndpoint bindings) {

        return args -> {
            while (true) {
                template.send("gitter55", "foo".getBytes(), "bar".getBytes());

                System.out.println("Hit enter to start");
                System.in.read();
                bindings.changeState("input", State.STARTED);
            }
        };

    }

}

spring.cloud.stream.bindings.input.group=gitter55
spring.cloud.stream.bindings.input.destination=gitter55
spring.cloud.stream.bindings.input.content-type=text/plain

spring.cloud.stream.bindings.input.consumer.auto-startup=false