Unable to add CustomPipelineOptions by extending DataflowPipelineOptions

I want to accomplish the following in my pipeline:

Now, when trying to extend DataflowPipelineOptions:

public interface CustomPipelineOptions extends DataflowPipelineOptions {
    @Description("Sample parameter description")
    ValueProvider<String> getSampleParameter();
    void setSampleParameter(ValueProvider<String> sampleParameter);
    
    // more custom parameters below...
}

and passing my options of type CustomPipelineOptions to run() from my main() function:

public static void main(String[] args) {

    CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);
    
    // set Dataflow-specific options
    options.setProject("my-project");
    options.setRegion("my-region");
    options.setStagingLocation("gs://my-bucket/location");
    options.setTempLocation("gs://my-bucket/location");
    options.setSubnetwork("regions/my-region/subnetworks/my-subnetwork");
    options.setJobName("my-job-name");
    options.setUsePublicIps(false);
    options.setRunner(DataflowRunner.class);

    run(options);
}

(Note that above I configure various DataflowPipelineOptions options as outlined in the javadoc.)

I then create my pipeline in run(), using options of type CustomPipelineOptions:

static void run(CustomPipelineOptions options) {
    /*
     Define pipeline
     */
    Pipeline p = Pipeline.create(options);
    
    // function continues below...
}

In addition, I include the following relevant dependencies in my pom.xml file (note that ${beam.version} is 2.31.0 and ${slf4j.version} is 1.7.25):

<dependencies>
    <!-- core beam SDK -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- gcp package -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- dataflowRunner -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
      <version>${beam.version}</version>
      <scope>runtime</scope>
    </dependency>

    <!-- directRunner -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- slf4j; logging for java -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
</dependencies>

When I execute the pipeline with the following command:

mvn compile exec:java -Dexec.mainClass=path.to.class.myClass

or with:

mvn compile exec:java -Dexec.mainClass=path.to.class.myClass -Pdataflow-runner

I get the following errors:

Compilation failure:
[ERROR] /C:/Users/path/to/class/myClass.java:[18,40] package org.apache.beam.runners.dataflow does not exist
[ERROR] /C:/Users/path/to/class/myClass.java:[19,48] package org.apache.beam.runners.dataflow.options does not exist
[ERROR] /C:/Users/path/to/class/myClass.java:[56,52] cannot find symbol
[ERROR]   symbol:   class DataflowPipelineOptions
[ERROR]   location: class path.to.class.myClass
[ERROR] /C:/Users/path/to/class/myClass.java:[82,38] incompatible types: path.to.class.myClass.CustomPipelineOptions cannot be converted to org.apache.beam.sdk.options.PipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[145,67] method as in class org.apache.beam.sdk.options.PipelineOptionsFactory.Builder cannot be applied to given types;

and:

[ERROR]   required: java.lang.Class<T>
[ERROR]   found: java.lang.Class<path.to.class.myClass.CustomPipelineOptions>
[ERROR]   reason: inference variable T has incompatible bounds
[ERROR]     equality constraints: path.to.class.myClass.CustomPipelineOptions
[ERROR]     upper bounds: org.apache.beam.sdk.options.PipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[150,16] cannot find symbol
[ERROR]   symbol:   method setProject(java.lang.String)
[ERROR]   location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[151,16] cannot find symbol
[ERROR]   symbol:   method setRegion(java.lang.String)
[ERROR]   location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[152,16] cannot find symbol
[ERROR]   symbol:   method setStagingLocation(java.lang.String)
[ERROR]   location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[153,16] cannot find symbol
[ERROR]   symbol:   method setTempLocation(java.lang.String)
[ERROR]   location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[154,16] cannot find symbol
[ERROR]   symbol:   method setSubnetwork(java.lang.String)
[ERROR]   location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[155,16] cannot find symbol
[ERROR]   symbol:   method setJobName(java.lang.String)
[ERROR]   location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[156,16] cannot find symbol
[ERROR]   symbol:   method setUsePublicIps(boolean)
[ERROR]   location: variable options of type path.to.class.myClass.CustomPipelineOptions
[ERROR] /C:/Users/path/to/class/myClass.java:[157,27] cannot find symbol
[ERROR]   symbol:   class DataflowRunner
[ERROR]   location: class path.to.class.myClass

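Any ideas on why I am getting these errors, and on how to achieve my goal of including both my own pipeline options defined in CustomPipelineOptions and the Dataflow-specific options from DataflowPipelineOptions, would be greatly appreciated. Thanks!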

The first problem is in the pom.xml file. We need to change

<!-- dataflowRunner -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
  <version>${beam.version}</version>
  <scope>runtime</scope> <!-- delete this! -->
</dependency>

to:

<!-- dataflowRunner -->
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
  <version>${beam.version}</version>
</dependency>

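The Maven documentation on dependency scope says that the runtime scope "indicates that the dependency is not required for compilation, but is for execution." Our code imports DataflowPipelineOptions and DataflowRunner directly, so we need this dependency at compile time. With runtime scope the Dataflow runner classes are missing from the compile classpath, which is what produces the "package org.apache.beam.runners.dataflow does not exist" errors; the "cannot find symbol" and "incompatible types" errors follow from that, because CustomPipelineOptions no longer resolves to a PipelineOptions subtype. Omitting the scope makes it default to compile.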

The second problem is in how the options are defined in main():

public static void main(String[] args) {

    CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CustomPipelineOptions.class);

The fix is to remove withValidation():

public static void main(String[] args) {

    CustomPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(CustomPipelineOptions.class);

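The validate() method of PipelineOptionsValidator "validates that the passed PipelineOptions conforms to all the validation criteria from the passed in interface." With withValidation(), this check runs inside as(), that is, while the command-line arguments are being parsed. Because our options are only assigned afterwards, by the setters in main(), any value the validation requires is still unset at that point, and the pipeline fails.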
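To make the ordering problem concrete, here is a small self-contained toy model in plain Java. It does not use Beam: ToyOptions, REQUIRED, parse() and validate() are hypothetical stand-ins for the options proxy, required options, fromArgs(...).as(...) and PipelineOptionsValidator respectively. It only illustrates the claim above: a check that runs during parsing cannot see values that main() assigns afterwards, while the same check run after the setters passes.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical names, NOT Beam's API) of why validating options at
// parse time fails when required values are only assigned afterwards in code.
public class ValidationOrderDemo {

    // Hypothetical stand-in for a PipelineOptions proxy: a simple key/value store.
    static final class ToyOptions {
        private final Map<String, String> values = new HashMap<>();

        void set(String key, String value) { values.put(key, value); }

        String get(String key) { return values.get(key); }
    }

    // Hypothetical list of options that the toy validator treats as required.
    static final List<String> REQUIRED = List.of("project", "region");

    // Mimics the idea of validation: every required key must have a value.
    static void validate(ToyOptions options) {
        for (String key : REQUIRED) {
            if (options.get(key) == null) {
                throw new IllegalArgumentException("Missing required option: " + key);
            }
        }
    }

    // Mimics fromArgs(args)[.withValidation()].as(...): parses "--key=value" args.
    static ToyOptions parse(String[] args, boolean withValidation) {
        ToyOptions options = new ToyOptions();
        for (String arg : args) {
            String[] kv = arg.replaceFirst("^--", "").split("=", 2);
            if (kv.length == 2) {
                options.set(kv[0], kv[1]);
            }
        }
        if (withValidation) {
            validate(options); // runs BEFORE main() gets a chance to call any setters
        }
        return options;
    }

    // Returns "ok" on success, otherwise the validation error message.
    public static String run(String[] args, boolean validateEarly) {
        try {
            ToyOptions options = parse(args, validateEarly);
            // Values assigned in code, like options.setProject(...) in main().
            options.set("project", "my-project");
            options.set("region", "my-region");
            if (!validateEarly) {
                validate(options); // validating after the setters succeeds
            }
            return "ok";
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        String[] noArgs = new String[0];
        System.out.println(run(noArgs, true));  // prints "Missing required option: project"
        System.out.println(run(noArgs, false)); // prints "ok"
    }
}
```

If you want to keep validation in the real pipeline rather than drop it, the analogous move would be to parse without withValidation(), call the setters, and only then validate, for example with PipelineOptionsValidator.validate(CustomPipelineOptions.class, options); check that method against the javadoc of your Beam version before relying on it.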