Why does CustomOptions in Apache Beam not inherit the default DataflowPipelineOptions properties?
I am new to Apache Beam and am trying to run a sample read-and-write program with both DirectRunner and DataflowRunner. My use case takes a few CLI arguments, so I created an interface, "CustomOptions.java", that extends PipelineOptions.
With DirectRunner the program runs fine, but with DataflowRunner it fails with "interface CustomOptions missing a property named 'project'".
pom.xml
<dependencies>
  <dependency>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.2.0</version>
    <type>maven-plugin</type>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.16.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
    <version>2.16.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.16.0</version>
  </dependency>
</dependencies>
CustomOptions.java (interface)
import org.apache.beam.sdk.options.PipelineOptions;

public interface CustomOptions extends PipelineOptions {
    String getInput();
    void setInput(String value);

    String getOutput();
    void setOutput(String value);
}
WordCount.java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WordCount {
    public static void main(String[] args) {
        PipelineOptionsFactory.register(CustomOptions.class);
        CustomOptions options = PipelineOptionsFactory.fromArgs(args).as(CustomOptions.class);
        Pipeline p = Pipeline.create(options);
        p.apply("Read", TextIO.read().from(options.getInput()))
            .apply("Write", TextIO.write().to(options.getOutput()));
        p.run();
    }
}
Commands:
DirectRunner (Working) : java -cp jarPath WordCount --input=inputPath --output=outputPath
DataflowRunner (Not Working) : java -cp jarPath WordCount --input=inputPath --output=outputPath --runner=DataflowRunner --stagingLocation=gs://<tmp_path> --project=<projectId>
Error:
Exception in thread "main" java.lang.IllegalArgumentException: Class interface CustomOptions missing a property named 'project'.
at org.apache.beam.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1625)
at org.apache.beam.sdk.options.PipelineOptionsFactory.access0(PipelineOptionsFactory.java:115)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:298)
at WordCount.main(WordCount.java:13)
The second thing I tried was to make CustomOptions extend DataflowPipelineOptions instead of PipelineOptions. With that change I get a different error:
Exception in thread "main" java.lang.IllegalArgumentException: No filesystem found for scheme gs
at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:463)
at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:533)
at org.apache.beam.sdk.io.FileBasedSink.convertToFileResourceIfPossible(FileBasedSink.java:215)
at org.apache.beam.sdk.io.TextIO$TypedWrite.to(TextIO.java:734)
at org.apache.beam.sdk.io.TextIO$Write.to(TextIO.java:1069)
at WordCount.main(WordCount.java:15)
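The second attempt has a further problem: the same code can no longer be run with both DirectRunner and DataflowRunner, because in that case "projectId" is a mandatory parameter, and it would not be specified for DirectRunner.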
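After some trial and error, I think I got it right.
I am using the same Java classes as in the question, i.e. CustomOptions.java extends PipelineOptions. The only change I made is in pom.xml.
I now use the maven shade plugin with a little extra configuration instead of the maven assembly plugin. With this I achieved two things:
1. The same jar can be used with either DirectRunner or DataflowRunner.
2. I can specify on the command line which main class I want to execute.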
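As far as I understand it, PipelineOptionsFactory checks every --name argument against the properties of the requested interface plus those of any options interfaces contributed by registrars found on the classpath (Beam discovers such registrars through java.util.ServiceLoader). That would explain why CustomOptions does not have to extend DataflowPipelineOptions once the jar is packaged correctly. The following is a rough, standard-library-only sketch of that kind of lookup, not Beam's actual code. The interfaces BaseOptions, CustomOptions and CloudOptions and the helper unknownArgs are hypothetical stand-ins.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class OptionLookupSketch {
    // Hypothetical stand-ins for options interfaces; these are NOT Beam types.
    interface BaseOptions { String getRunner(); }
    interface CustomOptions extends BaseOptions { String getInput(); String getOutput(); }
    interface CloudOptions extends BaseOptions { String getProject(); String getStagingLocation(); }

    // Derives property names from no-arg getters, e.g. getProject -> "project".
    static Set<String> propertyNames(List<Class<?>> interfaces) {
        Set<String> names = new TreeSet<>();
        for (Class<?> iface : interfaces) {
            // getMethods() on an interface also returns methods of its superinterfaces.
            for (Method m : iface.getMethods()) {
                String n = m.getName();
                if (n.startsWith("get") && n.length() > 3 && m.getParameterCount() == 0) {
                    names.add(Character.toLowerCase(n.charAt(3)) + n.substring(4));
                }
            }
        }
        return names;
    }

    // Returns the names of --name=value arguments that match no known property.
    static List<String> unknownArgs(String[] args, List<Class<?>> interfaces) {
        Set<String> known = propertyNames(interfaces);
        List<String> unknown = new ArrayList<>();
        for (String arg : args) {
            if (!arg.startsWith("--")) {
                continue;
            }
            int eq = arg.indexOf('=');
            String name = eq < 0 ? arg.substring(2) : arg.substring(2, eq);
            if (!known.contains(name)) {
                unknown.add(name);
            }
        }
        return unknown;
    }

    public static void main(String[] args) {
        String[] cli = {"--input=in.txt", "--output=out", "--runner=DataflowRunner", "--project=my-project"};
        // Only CustomOptions is known: "project" cannot be matched.
        System.out.println(unknownArgs(cli, List.of(CustomOptions.class)));   // prints [project]
        // CloudOptions is also known (as if a registrar had contributed it): nothing is left over.
        System.out.println(unknownArgs(cli, List.of(CustomOptions.class, CloudOptions.class)));   // prints []
    }
}
```

In this model the first call corresponds to a jar in which the Dataflow options were never registered, and the second to a jar in which they were.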
Previous pom.xml:
<build>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>3.2.0</version>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id> <!-- this is used for inheritance merges -->
          <phase>package</phase> <!-- bind to the packaging phase -->
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.2.0</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              <!-- add Main-Class to manifest file -->
              <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                <mainClass>com.dh.WordCount</mainClass>
              </transformer>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
<dependencies>
  <dependency>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.2.0</version>
    <type>maven-plugin</type>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.16.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
    <version>2.16.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.16.0</version>
  </dependency>
</dependencies>
New pom.xml:
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.2.0</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <transformers>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
              <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.16.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
    <version>2.16.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.16.0</version>
  </dependency>
</dependencies>
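My best guess at why this configuration matters is the ServicesResourceTransformer: it merges META-INF/services files from all dependencies instead of letting a single copy win. java.util.ServiceLoader reads those files, so if a fat jar ends up with only one copy of a same-named service file, the registrars listed in the other copies are never found. That would be consistent with both the missing 'project' property and "No filesystem found for scheme gs". Below is a small sketch that models the difference with plain lists. The entry names are placeholders rather than real Beam class names, and keepFirst and mergeAll are hypothetical helpers, not part of the shade plugin.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class ServiceFileMergeSketch {
    // Contents of one same-named service file as it might appear in three dependency jars.
    // The entries are illustrative placeholders, not names taken from a Beam release.
    static List<List<String>> sampleCopies() {
        return List.of(
            List.of("example.core.DefaultOptionsRegistrar"),
            List.of("example.direct.DirectOptionsRegistrar"),
            List.of("example.dataflow.DataflowOptionsRegistrar"));
    }

    // Models a fat jar in which only the first copy of the file survives.
    static List<String> keepFirst(List<List<String>> copies) {
        return copies.isEmpty() ? new ArrayList<>() : new ArrayList<>(copies.get(0));
    }

    // Models merging: entries of all copies are concatenated in order
    // (this sketch also drops exact duplicates).
    static List<String> mergeAll(List<List<String>> copies) {
        Set<String> merged = new LinkedHashSet<>();
        for (List<String> copy : copies) {
            merged.addAll(copy);
        }
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        List<List<String>> copies = sampleCopies();
        // One provider visible: the direct and dataflow entries are lost.
        System.out.println(keepFirst(copies));
        // All three providers visible to a ServiceLoader-style lookup.
        System.out.println(mergeAll(copies));
    }
}
```

To check a real jar, it may help to open the files under META-INF/services and confirm that each one lists entries from every runner and file system module you depend on.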
This became possible after I read this answer: