通过 Dataflow 从 Bigtable 到 GCS(反之亦然)
From Bigtable To GCS (and vice versa) via Dataflow
我们正在尝试 运行 每日数据流管道读取 Bigtable 并将数据转储到 GCS(使用 HBase 的 Scan 和 BaseResultCoder 作为编码器)如下(只是为了突出这个想法):
Pipeline pipeline = Pipeline.create(options);
Scan scan = new Scan();
scan.setCacheBlocks(false).setMaxVersions(1);
scan.addFamily(Bytes.toBytes("f"));
CloudBigtableScanConfiguration btConfig = BCloudBigtableScanConfiguration.Builder().withProjectId("aaa").withInstanceId("bbb").withTableId("ccc").withScan(scan).build();
pipeline.apply(Read.from(CloudBigtableIO.read(btConfig))).apply(TextIO.Write.to("gs://bucket/dir/file").withCoder(HBaseResultCoder.getInstance()));
pipeline.run();
这似乎 运行 完全符合预期。
现在,如果需要,我们希望能够使用 GCS 中的转储文件进行恢复作业。也就是说,我们想要一个数据流管道,它从 GCS 读取转储数据(即 PCollection)并创建 Mutations(基本上是 'Put' 个对象)。出于某种原因,以下代码因一堆 NullPointerExceptions 而失败。我们不确定为什么会这样——在下面添加了 if 语句来检查 null 或 0 长度的字符串,以查看这是否会有所作为,但事实并非如此。
// Part of DoFn<Result,Mutation>
@Override
public void processElement(ProcessContext c) {
Result result = c.element();
byte[] row = result.getRow();
if (row == null || row.length == 0) { // NullPointerException at this line
return;
}
Put mutation = new Put(result.getRow());
// go through the column/value entries of this row, and create a corresponding put mutation.
for (Entry<byte[], byte[]> entry : result.getFamilyMap(Bytes.toBytes(cf)).entrySet()) {
byte[] qualifier = entry.getKey();
if (qualifier == null || qualifier.length == 0) {
continue;
}
byte[] val = entry.getValue();
if (val == null || val.length == 0) {
continue;
}
mutation.addImmutable(cf_bytes, qualifier, entry.getValue());
}
c.output(mutation);
}
我们得到的错误如下(第83行标在上面):
(2a6ad6372944050d): java.lang.NullPointerException at some.package.RecoveryFromGcs$CreateMutationFromResult.processElement(RecoveryFromGcs.java:83)
我有两个问题:
1. 有人尝试在 PCollection 上执行 ParDo 以获取要写入 bigtable 的 PCollection 时是否遇到过类似情况?
2. 这是合理的做法吗?最终目标是能够通过备份定期留下我们的大表(针对特定列族)的每日快照,以防万一发生坏事。我们希望能够通过dataflow读取备份数据,需要的时候写入bigtable。
任何建议和帮助将不胜感激!
-------- 编辑
下面是扫描 Bigtable 并将数据转储到 GCS 的代码:
(一些不相关的细节被隐藏了。)
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
final String cf = "f"; // some specific column family.
Scan scan = new Scan();
scan.setCacheBlocks(false).setMaxVersions(1); // Disable caching and read only the latest cell.
scan.addFamily(Bytes.toBytes(cf));
CloudBigtableScanConfiguration btConfig =
BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), "some-bigtable-name").withScan(scan).build();
PCollection<Result> result = pipeline.apply(Read.from(CloudBigtableIO.read(btConfig)));
PCollection<Mutation> mutation =
result.apply(ParDo.of(new CreateMutationFromResult(cf))).setCoder(new HBaseMutationCoder());
mutation.apply(TextIO.Write.to("gs://path-to-files").withCoder(new HBaseMutationCoder()));
pipeline.run();
}
}
读取上述代码输出的作业有如下代码:
(这是从 GCS 读取时抛出的异常)
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
PCollection<Mutation> mutations = pipeline.apply(TextIO.Read
.from("gs://path-to-files").withCoder(new HBaseMutationCoder()));
CloudBigtableScanConfiguration config =
BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), btTarget).build();
if (config != null) {
CloudBigtableIO.initializeForWrite(pipeline);
mutations.apply(CloudBigtableIO.writeToTable(config));
}
pipeline.run();
}
}
我得到的错误 (https://jpst.it/Qr6M) 有点令人困惑,因为突变都是 Put 对象,但错误是关于 'Delete' 对象。
最好在 cloud bigtable client github issues page 上讨论这个问题。我们目前正在开发像这样的导入/导出功能,因此我们会迅速做出响应。我们还将自己探索这种方法,即使您不添加 github 问题。 github 问题将使我们能够更好地沟通。
FWIW,我不明白您如何在突出显示的行上获得 NPE。您确定线路正确吗?
编辑 (12/12):
以下 processElement()
方法应该可以将 Result 转换为 Put:
@Override
public void processElement(DoFn<Result, Mutation>.ProcessContext c) throws Exception {
Result result = c.element();
byte[] row = result.getRow();
if (row != null && row.length > 0) {
Put put = new Put(row);
for (Cell cell : result.rawCells()) {
put.add(cell);
}
c.output(put);
}
}
我们正在尝试 运行 每日数据流管道读取 Bigtable 并将数据转储到 GCS(使用 HBase 的 Scan 和 BaseResultCoder 作为编码器)如下(只是为了突出这个想法):
Pipeline pipeline = Pipeline.create(options);
Scan scan = new Scan();
scan.setCacheBlocks(false).setMaxVersions(1);
scan.addFamily(Bytes.toBytes("f"));
CloudBigtableScanConfiguration btConfig = BCloudBigtableScanConfiguration.Builder().withProjectId("aaa").withInstanceId("bbb").withTableId("ccc").withScan(scan).build();
pipeline.apply(Read.from(CloudBigtableIO.read(btConfig))).apply(TextIO.Write.to("gs://bucket/dir/file").withCoder(HBaseResultCoder.getInstance()));
pipeline.run();
这似乎 运行 完全符合预期。
现在,如果需要,我们希望能够使用 GCS 中的转储文件进行恢复作业。也就是说,我们想要一个数据流管道,它从 GCS 读取转储数据(即 PCollection)并创建 Mutations(基本上是 'Put' 个对象)。出于某种原因,以下代码因一堆 NullPointerExceptions 而失败。我们不确定为什么会这样——在下面添加了 if 语句来检查 null 或 0 长度的字符串,以查看这是否会有所作为,但事实并非如此。
// Part of DoFn<Result,Mutation>
@Override
public void processElement(ProcessContext c) {
Result result = c.element();
byte[] row = result.getRow();
if (row == null || row.length == 0) { // NullPointerException at this line
return;
}
Put mutation = new Put(result.getRow());
// go through the column/value entries of this row, and create a corresponding put mutation.
for (Entry<byte[], byte[]> entry : result.getFamilyMap(Bytes.toBytes(cf)).entrySet()) {
byte[] qualifier = entry.getKey();
if (qualifier == null || qualifier.length == 0) {
continue;
}
byte[] val = entry.getValue();
if (val == null || val.length == 0) {
continue;
}
mutation.addImmutable(cf_bytes, qualifier, entry.getValue());
}
c.output(mutation);
}
我们得到的错误如下(第83行标在上面):
(2a6ad6372944050d): java.lang.NullPointerException at some.package.RecoveryFromGcs$CreateMutationFromResult.processElement(RecoveryFromGcs.java:83)
我有两个问题: 1. 有人尝试在 PCollection 上执行 ParDo 以获取要写入 bigtable 的 PCollection 时是否遇到过类似情况? 2. 这是合理的做法吗?最终目标是能够通过备份定期留下我们的大表(针对特定列族)的每日快照,以防万一发生坏事。我们希望能够通过dataflow读取备份数据,需要的时候写入bigtable。
任何建议和帮助将不胜感激!
-------- 编辑
下面是扫描 Bigtable 并将数据转储到 GCS 的代码: (一些不相关的细节被隐藏了。)
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
final String cf = "f"; // some specific column family.
Scan scan = new Scan();
scan.setCacheBlocks(false).setMaxVersions(1); // Disable caching and read only the latest cell.
scan.addFamily(Bytes.toBytes(cf));
CloudBigtableScanConfiguration btConfig =
BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), "some-bigtable-name").withScan(scan).build();
PCollection<Result> result = pipeline.apply(Read.from(CloudBigtableIO.read(btConfig)));
PCollection<Mutation> mutation =
result.apply(ParDo.of(new CreateMutationFromResult(cf))).setCoder(new HBaseMutationCoder());
mutation.apply(TextIO.Write.to("gs://path-to-files").withCoder(new HBaseMutationCoder()));
pipeline.run();
}
}
读取上述代码输出的作业有如下代码: (这是从 GCS 读取时抛出的异常)
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
PCollection<Mutation> mutations = pipeline.apply(TextIO.Read
.from("gs://path-to-files").withCoder(new HBaseMutationCoder()));
CloudBigtableScanConfiguration config =
BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), btTarget).build();
if (config != null) {
CloudBigtableIO.initializeForWrite(pipeline);
mutations.apply(CloudBigtableIO.writeToTable(config));
}
pipeline.run();
}
}
我得到的错误 (https://jpst.it/Qr6M) 有点令人困惑,因为突变都是 Put 对象,但错误是关于 'Delete' 对象。
最好在 cloud bigtable client github issues page 上讨论这个问题。我们目前正在开发像这样的导入/导出功能,因此我们会迅速做出响应。我们还将自己探索这种方法,即使您不添加 github 问题。 github 问题将使我们能够更好地沟通。
FWIW,我不明白您如何在突出显示的行上获得 NPE。您确定线路正确吗?
编辑 (12/12):
以下 processElement()
方法应该可以将 Result 转换为 Put:
@Override
public void processElement(DoFn<Result, Mutation>.ProcessContext c) throws Exception {
Result result = c.element();
byte[] row = result.getRow();
if (row != null && row.length > 0) {
Put put = new Put(row);
for (Cell cell : result.rawCells()) {
put.add(cell);
}
c.output(put);
}
}