一个数据流作业中的并行管道
Parallel pipeline inside one Dataflow Job
我想 运行 在 GCP 上的一个数据流作业中使用两条并行管道。我已经创建了一个管道,它工作得很好,但我想在不创建另一个作业的情况下创建另一个管道。
我搜索了很多答案,但找不到任何代码示例:(
如果我这样 运行 它不起作用:
pipe1.run();
pipe2.run();
它给了我 "There is already an active job name... If you want to submit a second job, try again setting a different name using --jobName
"
您可以将其他输入应用于管道,这将在一个作业中产生一个单独的管道。例如:
public class ExamplePipeline {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
options.setRunner(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> linesForPipelineOne = pipeline.apply(Create.of("A1", "B1"));
PCollection<String> linesToWriteFromPipelineOne = linesForPipelineOne.apply("Pipeline 1 transform",
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("Pipeline one:" + c.element());
c.output(c.element() + " extra message.");
}
}));
linesToWriteFromPipelineOne.apply((TextIO.write().to("file.txt")));
PCollection<String> linesForPipelineTwo = pipeline.apply(Create.of("A2", "B2"));
linesForPipelineTwo.apply("Pipeline 2 transoform",
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("Pipeline two:" + c.element());
}
}));
pipeline.run();
}
如您所见,您也可以将两个(或更多)分离的 PBegin 应用于具有多个 PDone/Sinks 的管道。在此示例中,"pipeline 1"
将输出转储并写入文件,"pipeline 2"
仅将其转储到屏幕。
如果您在 GCP 上使用 DataflowRunner
运行 此 GUI 将显示 2 未连接 "pipelines"。
我想 运行 在 GCP 上的一个数据流作业中使用两条并行管道。我已经创建了一个管道,它工作得很好,但我想在不创建另一个作业的情况下创建另一个管道。
我搜索了很多答案,但找不到任何代码示例:(
如果我这样 运行 它不起作用:
pipe1.run();
pipe2.run();
它给了我 "There is already an active job name... If you want to submit a second job, try again setting a different name using --jobName
"
您可以将其他输入应用于管道,这将在一个作业中产生一个单独的管道。例如:
public class ExamplePipeline {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
options.setRunner(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> linesForPipelineOne = pipeline.apply(Create.of("A1", "B1"));
PCollection<String> linesToWriteFromPipelineOne = linesForPipelineOne.apply("Pipeline 1 transform",
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("Pipeline one:" + c.element());
c.output(c.element() + " extra message.");
}
}));
linesToWriteFromPipelineOne.apply((TextIO.write().to("file.txt")));
PCollection<String> linesForPipelineTwo = pipeline.apply(Create.of("A2", "B2"));
linesForPipelineTwo.apply("Pipeline 2 transoform",
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("Pipeline two:" + c.element());
}
}));
pipeline.run();
}
如您所见,您也可以将两个(或更多)分离的 PBegin 应用于具有多个 PDone/Sinks 的管道。在此示例中,"pipeline 1"
将输出转储并写入文件,"pipeline 2"
仅将其转储到屏幕。
如果您在 GCP 上使用 DataflowRunner
运行 此 GUI 将显示 2 未连接 "pipelines"。