将多个源流连接到同一组分支
Connecting Multiple Source Streams to The Same Set of Branches
这是对 的概括。
假设我有多个源流,它们应用了同一组谓词。我想设置分支流,以便满足谓词的记录,无论哪个源流,都由同一个分支流处理。如下图所示,每个分支流就像一个转换传入记录的通用处理器。
以下代码块无法正常工作,因为它为每个源流创建了一组不同的分支流。
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source1 = builder.stream("x");
KStream<String, String> source2 = builder.stream("y");
Predicate<String, String>[] branchPredicates = new Predicate[forkCount];
for (int i = 0; i < forkCount; ++i) {
int idx = i;
branchPredicates[i] = ((key, value) ->
key.hashCode() % forkCount == idx);
}
Kstream<String, String>[] forkStreams = Arrays.asList(source1, source2)
.map(srcStream -> srcStream.branch(branchPredicates)
.flatMap(x -> Arrays.stream())
.collect(Collectors.toList());
抱歉,我主要是一名 Scala 开发人员 :)
在上面的例子中,forkStreams.length == branchPredicates.length x 2 通常,与源流的数量成正比。 Kafka 流中是否有一个技巧可以让我在谓词和分支流之间保持一对一的关系?
2018 年 11 月 27 日更新
我可以取得一些进展:
- 使用一个源流读取所有源主题
- 将源流连接到多个分支
- 将消息平均分配到分支机构。
但是,正如下面的代码块所展示的,ALL 分叉流存在于同一个线程中。我想要实现的是将每个 fork 流放入不同的线程以允许更好的 CPU 利用率
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(Arrays.asList("a", "b", "c")
// Create workers
// Need to have predicates for the branches
int totalPerdicates = Integer
.parseInt(props.getProperty(WORKER_PROCESSOR_COUNT));
Predicate<String, String>[] predicates = new Predicate[totalPerdicates];
IntStream
.range(0, totalPerdicates)
.forEach(i -> {
int idx = i;
predicates[i] = (key, value) ->
key.hashCode() % totalPerdicates == idx;
});
forkStreams = Arrays.asList(sourceStreams.branch(predicates));
// Hack- Dump the number of messages processed every 10 seconds
forkStreams
.forEach(fork -> {
KStream<Windowed<String>, Long> tbl =
fork.transformValues(new SourceTopicValueTransformerSupplier())
.selectKey((key, value) -> "foobar")
.groupByKey()
.windowedBy(TimeWindows.of(2000L))
.count()
.toStream();
tbl
.foreach((key, count) -> {
String fromTo = String.format("%d-%d",
key.window().start(),
key.window().end());
System.out.printf("(Thread %d, Index %d) %s - %s: %d\n",
Thread.currentThread().getId(),
forkStreams.indexOf(fork),
fromTo, key.key(), count);
});
这是输出的片段
<snip>
(Thread 13, Index 1) 1542132126000-1542132128000 - foobar: 2870
(Thread 13, Index 1) 1542132024000-1542132026000 - foobar: 2955
(Thread 13, Index 1) 1542132106000-1542132108000 - foobar: 1914
(Thread 13, Index 1) 1542132054000-1542132056000 - foobar: 546
<snip>
(Thread 13, Index 2) 1542132070000-1542132072000 - foobar: 524
(Thread 13, Index 2) 1542132012000-1542132014000 - foobar: 2491
(Thread 13, Index 2) 1542132042000-1542132044000 - foobar: 261
(Thread 13, Index 2) 1542132022000-1542132024000 - foobar: 2823
<snip>
(Thread 13, Index 3) 1542132088000-1542132090000 - foobar: 2170
(Thread 13, Index 3) 1542132010000-1542132012000 - foobar: 2962
(Thread 13, Index 3) 1542132008000-1542132010000 - foobar: 2847
(Thread 13, Index 3) 1542132022000-1542132024000 - foobar: 2797
<snip>
(Thread 13, Index 4) 1542132046000-1542132048000 - foobar: 2846
(Thread 13, Index 4) 1542132096000-1542132098000 - foobar: 3216
(Thread 13, Index 4) 1542132108000-1542132110000 - foobar: 2696
(Thread 13, Index 4) 1542132010000-1542132012000 - foobar: 2881
<snip>
如有任何关于如何将每个分支流放置在不同线程中的建议,我们将不胜感激。
11/27/2018的更新已经回答了这个问题。话虽如此,该解决方案对我不起作用,因为我希望每个 fork 流都作为单独的线程 运行 。调用 stream.branch()
在同一个线程 space 中创建多个子流。因此,一个分区内的所有记录都在同一个线程中处理 space.
为了实现子分区处理,我最终使用 kafka 客户端 API 结合 java 线程和并发队列。
这是对
假设我有多个源流,它们应用了同一组谓词。我想设置分支流,以便满足谓词的记录,无论哪个源流,都由同一个分支流处理。如下图所示,每个分支流就像一个转换传入记录的通用处理器。
以下代码块无法正常工作,因为它为每个源流创建了一组不同的分支流。
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source1 = builder.stream("x");
KStream<String, String> source2 = builder.stream("y");
Predicate<String, String>[] branchPredicates = new Predicate[forkCount];
for (int i = 0; i < forkCount; ++i) {
int idx = i;
branchPredicates[i] = ((key, value) ->
key.hashCode() % forkCount == idx);
}
Kstream<String, String>[] forkStreams = Arrays.asList(source1, source2)
.map(srcStream -> srcStream.branch(branchPredicates)
.flatMap(x -> Arrays.stream())
.collect(Collectors.toList());
抱歉,我主要是一名 Scala 开发人员 :)
在上面的例子中,forkStreams.length == branchPredicates.length x 2 通常,与源流的数量成正比。 Kafka 流中是否有一个技巧可以让我在谓词和分支流之间保持一对一的关系?
2018 年 11 月 27 日更新 我可以取得一些进展:
- 使用一个源流读取所有源主题
- 将源流连接到多个分支
- 将消息平均分配到分支机构。
但是,正如下面的代码块所展示的,ALL 分叉流存在于同一个线程中。我想要实现的是将每个 fork 流放入不同的线程以允许更好的 CPU 利用率
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream(Arrays.asList("a", "b", "c")
// Create workers
// Need to have predicates for the branches
int totalPerdicates = Integer
.parseInt(props.getProperty(WORKER_PROCESSOR_COUNT));
Predicate<String, String>[] predicates = new Predicate[totalPerdicates];
IntStream
.range(0, totalPerdicates)
.forEach(i -> {
int idx = i;
predicates[i] = (key, value) ->
key.hashCode() % totalPerdicates == idx;
});
forkStreams = Arrays.asList(sourceStreams.branch(predicates));
// Hack- Dump the number of messages processed every 10 seconds
forkStreams
.forEach(fork -> {
KStream<Windowed<String>, Long> tbl =
fork.transformValues(new SourceTopicValueTransformerSupplier())
.selectKey((key, value) -> "foobar")
.groupByKey()
.windowedBy(TimeWindows.of(2000L))
.count()
.toStream();
tbl
.foreach((key, count) -> {
String fromTo = String.format("%d-%d",
key.window().start(),
key.window().end());
System.out.printf("(Thread %d, Index %d) %s - %s: %d\n",
Thread.currentThread().getId(),
forkStreams.indexOf(fork),
fromTo, key.key(), count);
});
这是输出的片段
<snip>
(Thread 13, Index 1) 1542132126000-1542132128000 - foobar: 2870
(Thread 13, Index 1) 1542132024000-1542132026000 - foobar: 2955
(Thread 13, Index 1) 1542132106000-1542132108000 - foobar: 1914
(Thread 13, Index 1) 1542132054000-1542132056000 - foobar: 546
<snip>
(Thread 13, Index 2) 1542132070000-1542132072000 - foobar: 524
(Thread 13, Index 2) 1542132012000-1542132014000 - foobar: 2491
(Thread 13, Index 2) 1542132042000-1542132044000 - foobar: 261
(Thread 13, Index 2) 1542132022000-1542132024000 - foobar: 2823
<snip>
(Thread 13, Index 3) 1542132088000-1542132090000 - foobar: 2170
(Thread 13, Index 3) 1542132010000-1542132012000 - foobar: 2962
(Thread 13, Index 3) 1542132008000-1542132010000 - foobar: 2847
(Thread 13, Index 3) 1542132022000-1542132024000 - foobar: 2797
<snip>
(Thread 13, Index 4) 1542132046000-1542132048000 - foobar: 2846
(Thread 13, Index 4) 1542132096000-1542132098000 - foobar: 3216
(Thread 13, Index 4) 1542132108000-1542132110000 - foobar: 2696
(Thread 13, Index 4) 1542132010000-1542132012000 - foobar: 2881
<snip>
如有任何关于如何将每个分支流放置在不同线程中的建议,我们将不胜感激。
11/27/2018的更新已经回答了这个问题。话虽如此,该解决方案对我不起作用,因为我希望每个 fork 流都作为单独的线程 运行 。调用 stream.branch()
在同一个线程 space 中创建多个子流。因此,一个分区内的所有记录都在同一个线程中处理 space.
为了实现子分区处理,我最终使用 kafka 客户端 API 结合 java 线程和并发队列。