如何使用其他命令行参数显式设置直接运行器?
How do I explicitly set a direct runner with other command line arguments?
我写了这个管道,但是当我把它打包成 jar 运行时,它找不到 DirectRunner——无论是在 build.gradle 中指定它,还是传递参数 --runner=direct 或者
--runner=DirectRunner,都是如此。下面是我的代码和我的 build.gradle 文件。我运行 gradle 任务 fatJar 来创建 jar,然后进入 build/libs 文件夹运行该 jar,就会看到此错误。这是我正在使用的命令: java -jar filepipeline-all-1.0-SNAPSHOT.jar --input="../testdata" --output="./manifest.json" --runner=DirectRunner
非常感谢对此问题的任何帮助!
我的文件夹结构如下所示:
--src
--main
--java
--com.pipeline
--BeamPipeline.java
build.gradle
// Build script for the file-hashing Beam pipeline (com.pipeline.BeamPipeline).
plugins {
    id 'java'
}

group 'com.dustin'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

// Builds a self-contained "<project>-all" jar: the project's own classes plus
// every dependency resolved from the `compile` configuration, unpacked and merged.
// Only `configurations.compile` is collected here.
task fatJar(type: Jar) {
    manifest {
        attributes 'Implementation-Title': 'Gradle Jar File',
                'Implementation-Version': version,
                'Main-Class': 'com.pipeline.BeamPipeline'
    }
    baseName = project.name + '-all'
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    with jar
}

apply plugin: 'application'
// Must be the fully qualified class name (package + class), not the source path;
// it matches the 'Main-Class' manifest attribute above.
mainClassName = 'com.pipeline.BeamPipeline'

dependencies {
    // Declared in the `runtime` configuration.
    runtime group: 'org.apache.beam', name: 'beam-runners-direct-java', version:'2.8.0'
    compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version:'2.8.0'
    runtime group: 'org.slf4j', name: 'slf4j-jdk14', version:'1.7.25'
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.0'
    compile group: 'commons-io', name: 'commons-io', version: '2.6'
    compile group: 'commons-codec', name:'commons-codec', version:'1.12'
    compileOnly 'org.projectlombok:lombok:1.18.6'
    compile group: 'com.google.code.gson', name: 'gson', version: '2.7'
    compile group: 'org.json', name: 'json', version: '20180813'
    annotationProcessor 'org.projectlombok:lombok:1.18.6'
}
管道:
package com.pipeline;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
public class BeamPipeline {
private static final Logger log = LoggerFactory.getLogger(BeamPipeline.class);
public static interface MyOptions extends PipelineOptions {
@Validation.Required
@Description("Input Path(with gs:// prefix)")
String getInput();
void setInput(String value);
@Validation.Required
@Description("Output Path (with gs:// prefix)")
String getOutput();
void setOutput(String value);
}
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
File dir = new File(options.getInput());
String output = options.getOutput();
for (File file : dir.listFiles()) {
String inputString = file.toString();
p
.apply("Match Files", FileIO.match().filepattern(inputString))
.apply("Read Files", FileIO.readMatches())
.apply(MapElements.via(new SimpleFunction<FileIO.ReadableFile, KV<String, String>>() {
public KV<String, String> apply(FileIO.ReadableFile file) {
String temp = null;
try {
temp = file.readFullyAsUTF8String();
} catch (IOException e) {
}
String sha256hex = org.apache.commons.codec.digest.DigestUtils.sha256Hex(temp);
return KV.of(file.getMetadata().resourceId().toString(), sha256hex);
}
}))
.apply("Print", ParDo.of(new DoFn<KV<String, String>, Void>() {
@ProcessElement
public void processElement(ProcessContext c) throws IOException {
FileWriter fileWriter = new FileWriter(output,true);
JSONObject obj = new JSONObject();
obj.put(c.element().getKey(), c.element().getValue());
fileWriter.write(obj.toString());
fileWriter.close();
log.info(String.format("File: %s, SHA-256 %s", c.element().getKey(), c.element().getValue()));
}
}));
}
p.run().waitUntilFinish();
}
}
beam-runners-direct-java
被声明为运行时(runtime)依赖项,而 fatJar 任务只打包 configurations.compile 中的依赖,因此它不会被打包进 fat jar。
您可以将 beam-runners-direct-java
改为编译时(compile)依赖项,这样它就会被打包进 fat jar 并可以使用。
// Build script for the file-hashing Beam pipeline (com.pipeline.BeamPipeline).
// Variant in which the direct runner is a `compile` dependency so fatJar bundles it.
plugins {
    id 'java'
}

group 'com.dustin'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

// Builds a self-contained "<project>-all" jar: the project's own classes plus
// every dependency resolved from the `compile` configuration, unpacked and merged.
task fatJar(type: Jar) {
    manifest {
        attributes 'Implementation-Title': 'Gradle Jar File',
                'Implementation-Version': version,
                'Main-Class': 'com.pipeline.BeamPipeline'
    }
    baseName = project.name + '-all'
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    with jar
}

apply plugin: 'application'
// Must be the fully qualified class name (package + class), not the source path;
// it matches the 'Main-Class' manifest attribute above.
mainClassName = 'com.pipeline.BeamPipeline'

dependencies {
    // `compile` (not `runtime`) so that it is part of configurations.compile,
    // which is what fatJar collects.
    compile group: 'org.apache.beam', name: 'beam-runners-direct-java', version:'2.8.0'
    compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version:'2.8.0'
    runtime group: 'org.slf4j', name: 'slf4j-jdk14', version:'1.7.25'
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.0'
    compile group: 'commons-io', name: 'commons-io', version: '2.6'
    compile group: 'commons-codec', name:'commons-codec', version:'1.12'
    compileOnly 'org.projectlombok:lombok:1.18.6'
    compile group: 'com.google.code.gson', name: 'gson', version: '2.7'
    compile group: 'org.json', name: 'json', version: '20180813'
    annotationProcessor 'org.projectlombok:lombok:1.18.6'
}
或者,如果您不想将 DirectRunner
打包进 fat jar,而只想在测试时使用它,您可以单独构建一个包含 DirectRunner 的
jar,并在运行管道时将其添加到 classpath 中。
// Build script for the file-hashing Beam pipeline (com.pipeline.BeamPipeline).
// Variant that keeps the direct runner out of the fat jar and packages it in a
// separate "-runtime" jar instead.
plugins {
    id 'java'
}

group 'com.dustin'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

// Builds a self-contained "<project>-all" jar: the project's own classes plus
// every dependency resolved from the `compile` configuration, unpacked and merged.
task fatJar(type: Jar) {
    manifest {
        attributes 'Implementation-Title': 'Gradle Jar File',
                'Implementation-Version': version,
                'Main-Class': 'com.pipeline.BeamPipeline'
    }
    baseName = project.name + '-all'
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    with jar
}

// Builds a "<project>-runtime" jar the same way, but from the `runtime`
// configuration, which is where beam-runners-direct-java is declared below.
task directrunnerjar(type: Jar) {
    manifest {
        attributes 'Implementation-Title': 'Gradle Jar File',
                'Implementation-Version': version,
                'Main-Class': 'com.pipeline.BeamPipeline'
    }
    baseName = project.name + '-runtime'
    from { configurations.runtime.collect { it.isDirectory() ? it : zipTree(it) } }
    with jar
}

apply plugin: 'application'
// Must be the fully qualified class name (package + class), not the source path;
// it matches the 'Main-Class' manifest attribute above.
mainClassName = 'com.pipeline.BeamPipeline'

dependencies {
    // Runtime-only: picked up by directrunnerjar, not by fatJar.
    runtime group: 'org.apache.beam', name: 'beam-runners-direct-java', version:'2.8.0'
    compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version:'2.8.0'
    runtime group: 'org.slf4j', name: 'slf4j-jdk14', version:'1.7.25'
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.0'
    compile group: 'commons-io', name: 'commons-io', version: '2.6'
    compile group: 'commons-codec', name:'commons-codec', version:'1.12'
    compileOnly 'org.projectlombok:lombok:1.18.6'
    compile group: 'com.google.code.gson', name: 'gson', version: '2.7'
    compile group: 'org.json', name: 'json', version: '20180813'
    annotationProcessor 'org.projectlombok:lombok:1.18.6'
}
java -cp "libs/filepipeline-runtime-1.0-SNAPSHOT.jar:libs/filepipeline-all-1.0-SNAPSHOT.jar" com.pipeline.BeamPipeline --input="../testdata" --output="./manifest.json" --runner=DirectRunner
我写了这个管道,但是当我把它打包成 jar 运行时,它找不到 DirectRunner——无论是在 build.gradle 中指定它,还是传递参数 --runner=direct 或者
--runner=DirectRunner,都是如此。下面是我的代码和我的 build.gradle 文件。我运行 gradle 任务 fatJar 来创建 jar,然后进入 build/libs 文件夹运行该 jar,就会看到此错误。这是我正在使用的命令: java -jar filepipeline-all-1.0-SNAPSHOT.jar --input="../testdata" --output="./manifest.json" --runner=DirectRunner
非常感谢对此问题的任何帮助!
我的文件夹结构如下所示: --src --main --java --com.pipeline --BeamPipeline.java
build.gradle
// Build script for the file-hashing Beam pipeline (com.pipeline.BeamPipeline).
plugins {
    id 'java'
}

group 'com.dustin'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

// Builds a self-contained "<project>-all" jar: the project's own classes plus
// every dependency resolved from the `compile` configuration, unpacked and merged.
// Only `configurations.compile` is collected here.
task fatJar(type: Jar) {
    manifest {
        attributes 'Implementation-Title': 'Gradle Jar File',
                'Implementation-Version': version,
                'Main-Class': 'com.pipeline.BeamPipeline'
    }
    baseName = project.name + '-all'
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    with jar
}

apply plugin: 'application'
// Must be the fully qualified class name (package + class), not the source path;
// it matches the 'Main-Class' manifest attribute above.
mainClassName = 'com.pipeline.BeamPipeline'

dependencies {
    // Declared in the `runtime` configuration.
    runtime group: 'org.apache.beam', name: 'beam-runners-direct-java', version:'2.8.0'
    compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version:'2.8.0'
    runtime group: 'org.slf4j', name: 'slf4j-jdk14', version:'1.7.25'
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.0'
    compile group: 'commons-io', name: 'commons-io', version: '2.6'
    compile group: 'commons-codec', name:'commons-codec', version:'1.12'
    compileOnly 'org.projectlombok:lombok:1.18.6'
    compile group: 'com.google.code.gson', name: 'gson', version: '2.7'
    compile group: 'org.json', name: 'json', version: '20180813'
    annotationProcessor 'org.projectlombok:lombok:1.18.6'
}
管道:
package com.pipeline;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.KV;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
public class BeamPipeline {
private static final Logger log = LoggerFactory.getLogger(BeamPipeline.class);
public static interface MyOptions extends PipelineOptions {
@Validation.Required
@Description("Input Path(with gs:// prefix)")
String getInput();
void setInput(String value);
@Validation.Required
@Description("Output Path (with gs:// prefix)")
String getOutput();
void setOutput(String value);
}
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
File dir = new File(options.getInput());
String output = options.getOutput();
for (File file : dir.listFiles()) {
String inputString = file.toString();
p
.apply("Match Files", FileIO.match().filepattern(inputString))
.apply("Read Files", FileIO.readMatches())
.apply(MapElements.via(new SimpleFunction<FileIO.ReadableFile, KV<String, String>>() {
public KV<String, String> apply(FileIO.ReadableFile file) {
String temp = null;
try {
temp = file.readFullyAsUTF8String();
} catch (IOException e) {
}
String sha256hex = org.apache.commons.codec.digest.DigestUtils.sha256Hex(temp);
return KV.of(file.getMetadata().resourceId().toString(), sha256hex);
}
}))
.apply("Print", ParDo.of(new DoFn<KV<String, String>, Void>() {
@ProcessElement
public void processElement(ProcessContext c) throws IOException {
FileWriter fileWriter = new FileWriter(output,true);
JSONObject obj = new JSONObject();
obj.put(c.element().getKey(), c.element().getValue());
fileWriter.write(obj.toString());
fileWriter.close();
log.info(String.format("File: %s, SHA-256 %s", c.element().getKey(), c.element().getValue()));
}
}));
}
p.run().waitUntilFinish();
}
}
beam-runners-direct-java
被声明为运行时(runtime)依赖项,而 fatJar 任务只打包 configurations.compile 中的依赖,因此它不会被打包进 fat jar。
您可以将 beam-runners-direct-java
改为编译时(compile)依赖项,这样它就会被打包进 fat jar 并可以使用。
// Build script for the file-hashing Beam pipeline (com.pipeline.BeamPipeline).
// Variant in which the direct runner is a `compile` dependency so fatJar bundles it.
plugins {
    id 'java'
}

group 'com.dustin'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

// Builds a self-contained "<project>-all" jar: the project's own classes plus
// every dependency resolved from the `compile` configuration, unpacked and merged.
task fatJar(type: Jar) {
    manifest {
        attributes 'Implementation-Title': 'Gradle Jar File',
                'Implementation-Version': version,
                'Main-Class': 'com.pipeline.BeamPipeline'
    }
    baseName = project.name + '-all'
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    with jar
}

apply plugin: 'application'
// Must be the fully qualified class name (package + class), not the source path;
// it matches the 'Main-Class' manifest attribute above.
mainClassName = 'com.pipeline.BeamPipeline'

dependencies {
    // `compile` (not `runtime`) so that it is part of configurations.compile,
    // which is what fatJar collects.
    compile group: 'org.apache.beam', name: 'beam-runners-direct-java', version:'2.8.0'
    compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version:'2.8.0'
    runtime group: 'org.slf4j', name: 'slf4j-jdk14', version:'1.7.25'
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.0'
    compile group: 'commons-io', name: 'commons-io', version: '2.6'
    compile group: 'commons-codec', name:'commons-codec', version:'1.12'
    compileOnly 'org.projectlombok:lombok:1.18.6'
    compile group: 'com.google.code.gson', name: 'gson', version: '2.7'
    compile group: 'org.json', name: 'json', version: '20180813'
    annotationProcessor 'org.projectlombok:lombok:1.18.6'
}
或者,如果您不想将 DirectRunner
打包进 fat jar,而只想在测试时使用它,您可以单独构建一个包含 DirectRunner 的
jar,并在运行管道时将其添加到 classpath 中。
// Build script for the file-hashing Beam pipeline (com.pipeline.BeamPipeline).
// Variant that keeps the direct runner out of the fat jar and packages it in a
// separate "-runtime" jar instead.
plugins {
    id 'java'
}

group 'com.dustin'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

// Builds a self-contained "<project>-all" jar: the project's own classes plus
// every dependency resolved from the `compile` configuration, unpacked and merged.
task fatJar(type: Jar) {
    manifest {
        attributes 'Implementation-Title': 'Gradle Jar File',
                'Implementation-Version': version,
                'Main-Class': 'com.pipeline.BeamPipeline'
    }
    baseName = project.name + '-all'
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    with jar
}

// Builds a "<project>-runtime" jar the same way, but from the `runtime`
// configuration, which is where beam-runners-direct-java is declared below.
task directrunnerjar(type: Jar) {
    manifest {
        attributes 'Implementation-Title': 'Gradle Jar File',
                'Implementation-Version': version,
                'Main-Class': 'com.pipeline.BeamPipeline'
    }
    baseName = project.name + '-runtime'
    from { configurations.runtime.collect { it.isDirectory() ? it : zipTree(it) } }
    with jar
}

apply plugin: 'application'
// Must be the fully qualified class name (package + class), not the source path;
// it matches the 'Main-Class' manifest attribute above.
mainClassName = 'com.pipeline.BeamPipeline'

dependencies {
    // Runtime-only: picked up by directrunnerjar, not by fatJar.
    runtime group: 'org.apache.beam', name: 'beam-runners-direct-java', version:'2.8.0'
    compile group: 'org.apache.beam', name: 'beam-sdks-java-core', version:'2.8.0'
    runtime group: 'org.slf4j', name: 'slf4j-jdk14', version:'1.7.25'
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.0'
    compile group: 'commons-io', name: 'commons-io', version: '2.6'
    compile group: 'commons-codec', name:'commons-codec', version:'1.12'
    compileOnly 'org.projectlombok:lombok:1.18.6'
    compile group: 'com.google.code.gson', name: 'gson', version: '2.7'
    compile group: 'org.json', name: 'json', version: '20180813'
    annotationProcessor 'org.projectlombok:lombok:1.18.6'
}
java -cp "libs/filepipeline-runtime-1.0-SNAPSHOT.jar:libs/filepipeline-all-1.0-SNAPSHOT.jar" com.pipeline.BeamPipeline --input="../testdata" --output="./manifest.json" --runner=DirectRunner