使用 Apache Beam 的数据流 sdk 写入 BigTable 时捕获 NullPointerException

NullPointerException caught when writing to BigTable using Apache Beam's dataflow sdk

我正在使用 Apache's Beam sdk 版本 0.2.0-incubating-SNAPSHOT 并尝试使用 Dataflow runner 将数据提取到 bigtable。不幸的是,在执行我使用 BigTableIO.Write 作为接收器的数据流管道时,我得到了 NullPointerException。已经检查了我的BigtableOptions,参数没问题,符合我的需要。

基本上,我创建并在我的管道的某个点我有将 PCollection<KV<ByteString, Iterable<Mutation>>> 写入我想要的 bigtable 的步骤:

final BigtableOptions.Builder optionsBuilder =
    new BigtableOptions.Builder().setProjectId(System.getProperty("PROJECT_ID"))
        .setInstanceId(System.getProperty("BT_INSTANCE_ID"));

// do intermediary steps and create PCollection<KV<ByteString, Iterable<Mutation>>> 
// to write to bigtable

// modifiedHits is a PCollection<KV<ByteString, Iterable<Mutation>>>
modifiedHits.apply("writting to big table", BigtableIO.write()
    .withBigtableOptions(optionsBuilder).withTableId(System.getProperty("BT_TABLENAME")));

p.run();

在执行管道时,我得到了 NullPointerException,在 public void processElement(ProcessContext c) 方法中精确指向 BigtableIO class:

(6e0ccd8407eed08b): java.lang.NullPointerException at org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$Write$BigtableWriterFn.processElement(BigtableIO.java:532)

我检查过此方法在写入 bigtable 之前正在处理所有元素,但不确定为什么我会在执行此管道时超时出现此类异常。根据下面的代码,此方法使用 bigtableWriter 属性来处理每个 c.element(),但我什至无法设置断点来调试 null 的确切位置。关于如何解决此问题的任何建议或意见?

@ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    checkForFailures();
    Futures.addCallback(
        bigtableWriter.writeRecord(c.element()), new WriteExceptionCallback(c.element()));
    ++recordsWritten;
  }

谢谢。

我查找了该作业及其类路径,如果我没记错的话,您似乎使用的是 beam-sdks-java-{core,io}0.3.0-incubating-SNAPSHOT 版本,但是 version 0.2.0-incubating-SNAPSHOT 的 [=] 13=].

我认为问题是因为这个 - 你必须使用相同的版本(更多细节:0.3.0 版中的 BigtableIO 使用 @Setup@Teardown 方法,但 runner 0.2.0尚不支持它们)。