将 KTable 与 KStream 连接起来,输出主题中没有任何内容
Joining a KTable with a KStream and nothing arrives in the output topic
我用 KTable 左连接了一个 KStream,但是我没有看到输出主题的任何输出:
val stringSerde: Serde[String] = Serdes.String()
val longSerde: Serde[java.lang.Long] = Serdes.Long()
val genericRecordSerde: Serde[GenericRecord] = new GenericAvroSerde()
val builder = new KStreamBuilder()
val networkImprStream: KStream[Long, GenericRecord] = builder
.stream(dfpGcsNetworkImprEnhanced)
// Create a global table for advertisers. The data from this global table
// will be fully replicated on each instance of this application.
val advertiserTable: GlobalKTable[java.lang.Long, GenericRecord]= builder.globalTable(advertiserTopicName, "advertiser-store")
// Join the network impr stream to the advertiser global table. As this is global table
// we can use a non-key based join with out needing to repartition the input stream
val networkImprWithAdvertiserNameKStream: KStream[java.lang.Long, GenericRecord] = networkImprStream.leftJoin(advertiserTable,
(_, networkImpr) => {
println(networkImpr)
networkImpr.get("advertiserId").asInstanceOf[java.lang.Long]
},
(networkImpr: GenericRecord, adertiserIdToName: GenericRecord) => {
println(networkImpr)
networkImpr.put("advertiserName", adertiserIdToName.get("name"))
networkImpr
}
)
networkImprWithAdvertiserNameKStream.to(networkImprProcessed)
val streams = new KafkaStreams(builder, streamsConfiguration)
streams.cleanUp()
streams.start()
// usually the stream application would be running forever,
// in this example we just let it run for some time and stop since the input data is finite.
Thread.sleep(15000L)
如果我绕过连接并直接将输入主题输出到输出,我会看到消息到达。我已经将连接更改为左连接,添加了一些 printlns 以查看何时提取密钥(虽然控制台上没有打印任何内容)。此外,我每次都使用 kafka 流重置工具,所以从头开始。我 运行 没主意了。我还添加了一些对商店的测试访问,它可以工作并包含来自流的键(尽管这不应该因为左连接而禁止任何输出)。
在我的源流中,键为空。虽然我不是用这个key来加入table这个key不能为null。因此,使用虚拟密钥创建中间流是可行的。所以即使我在这里有一个全局 KTable,流消息的键的限制也适用于这里:
http://docs.confluent.io/current/streams/developer-guide.html#kstream-ktable-join
Input records for the stream with a null key or a null value are ignored and do not trigger the join.
我用 KTable 左连接了一个 KStream,但是我没有看到输出主题的任何输出:
val stringSerde: Serde[String] = Serdes.String()
val longSerde: Serde[java.lang.Long] = Serdes.Long()
val genericRecordSerde: Serde[GenericRecord] = new GenericAvroSerde()
val builder = new KStreamBuilder()
val networkImprStream: KStream[Long, GenericRecord] = builder
.stream(dfpGcsNetworkImprEnhanced)
// Create a global table for advertisers. The data from this global table
// will be fully replicated on each instance of this application.
val advertiserTable: GlobalKTable[java.lang.Long, GenericRecord]= builder.globalTable(advertiserTopicName, "advertiser-store")
// Join the network impr stream to the advertiser global table. As this is global table
// we can use a non-key based join with out needing to repartition the input stream
val networkImprWithAdvertiserNameKStream: KStream[java.lang.Long, GenericRecord] = networkImprStream.leftJoin(advertiserTable,
(_, networkImpr) => {
println(networkImpr)
networkImpr.get("advertiserId").asInstanceOf[java.lang.Long]
},
(networkImpr: GenericRecord, adertiserIdToName: GenericRecord) => {
println(networkImpr)
networkImpr.put("advertiserName", adertiserIdToName.get("name"))
networkImpr
}
)
networkImprWithAdvertiserNameKStream.to(networkImprProcessed)
val streams = new KafkaStreams(builder, streamsConfiguration)
streams.cleanUp()
streams.start()
// usually the stream application would be running forever,
// in this example we just let it run for some time and stop since the input data is finite.
Thread.sleep(15000L)
如果我绕过连接并直接将输入主题输出到输出,我会看到消息到达。我已经将连接更改为左连接,添加了一些 printlns 以查看何时提取密钥(虽然控制台上没有打印任何内容)。此外,我每次都使用 kafka 流重置工具,所以从头开始。我 运行 没主意了。我还添加了一些对商店的测试访问,它可以工作并包含来自流的键(尽管这不应该因为左连接而禁止任何输出)。
在我的源流中,键为空。虽然我不是用这个key来加入table这个key不能为null。因此,使用虚拟密钥创建中间流是可行的。所以即使我在这里有一个全局 KTable,流消息的键的限制也适用于这里: http://docs.confluent.io/current/streams/developer-guide.html#kstream-ktable-join
Input records for the stream with a null key or a null value are ignored and do not trigger the join.