Unable to add CustomPipelineOptions by extending DataflowPipelineOptions
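I want to have the following in my pipeline:
- Custom, user-defined pipeline options, similar to WordCountOptions in the Apache Beam WordCount example
- The Dataflow pipeline options described in the relevant javadoc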
However, when I try to extend DataflowPipelineOptions:
public interface CustomPipelineOptions extends DataflowPipelineOptions {
@Description("Sample parameter description")
ValueProvider<String> getSampleParameter();
void setSampleParameter(ValueProvider<String> sampleParameter);
// more custom parameters below...
}
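Some background on why CustomPipelineOptions is declared as an interface of getter/setter pairs: PipelineOptionsFactory creates an object implementing that interface at run time, and each getter/setter pair becomes an option such as `--sampleParameter`. The sketch below is a toy, stdlib-only analogue built on `java.lang.reflect.Proxy`. `ToyOptionsFactory` and `SampleOptions` are hypothetical names, the toy uses a plain `String` instead of `ValueProvider<String>`, and it skips everything Beam's real factory does (validation, defaults, type conversion, serialization).

```java
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

// Toy illustration only: NOT Beam's PipelineOptionsFactory.
// An interface of getter/setter pairs is backed by a dynamic proxy whose values
// come from "--name=value" arguments and can later be changed through setters.
public class ToyOptionsFactory {

    // Hypothetical options interface with the same shape as CustomPipelineOptions.
    public interface SampleOptions {
        String getSampleParameter();
        void setSampleParameter(String value);
    }

    // Parses "--name=value" arguments into a property map.
    static Map<String, Object> parseArgs(String[] args) {
        Map<String, Object> values = new HashMap<>();
        for (String arg : args) {
            if (arg.startsWith("--") && arg.contains("=")) {
                int eq = arg.indexOf('=');
                values.put(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        return values;
    }

    // Turns "getSampleParameter" / "setSampleParameter" into "sampleParameter".
    static String propertyName(String methodName) {
        String rest = methodName.substring(3);
        return Character.toLowerCase(rest.charAt(0)) + rest.substring(1);
    }

    // Creates a proxy implementing the interface: getters read the map, setters write it.
    public static <T> T as(Class<T> iface, String[] args) {
        Map<String, Object> values = parseArgs(args);
        Object proxy = Proxy.newProxyInstance(
                iface.getClassLoader(),
                new Class<?>[] {iface},
                (p, method, methodArgs) -> {
                    String name = method.getName();
                    if (name.startsWith("get") && methodArgs == null) {
                        return values.get(propertyName(name));
                    }
                    if (name.startsWith("set") && methodArgs != null && methodArgs.length == 1) {
                        values.put(propertyName(name), methodArgs[0]);
                        return null;
                    }
                    throw new UnsupportedOperationException(name);
                });
        return iface.cast(proxy);
    }

    public static void main(String[] args) {
        SampleOptions options = as(SampleOptions.class, new String[] {"--sampleParameter=from-cli"});
        System.out.println(options.getSampleParameter()); // from-cli
        options.setSampleParameter("set-in-code");
        System.out.println(options.getSampleParameter()); // set-in-code
    }
}
```

Because the toy proxy keeps its values in a mutable map, values parsed from the command line can still be overwritten through setters afterwards, which is the pattern the question's main() uses when it calls options.setProject(...) and the other setters.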
and, in my main() function, pass options of type CustomPipelineOptions to run():
public static void main(String[] args) {
CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
// set Dataflow-specific options
options.setProject("my-project");
options.setRegion("my-region");
options.setStagingLocation("gs://my-bucket/location");
options.setTempLocation("gs://my-bucket/location");
options.setSubnetwork("regions/my-region/subnetworks/my-subnetwork");
options.setJobName("my-job-name");
options.setUsePublicIps(false);
options.setRunner(DataflowRunner.class);
run(options);
}
(Note that above I configure various DataflowPipelineOptions options as outlined in the javadoc.)
In run(), I create my pipeline using the options of type CustomPipelineOptions:
static void run(CustomPipelineOptions options) {
/*
Define pipeline
*/
Pipeline p = Pipeline.create(options);
// function continues below...
}
I have also included the following relevant dependencies in my pom.xml file (note that ${beam.version} is 2.31.0 and ${slf4j.version} is 1.7.25):
<dependencies>
<!-- core beam SDK -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- gcp package -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- dataflowRunner -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<!-- directRunner -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- slf4j; logging for java -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
When I run the pipeline with the following command:
mvn compile exec:java -Dexec.mainClass=path.to.class.myClass
or with:
mvn compile exec:java -Dexec.mainClass=path.to.class.myClass -Pdataflow-runner
I get the following errors:
Compilation failure:
[ERROR] /C:/Users/path/to/class/myClass.java:[18,40] package org.apache.beam.runners.dataflow does not exist
[ERROR] /C:/Users/path/to/class/myClass.java:[19,48] package org.apache.beam.runners.dataflow.options does not exist
[ERROR] /C:/Users/path/to/class/myClass.java:[56,52] cannot find symbol
[ERROR] symbol: class DataflowPipelineOptions
[ERROR] location: class path.to.class.myClass
[ERROR] /C:/Users/path/to/class/myClass.java:[82,38] incompatible types: path.to.class.myClass.CustomPipelineOptions cannot be converted to org.apache.beam.sdk.options.PipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[145,67] method as in class org.apache.beam.sdk.options.PipelineOptionsFactory.Builder cannot be applied to given types;
and:
[ERROR] required: java.lang.Class<T>
[ERROR] found: java.lang.Class<path.to.class.myClass.CustomPipelineOptions>
[ERROR] reason: inference variable T has incompatible bounds
[ERROR] equality constraints: path.to.class.myClass.CustomPipelineOptions
[ERROR] upper bounds: org.apache.beam.sdk.options.PipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[150,16] cannot find symbol
[ERROR] symbol: method setProject(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[151,16] cannot find symbol
[ERROR] symbol: method setRegion(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[152,16] cannot find symbol
[ERROR] symbol: method setStagingLocation(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[153,16] cannot find symbol
[ERROR] symbol: method setTempLocation(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[154,16] cannot find symbol
[ERROR] symbol: method setSubnetwork(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[155,16] cannot find symbol
[ERROR] symbol: method setJobName(java.lang.String)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[156,16] cannot find symbol
[ERROR] symbol: method setUsePublicIps(boolean)
[ERROR] location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[157,27] cannot find symbol
[ERROR] symbol: class DataflowRunner
[ERROR] location: class path.to.class.myClass
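Any ideas on why I'm getting these errors, and on how to include both my own pipeline options defined in CustomPipelineOptions and the Dataflow-specific DataflowPipelineOptions, would be greatly appreciated. Thanks!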
The first problem is in the pom.xml file. We need to change
<!-- dataflowRunner -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope> <!-- delete this! -->
</dependency>
to
<!-- dataflowRunner -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.version}</version>
</dependency>
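The Maven documentation on dependency scope states that setting the scope to runtime indicates that the dependency "is not required for compilation, but is for execution". Our code imports DataflowPipelineOptions and DataflowRunner, so we need this dependency at compile time. Omitting the scope makes Maven fall back to the default scope, compile, which puts the dependency on both the compile and the runtime classpath.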
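To make the scope rule concrete, here is a deliberately simplified model of which Maven scopes put a dependency on the classpath used to compile `src/main/java` versus the one used to run it. `MavenScopeModel` is a hypothetical illustration, not part of Maven; it leaves out the `system` and `import` scopes as well as the test classpaths.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of Maven dependency scopes (not Maven code).
// It only encodes which main classpaths each scope contributes to.
public class MavenScopeModel {

    enum Scope {
        COMPILE(true, true),   // default when <scope> is omitted
        PROVIDED(true, false), // expected to be supplied by the JDK or a container at run time
        RUNTIME(false, true),  // "not required for compilation, but is for execution"
        TEST(false, false);    // only on the test classpaths, not the main ones

        final boolean onMainCompileClasspath;
        final boolean onMainRuntimeClasspath;

        Scope(boolean compile, boolean runtime) {
            this.onMainCompileClasspath = compile;
            this.onMainRuntimeClasspath = runtime;
        }
    }

    // Returns the artifactIds that javac can see when compiling src/main/java.
    static List<String> mainCompileClasspath(Map<String, Scope> dependencies) {
        List<String> visible = new ArrayList<>();
        for (Map.Entry<String, Scope> dep : dependencies.entrySet()) {
            if (dep.getValue().onMainCompileClasspath) {
                visible.add(dep.getKey());
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        Map<String, Scope> deps = new LinkedHashMap<>();
        deps.put("beam-sdks-java-core", Scope.COMPILE);
        // As in the question: runtime scope hides the Dataflow runner classes from javac.
        deps.put("beam-runners-google-cloud-dataflow-java", Scope.RUNTIME);
        System.out.println(mainCompileClasspath(deps)); // [beam-sdks-java-core]

        // After deleting <scope>runtime</scope>, the default compile scope applies.
        deps.put("beam-runners-google-cloud-dataflow-java", Scope.COMPILE);
        System.out.println(mainCompileClasspath(deps));
        // [beam-sdks-java-core, beam-runners-google-cloud-dataflow-java]
    }
}
```

This mirrors the "package org.apache.beam.runners.dataflow does not exist" errors in the question: with runtime scope the artifact would be present when the program runs, but javac never sees it.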
The second problem is in how options is created in main():
public static void main(String[] args) {
CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
The fix is to remove withValidation():
public static void main(String[] args) {
CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(CustomPipelineOptions.class);
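The validate() method of PipelineOptionsValidator "validates that the passed PipelineOptions conforms to all the validation criteria from the passed in interface." withValidation() makes as() run this check as soon as the options object is created. Since our Dataflow options are only assigned afterwards, through the setters in main(), validation runs before those values exist and the pipeline fails.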
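The ordering problem can be shown without Beam. In this toy sketch, `ValidateAfterSetting` and `ToyOptions` are hypothetical stand-ins (not Beam classes) for a validator that checks required values: validating an options object before its setters have run fails, while validating the same object after all values are assigned succeeds.

```java
import java.util.ArrayList;
import java.util.List;

// Toy analogue of option validation; NOT Beam's PipelineOptionsValidator.
public class ValidateAfterSetting {

    // Hypothetical options holder with two "required" values.
    static class ToyOptions {
        String project;
        String tempLocation;
    }

    // Throws if any required value is still missing.
    static void validate(ToyOptions options) {
        List<String> missing = new ArrayList<>();
        if (options.project == null) missing.add("project");
        if (options.tempLocation == null) missing.add("tempLocation");
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required options: " + missing);
        }
    }

    public static void main(String[] args) {
        ToyOptions options = new ToyOptions();
        try {
            validate(options); // validating first, like withValidation() before the setters run
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Missing required options: [project, tempLocation]
        }

        options.project = "my-project";
        options.tempLocation = "gs://my-bucket/location";
        validate(options); // passes: every required value has now been assigned
        System.out.println("valid");
    }
}
```

If you still want validation in the real pipeline, one option (an assumption worth checking against the Beam 2.31.0 javadoc) is to call PipelineOptionsValidator.validate(CustomPipelineOptions.class, options) yourself after the options.set...() calls, instead of using withValidation().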