Kafka-Streams 加入 2 个具有 JSON 个值的主题 |背压机制?
Kafka-Streams Join 2 topics with JSON values | backpressure mechanism?
我正在学习 Kafka Streams 并尝试实现以下目标:
创建了 2 个 Kafka 主题(例如 topic1、topic2),其中 null 作为键,JSONString 作为值。 topic1 中的数据(无重复)在 topic2 中有多个匹配条目。 IE。 topic1有一些主流数据,加入topic2后会产生新的多数据流。
示例:
topic1={"name": "abc", "age":2}, {"name": "xyz", "age":3} and so on.
topic2={"name": "abc", "address"="xxxxxx"}, {"name": "abc", "address"="yyyyyy"}, {"name": "xyz", "address"="jjjjjj"}, {"name": "xyz", "address"="xxxkkkkk"}
预期输出:{"name": "abc", "age":2, "address"="xxxxxx"}, {"name": "abc", "age":2, "address"="yyyyyy"}, {"name": "xyz", "age":3, "address"="jjjjjj"}, {"name": "xyz", "age":3, "address"="xxxkkkkk"}
想要 persist/hold 来自主题 1 的数据流以供将来参考,而来自主题 2 的数据流仅用于实现上述用例,不需要任何 persistence/holding背部。
我有几个问题:
1) hold/store topic1 数据流应该持续几天(可能吗?),以便可以加入来自 topic2 的传入数据流。可能吗?
2)我应该用什么来实现这个,KStream 或 KTable?
3)这叫背压机制吗?
Kafka Stream 是否支持这个用例,或者我应该寻找其他东西吗?请建议。
我用 5 分钟尝试了一段使用 KStream 的代码 window,但看起来我无法在流中保存 topic1 数据。
请帮助我做出正确的选择并加入。我正在使用来自 Confluent 的 Kafka 和 Docker 个实例。
public void run() {
final StreamsBuilder builder = new StreamsBuilder();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
// Hold data from this topic to 30 days
KStream<String, JsonNode> cs = builder.stream("topic1", consumed);
cs.foreach((k,v) -> {
System.out.println( k + " --->" + v);
});
// Data is involved in one time process.
KStream<String, JsonNode> css = builder.stream("topic2", consumed);
css.foreach((k,v) -> {
System.out.println( k + " --->" + v);
});
KStream<String, JsonNode> resultStream = cs.leftJoin(css,
valueJoiner,
JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
Joined.with(
Serdes.String(), /* key */
jsonSerde, /* left value */
jsonSerde) /* right value */
);
resultStream.foreach((k, v) -> {
System.out.println("JOIN STREAM: KEY="+k+ ", VALUE=" + v);
});
KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.start();
}
Kafka 中的联接始终基于键。(*) 因此,要使任何联接工作,您需要先将要联接的字段提取到键中您执行实际的连接(唯一的部分例外是 KStream-GlobalKTable 连接)。在您的代码示例中,您不会得到任何结果,因为所有记录都有一个 null
键,因此无法连接。
对于联接本身,KStream-KTable 联接似乎是您用例的正确选择。要完成这项工作,您需要:
- 为
topic1
正确设置连接键并将数据写入另一个主题(我们称之为 topic1Keyed
)
- 阅读
topic1Keyed
作为table
- 为
topic2
正确设置连接键
- 加入
topic2
与 KTable
有关联接语义的完整详细信息,请查看此博客 post:https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
(*) 更新:
从2.4版本开始,Kafka Streams也支持外键table-table连接
我正在学习 Kafka Streams 并尝试实现以下目标:
创建了 2 个 Kafka 主题(例如 topic1、topic2),其中 null 作为键,JSONString 作为值。 topic1 中的数据(无重复)在 topic2 中有多个匹配条目。 IE。 topic1有一些主流数据,加入topic2后会产生新的多数据流。
示例:
topic1={"name": "abc", "age":2}, {"name": "xyz", "age":3} and so on.
topic2={"name": "abc", "address"="xxxxxx"}, {"name": "abc", "address"="yyyyyy"}, {"name": "xyz", "address"="jjjjjj"}, {"name": "xyz", "address"="xxxkkkkk"}
预期输出:{"name": "abc", "age":2, "address"="xxxxxx"}, {"name": "abc", "age":2, "address"="yyyyyy"}, {"name": "xyz", "age":3, "address"="jjjjjj"}, {"name": "xyz", "age":3, "address"="xxxkkkkk"}
想要 persist/hold 来自主题 1 的数据流以供将来参考,而来自主题 2 的数据流仅用于实现上述用例,不需要任何 persistence/holding背部。
我有几个问题: 1) hold/store topic1 数据流应该持续几天(可能吗?),以便可以加入来自 topic2 的传入数据流。可能吗? 2)我应该用什么来实现这个,KStream 或 KTable? 3)这叫背压机制吗?
Kafka Stream 是否支持这个用例,或者我应该寻找其他东西吗?请建议。
我用 5 分钟尝试了一段使用 KStream 的代码 window,但看起来我无法在流中保存 topic1 数据。
请帮助我做出正确的选择并加入。我正在使用来自 Confluent 的 Kafka 和 Docker 个实例。
public void run() {
final StreamsBuilder builder = new StreamsBuilder();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());
final Consumed<String, JsonNode> consumed = Consumed.with(Serdes.String(), jsonSerde);
// Hold data from this topic to 30 days
KStream<String, JsonNode> cs = builder.stream("topic1", consumed);
cs.foreach((k,v) -> {
System.out.println( k + " --->" + v);
});
// Data is involved in one time process.
KStream<String, JsonNode> css = builder.stream("topic2", consumed);
css.foreach((k,v) -> {
System.out.println( k + " --->" + v);
});
KStream<String, JsonNode> resultStream = cs.leftJoin(css,
valueJoiner,
JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
Joined.with(
Serdes.String(), /* key */
jsonSerde, /* left value */
jsonSerde) /* right value */
);
resultStream.foreach((k, v) -> {
System.out.println("JOIN STREAM: KEY="+k+ ", VALUE=" + v);
});
KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.start();
}
Kafka 中的联接始终基于键。(*) 因此,要使任何联接工作,您需要先将要联接的字段提取到键中您执行实际的连接(唯一的部分例外是 KStream-GlobalKTable 连接)。在您的代码示例中,您不会得到任何结果,因为所有记录都有一个 null
键,因此无法连接。
对于联接本身,KStream-KTable 联接似乎是您用例的正确选择。要完成这项工作,您需要:
- 为
topic1
正确设置连接键并将数据写入另一个主题(我们称之为topic1Keyed
) - 阅读
topic1Keyed
作为table - 为
topic2
正确设置连接键
- 加入
topic2
与KTable
有关联接语义的完整详细信息,请查看此博客 post:https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/
(*) 更新:
从2.4版本开始,Kafka Streams也支持外键table-table连接