来自单个主主题的多个流
Multiple streams from a single master topic
如何从一个主主题创建多个流?当我做这样的事情时:
KStreamBuilder builder = new KStreamBuilder();
builder.stream(Serdes.String(), Serdes.String(), "master")
/* Filtering logic */
.to(Serdes.String(), Serdes.String(), "output1");
builder.stream(Serdes.String(), Serdes.String(), "master")
/* Filtering logic */
.to(Serdes.String(), Serdes.String(), "output2");
KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();
我收到以下错误:
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic master has already been registered by another source.
at org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:347)
at org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:92)
是否需要为来自 "master" 的每个流创建另一个 KafkaStreams 实例?
您可以创建一个可以重复使用的 KStream:
KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master");
那么你可以重复使用它:
inputStream.filter(..logic1)
.to(Serdes.String(), Serdes.String(), "output1");
inputStream.filter(..logic2)
.to(Serdes.String(), Serdes.String(), "output2");
KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();
你也可以使用分支功能来实现这个
KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master");
然后使用分支将创建结果集数组
final KStream<String, String>[] splitStream = inputStream.branch(new Predicate<String, String>() {
@Override
public boolean test(String key, String value) {
//write logic to filter
return true;
},
new Predicate<String, String>() {
@Override
public boolean test(String key, String value) {
//write logic to filter
return true;
},....
//you can write multiple predicate to filter inputStream
});
最终在分支完成后,splitStream[0] 将包含第一个过滤器输出,splitStream[1] 将包含第二个过滤器输出,依此类推。
要将此发送到任何输出主题,您可以使用以下代码。
splitStream[0].to("out_topic1");
splitStream[1].to("out_topic2");
如何从一个主主题创建多个流?当我做这样的事情时:
KStreamBuilder builder = new KStreamBuilder();
builder.stream(Serdes.String(), Serdes.String(), "master")
/* Filtering logic */
.to(Serdes.String(), Serdes.String(), "output1");
builder.stream(Serdes.String(), Serdes.String(), "master")
/* Filtering logic */
.to(Serdes.String(), Serdes.String(), "output2");
KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();
我收到以下错误:
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic master has already been registered by another source.
at org.apache.kafka.streams.processor.TopologyBuilder.addSource(TopologyBuilder.java:347)
at org.apache.kafka.streams.kstream.KStreamBuilder.stream(KStreamBuilder.java:92)
是否需要为来自 "master" 的每个流创建另一个 KafkaStreams 实例?
您可以创建一个可以重复使用的 KStream:
KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master");
那么你可以重复使用它:
inputStream.filter(..logic1)
.to(Serdes.String(), Serdes.String(), "output1");
inputStream.filter(..logic2)
.to(Serdes.String(), Serdes.String(), "output2");
KafkaStreams streams = new KafkaStreams(builder, /* config */);
streams.start();
你也可以使用分支功能来实现这个
KStream<String, String> inputStream = builder.stream(Serdes.String(), Serdes.String(), "master");
然后使用分支将创建结果集数组
final KStream<String, String>[] splitStream = inputStream.branch(new Predicate<String, String>() {
@Override
public boolean test(String key, String value) {
//write logic to filter
return true;
},
new Predicate<String, String>() {
@Override
public boolean test(String key, String value) {
//write logic to filter
return true;
},....
//you can write multiple predicate to filter inputStream
});
最终在分支完成后,splitStream[0] 将包含第一个过滤器输出,splitStream[1] 将包含第二个过滤器输出,依此类推。 要将此发送到任何输出主题,您可以使用以下代码。
splitStream[0].to("out_topic1");
splitStream[1].to("out_topic2");