通过 Dataflow 从 Bigtable 到 GCS(反之亦然)

From Bigtable To GCS (and vice versa) via Dataflow

我们正在尝试 运行 每日数据流管道读取 Bigtable 并将数据转储到 GCS(使用 HBase 的 Scan 和 BaseResultCoder 作为编码器)如下(只是为了突出这个想法):

  Pipeline pipeline = Pipeline.create(options); 
  Scan scan = new Scan();
  scan.setCacheBlocks(false).setMaxVersions(1);
  scan.addFamily(Bytes.toBytes("f"));
  CloudBigtableScanConfiguration btConfig = BCloudBigtableScanConfiguration.Builder().withProjectId("aaa").withInstanceId("bbb").withTableId("ccc").withScan(scan).build();
  pipeline.apply(Read.from(CloudBigtableIO.read(btConfig))).apply(TextIO.Write.to("gs://bucket/dir/file").withCoder(HBaseResultCoder.getInstance()));
  pipeline.run();

这似乎 运行 完全符合预期。

现在,如果需要,我们希望能够使用 GCS 中的转储文件进行恢复作业。也就是说,我们想要一个数据流管道,它从 GCS 读取转储数据(即 PCollection)并创建 Mutations(基本上是 'Put' 个对象)。出于某种原因,以下代码因一堆 NullPointerExceptions 而失败。我们不确定为什么会这样——在下面添加了 if 语句来检查 null 或 0 长度的字符串,以查看这是否会有所作为,但事实并非如此。

// Part of DoFn<Result,Mutation>
@Override
public void processElement(ProcessContext c) {
  Result result = c.element();
  byte[] row = result.getRow();
  if (row == null || row.length == 0) { // NullPointerException at this line
    return;
  }
  Put mutation = new Put(result.getRow());
  // go through the column/value entries of this row, and create a corresponding put mutation.
  for (Entry<byte[], byte[]> entry : result.getFamilyMap(Bytes.toBytes(cf)).entrySet()) {
    byte[] qualifier = entry.getKey();
    if (qualifier == null || qualifier.length == 0) {
      continue;
    }
    byte[] val = entry.getValue();
    if (val == null || val.length == 0) {
      continue;
    }
    mutation.addImmutable(cf_bytes, qualifier, entry.getValue());
  }
  c.output(mutation);
}

我们得到的错误如下(第83行标在上面):

(2a6ad6372944050d): java.lang.NullPointerException at some.package.RecoveryFromGcs$CreateMutationFromResult.processElement(RecoveryFromGcs.java:83)

我有两个问题: 1. 有人尝试在 PCollection 上执行 ParDo 以获取要写入 bigtable 的 PCollection 时是否遇到过类似情况? 2. 这是合理的做法吗?最终目标是能够通过备份定期留下我们的大表(针对特定列族)的每日快照,以防万一发生坏事。我们希望能够通过dataflow读取备份数据,需要的时候写入bigtable。

任何建议和帮助将不胜感激!

-------- 编辑

下面是扫描 Bigtable 并将数据转储到 GCS 的代码: (一些不相关的细节被隐藏了。)

public static void execute(Options options) {
  Pipeline pipeline = Pipeline.create(options);
  final String cf = "f"; // some specific column family.
  Scan scan = new Scan();
  scan.setCacheBlocks(false).setMaxVersions(1); // Disable caching and read only the latest cell.
  scan.addFamily(Bytes.toBytes(cf));

  CloudBigtableScanConfiguration btConfig =
      BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), "some-bigtable-name").withScan(scan).build();

  PCollection<Result> result = pipeline.apply(Read.from(CloudBigtableIO.read(btConfig)));

  PCollection<Mutation> mutation =
      result.apply(ParDo.of(new CreateMutationFromResult(cf))).setCoder(new HBaseMutationCoder());

  mutation.apply(TextIO.Write.to("gs://path-to-files").withCoder(new HBaseMutationCoder()));

  pipeline.run();
}

}

读取上述代码输出的作业有如下代码: (这是从 GCS 读取时抛出的异常)

public static void execute(Options options) {
  Pipeline pipeline = Pipeline.create(options);
  PCollection<Mutation> mutations = pipeline.apply(TextIO.Read
      .from("gs://path-to-files").withCoder(new HBaseMutationCoder()));

  CloudBigtableScanConfiguration config =
      BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), btTarget).build();
  if (config != null) {
    CloudBigtableIO.initializeForWrite(pipeline);
    mutations.apply(CloudBigtableIO.writeToTable(config));
  }
  pipeline.run();
}

}

我得到的错误 (https://jpst.it/Qr6M) 有点令人困惑,因为突变都是 Put 对象,但错误是关于 'Delete' 对象。

最好在 cloud bigtable client github issues page 上讨论这个问题。我们目前正在开发像这样的导入/导出功能,因此我们会迅速做出响应。我们还将自己探索这种方法,即使您不添加 github 问题。 github 问题将使我们能够更好地沟通。

FWIW,我不明白您如何在突出显示的行上获得 NPE。您确定线路正确吗?

编辑 (12/12):

以下 processElement() 方法应该可以将 Result 转换为 Put:

@Override
public void processElement(DoFn<Result, Mutation>.ProcessContext c) throws Exception {
  Result result = c.element();
  byte[] row = result.getRow();
  if (row != null && row.length > 0) {
    Put put = new Put(row);
    for (Cell cell : result.rawCells()) {
      put.add(cell);
    }
    c.output(put);
  }
}