KStream 将记录发送到多个流(不是分支)
KStream send record to multiple streams (not Branch)
有没有一种方法可以进行类似分支的操作,但在谓词计算结果为真的每个输出流中放置记录? Brach 将记录放入第一个匹配项(文档:在第一个匹配项中,一条记录被放置到一个且只有一个输出流中)。
我想你可以使用这样的东西:
KStream<String, String> inputStream = builder.stream("input");
List<Predicate<String, String>> predicates = new ArrayList<>(); // <-- list of predicates
List<KStream<String, String>> kStreams = predicates.stream()
.map(inputStream::branch)
.map(Arrays::asList)
.map(listOfOneElementKStreams -> listOfOneElementKStreams.get(0)).collect(Collectors.toList());
您可以 "broadcast" 并单独过滤每个流:
KStream stream = ...
stream1 = stream.filter(...);
stream2 = stream.filter(...);
// and so on...
如果多次使用stream
变量,所有记录都会广播到所有下游过滤器(或任何其他操作符),即每个记录都会执行每个过滤器。
有没有一种方法可以进行类似分支的操作,但在谓词计算结果为真的每个输出流中放置记录? Brach 将记录放入第一个匹配项(文档:在第一个匹配项中,一条记录被放置到一个且只有一个输出流中)。
我想你可以使用这样的东西:
KStream<String, String> inputStream = builder.stream("input");
List<Predicate<String, String>> predicates = new ArrayList<>(); // <-- list of predicates
List<KStream<String, String>> kStreams = predicates.stream()
.map(inputStream::branch)
.map(Arrays::asList)
.map(listOfOneElementKStreams -> listOfOneElementKStreams.get(0)).collect(Collectors.toList());
您可以 "broadcast" 并单独过滤每个流:
KStream stream = ...
stream1 = stream.filter(...);
stream2 = stream.filter(...);
// and so on...
如果多次使用stream
变量,所有记录都会广播到所有下游过滤器(或任何其他操作符),即每个记录都会执行每个过滤器。