如何在kafka中同步多条日志?

How to make multiple logs sync in kafka?

假设我有 2 种类型的日志,它们有一个共同的字段 'uid',如果这两种包含 uid 的日志都到达时,我想输出日志,就像一个连接,是卡夫卡有可能吗?

是的,绝对是。查看 Kafka Streams,特别是 DSL API。它是这样的:

 StreamsBuilder builder = new StreamsBuilder();

 KStream<byte[], Foo> fooStream = builder.stream("foo");

 KStream<byte[], Bar> barStream = builder.stream("bar");

 fooStream.join(barStream,
                (foo, bar) -> {
                    foo.baz = bar.baz;
                    return foo;
                },
                JoinWindows.of(1000))
          .to("buzz");

这个简单的应用程序使用两个输入主题("foo" 和 "bar"),连接它们并将它们写入主题 "buzz"。由于流是无限的,当加入两个流时,你需要指定一个加入window(上面1000毫秒),这是各自流上两条消息之间的相对时间差,使它们有资格加入。

这是一个更完整的例子:https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java

这是文档:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html。您会发现可以执行多种不同类型的联接:

重要的是要注意,尽管上面的示例将确定性地同步流——如果您重置并重新处理拓扑,每次都会得到相同的结果——并非 Kafka Streams 中的所有连接操作都是确定性的。从 1.0.0 版及之前的版本开始,大约一半是不确定的,可能取决于从底层主题分区消耗的数据的顺序。具体来说,内部 KStream-KStream 和所有 KTable-KTable 联接都是确定性的。其他连接,如所有 KStream-KTable 连接和 left/outer KStream-KStream 连接是不确定的,取决于消费者使用的数据顺序。如果您将拓扑设计为可重新处理,请牢记这一点。如果您使用这些非确定性操作,当您的拓扑 运行 处于活动状态时,事件到达的顺序将产生一个结果,但如果您正在重新处理您的拓扑,您可能会得到另一个结果。另请注意,KStream#merge() 等操作也不会产生确定性结果。有关此问题的更多信息,请参阅 and this mailing list post