数据流流式传输作业无法扩展到超过 1 个工作人员
Dataflow streaming job not scaleing past 1 worker
我使用 Apache Beam SDK for Java 2.1.0
的流式数据流作业 (2017-09-08_03_55_43-9675407418829265662
) 不会超过 1 个工作人员,即使 pubsub 队列不断增长(现在有 10 万条未送达的消息)——你知道为什么吗?
目前 运行 autoscalingAlgorithm=THROUGHPUT_BASED
和 maxNumWorkers=10
。
这里是数据流工程师。我查看了后端的作业,发现它没有扩展,因为 CPU 利用率很低,这意味着其他因素限制了管道的性能,例如外部节流。在这些情况下,升级很少有帮助。
我看到有些捆绑包的处理时间长达数小时。我建议调查您的管道逻辑,看看是否还有其他可以优化的部分。
这就是我最终得到的结果:
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import java.util.concurrent.ThreadLocalRandom;
public class ReshuffleWithRandomKey<T>
extends PTransform<PCollection<T>, PCollection<T>> {
private final int size;
public ReshuffleWithRandomKey(int size) {
this.size = size;
}
@Override
public PCollection<T> expand(PCollection<T> input) {
return input
.apply("Random key", ParDo.of(new AssignRandomKeyFn<T>(size)))
.apply("Reshuffle", Reshuffle.<Integer, T>of())
.apply("Values", Values.<T>create());
}
private static class AssignRandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
private final int size;
AssignRandomKeyFn(int size) {
this.size = size;
}
@ProcessElement
public void process(ProcessContext c) {
c.output(KV.of(ThreadLocalRandom.current().nextInt(0, size), c.element()));
}
}
}
你怎么看@raghu-angadi 和@scott-wegner?
我使用 Apache Beam SDK for Java 2.1.0
的流式数据流作业 (2017-09-08_03_55_43-9675407418829265662
) 不会超过 1 个工作人员,即使 pubsub 队列不断增长(现在有 10 万条未送达的消息)——你知道为什么吗?
目前 运行 autoscalingAlgorithm=THROUGHPUT_BASED
和 maxNumWorkers=10
。
这里是数据流工程师。我查看了后端的作业,发现它没有扩展,因为 CPU 利用率很低,这意味着其他因素限制了管道的性能,例如外部节流。在这些情况下,升级很少有帮助。
我看到有些捆绑包的处理时间长达数小时。我建议调查您的管道逻辑,看看是否还有其他可以优化的部分。
这就是我最终得到的结果:
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import java.util.concurrent.ThreadLocalRandom;
public class ReshuffleWithRandomKey<T>
extends PTransform<PCollection<T>, PCollection<T>> {
private final int size;
public ReshuffleWithRandomKey(int size) {
this.size = size;
}
@Override
public PCollection<T> expand(PCollection<T> input) {
return input
.apply("Random key", ParDo.of(new AssignRandomKeyFn<T>(size)))
.apply("Reshuffle", Reshuffle.<Integer, T>of())
.apply("Values", Values.<T>create());
}
private static class AssignRandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
private final int size;
AssignRandomKeyFn(int size) {
this.size = size;
}
@ProcessElement
public void process(ProcessContext c) {
c.output(KV.of(ThreadLocalRandom.current().nextInt(0, size), c.element()));
}
}
}
你怎么看@raghu-angadi 和@scott-wegner?