写入 BigQuery 时出现 MojoExecutionException

MojoExecutionException while writing to BigQuery

我对 GCP 比较陌生,正在尝试了解 Google Cloud 上提供的不同服务。

在使用 Bigquery 尝试数据流 (Beam) 时出现异常,我无法从显示的错误中找到任何信息。 对此的任何帮助将不胜感激。

上下文:

我正在尝试 运行 一个数据流管道,它将从数据存储中读取整个数据,然后将其写入 Bigquery。

代码

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Query;
import com.google.datastore.v1.Value;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.druk.twc.tods.ToDatastore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static com.google.datastore.v1.client.DatastoreHelper.getString;

public class DsToBQ {
    private static final Logger LOG = LoggerFactory.getLogger(ToDatastore.class);

    static class GetContentFn extends DoFn<Entity, String> {
        private static final long serialVersionUID = 0;

        @ProcessElement
        public void processElement(ProcessContext c) {
            Map<String, Value> props = c.element().getPropertiesMap();
            Value value = props.get("content");
            if (value != null) {
                c.output(getString(value));
            }
        }
    }

    static class ToTableRowFn extends DoFn<String, TableRow> {
        private static final long serialVersionUID = 0;

        @ProcessElement
        public void processElement(ProcessContext c) {
            TableRow row = new TableRow()
                    .set("number", Integer.parseInt(c.element()));
            c.output(row);
        }
    }

    public static interface Options extends PipelineOptions {

        @Description("Dataset entity kind")
        @Default.String("pos")
        String getKind();
        void setKind(String value);

        @Description("Table location in formate: YOUR_PROJECT_ID:DATASET_ID.TABLE_ID")
        @Validation.Required
        String getTable();
        void setTable(String table);
    }

    public static void main(String args[]) {
        //Register all the custom option
        PipelineOptionsFactory.register(Options.class);
        //Create pipeline options
        Options option = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        // Build a query:
        Query.Builder q = Query.newBuilder();
        // read all entities of the specified kind.
        q.addKindBuilder().setName(option.getKind());
        Query query = q.build();

        // Build the table schema for the output table.
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("number").setType("INTEGER"));
        TableSchema schema = new TableSchema().setFields(fields);


        Pipeline p = Pipeline.create(option);
        p
                .apply("Read From Datastore", DatastoreIO.v1().read().withQuery(query))
                .apply("Multiply with 3", ParDo.of(new GetContentFn()))
                .apply("Convert to Table row", ParDo.of(new ToTableRowFn()))
                .apply("Write for BQ", BigQueryIO.writeTableRows()
                        .to(option.getTable())
                        .withSchema(schema));

        p.run();
    }
}

错误

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:java (default-cli) on project pubsub-bigquery: An exception occured while executing the Java class. null: InvocationTargetException: projectId -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:java (default-cli) on project pubsub-bigquery: An exception occured while executing the Java class. null
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:212)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
    at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
    at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
    at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307)
    at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193)
    at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106)
    at org.apache.maven.cli.MavenCli.execute(MavenCli.java:863)
    at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:288)
    at org.apache.maven.cli.MavenCli.main(MavenCli.java:199)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
    at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
    at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
    at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
Caused by: org.apache.maven.plugin.MojoExecutionException: An exception occured while executing the Java class. null
    at org.codehaus.mojo.exec.ExecJavaMojo.execute(ExecJavaMojo.java:345)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:207)
    ... 20 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo.run(ExecJavaMojo.java:293)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException: projectId
    at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:787)
    at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read.validate(DatastoreV1.java:624)
    at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform(Pipeline.java:610)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:590)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:594)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access0(TransformHierarchy.java:276)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:210)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:440)
    at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:552)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:296)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:283)
    at org.druk.twc.tobq.DsToBQ.main(DsToBQ.java:126)
    ... 6 more
[ERROR]
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

执行命令

mvn compile exec:java -Dexec.mainClass=<Class Name>     
-Dexec.args="--runner=DataflowRunner \
--project=<Project ID> --kind=<Kind Name> \
--gcpTempLocation=gs://<To temp Location> \
--table=<Project ID>:<Dataset ID>.<Table ID>" -Pdataflow-runner

从错误中,我看到无法检索要从 Datastore 读取的项目。在您的管道配置中,您必须使用以下方式指定数据存储项目:

p
  .apply("Read From Datastore", DatastoreIO.v1().read()
    .withProject("..")
    .withQuery(query))

我已打开 JIRA 问题 BEAM-2883 以改进此错误消息。