读取并处理来自 Kafka 的一批消息

Read and process a batch of messages from Kafka

我想从一个kafka主题中定时读取一批消息,或者当读取的消息数量达到一定数量时,将它们作为一个批次发送到下游系统。目前,我的 kafka 拓扑由一个处理器终止,该处理器保存消息,然后使用 punctuate 方法增量处理批处理。

不过,我不确定这是否完美,因为如果应用程序在调用 punctuate 方法之前崩溃,我认为某些消息会丢失(即消费者认为它已经完成但它们不会出现在下游系统中)。

batchQueue = new LinkedBlockingQueue<String>(batchSize);

KStream<String, String> inputStream = builder
    .stream(Serdes.String(), Serdes.String(), "source-topic")
    .process(new ProcessorSupplier<String, String>() {

            @Override
            public Processor<String, String> get() {
                return new AbstractProcessor<String, Wrapper>() {

                    @Override
                    public void init(ProcessorContext context) {
                        super.init(context);
                        context.schedule(flushPeriod);
                    }

                    @Override
                    public void process(String key, String value) {
                        batchQueue.add(value);
                        if (batchQueue.remainingCapacity() == 0) {
                            processQueue();
                        }
                    }

                    @Override
                    public void punctuate(long timestamp) {
                        processQueue();
                        context().commit();
                    }
                }

                @Override
                public void close() {}
            };
        }
    });

Is there a way to make this approach more robust? Perhaps windowing but I don't really understand this.

我建议将数据转换部分(为此我将使用 Kafka 的 Streams API)和写入下游系统的数据摄取部分(为此我将使用 Kafka 的连接 API).

简而言之,为什么您的转换逻辑应该与此数据最终将转发到的下游系统之一的细节(此处:昂贵的插入!)相结​​合并且需要担心?理想情况下,转换的责任应该只是转换,而不应该与外部下游系统的操作方面有关。例如,如果您最终想要将转换后的数据转发到第二个下游系统(或第三个,...),那么耦合方法将意味着您必须 update/redeploy/... 您的应用程序,即使 none 的转换逻辑发生了变化。

解耦转换和摄取的另一个好处是您的转换逻辑将更加简单,因为它不必考虑由于下游系统缓慢、不可用等原因导致的故障。例如,它确实不需要 implement/test 复杂的重试逻辑。

Do I have to use Kafka connect for this.

不,您不需要为此使用 Kafka Connect,但它可以说是完成此任务的最佳工具。

I'm leaning away from [Kafka Connect] due to it's error handling capability: https://groups.google.com/forum/#!topic/confluent-platform/OBuLbVHbuyI

在最新版本的 Kafka Connect 中,错误处理实际上非常好。此外,linked 讨论中的问题可以通过为 Connect 提供更强大的转换器(思考:serializer/deserializer)轻松解决。

此外,如 link 中所述,当您在将数据写入 Kafka 之前验证数据的兼容性时,那里讨论的具体问题就变得不那么重要了。您可以通过利用 Confluent 的模式注册表来实现这一点(https://github.com/confluentinc/schema-registry, docs or similar tools. Since you raised the question "how can I make this more robust", thinking about data serialization and evolution 是我在部署到生产环境之前要考虑的另一个重要方面。

希望对您有所帮助!