数据流流式传输作业无法扩展到超过 1 个工作人员

Dataflow streaming job not scaleing past 1 worker

我使用 Apache Beam SDK for Java 2.1.0 的流式数据流作业 (2017-09-08_03_55_43-9675407418829265662) 不会超过 1 个工作人员,即使 pubsub 队列不断增长(现在有 10 万条未送达的消息)——你知道为什么吗?

目前 运行 autoscalingAlgorithm=THROUGHPUT_BASEDmaxNumWorkers=10

这里是数据流工程师。我查看了后端的作业,发现它没有扩展,因为 CPU 利用率很低,这意味着其他因素限制了管道的性能,例如外部节流。在这些情况下,升级很少有帮助。

我看到有些捆绑包的处理时间长达数小时。我建议调查您的管道逻辑,看看是否还有其他可以优化的部分。

这就是我最终得到的结果:

import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import java.util.concurrent.ThreadLocalRandom;


public class ReshuffleWithRandomKey<T>
        extends PTransform<PCollection<T>, PCollection<T>> {

    private final int size;

    public ReshuffleWithRandomKey(int size) {
        this.size = size;
    }

    @Override
    public PCollection<T> expand(PCollection<T> input) {
        return input
                .apply("Random key", ParDo.of(new AssignRandomKeyFn<T>(size)))
                .apply("Reshuffle", Reshuffle.<Integer, T>of())
                .apply("Values", Values.<T>create());
    }

    private static class AssignRandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {

        private final int size;

        AssignRandomKeyFn(int size) {
            this.size = size;
        }

        @ProcessElement
        public void process(ProcessContext c) {
            c.output(KV.of(ThreadLocalRandom.current().nextInt(0, size), c.element()));
        }
    }
}

你怎么看@raghu-angadi 和@scott-wegner?