在使用 Beam 导入 HBase 期间,有没有办法 modify/create PCollection 中的新密钥?
Is there a way to modify/create new Keys in a PCollection during HBase Import using Beam?
我想在 Apache Beam 导入作业期间将 prefix/suffix 添加到 HBase 密钥。下面是创建保存 HBase 数据的 PCollection 的代码:
Pipeline pipeline = Pipeline.create(Utils.tweakOptions(opts));
HBaseSnapshotInputConfigBuilder configurationBuilder =
new HBaseSnapshotInputConfigBuilder()
.setProjectId(opts.getProject())
.setHbaseSnapshotSourceDir(opts.getHbaseSnapshotSourceDir())
.setSnapshotName(opts.getSnapshotName())
.setRestoreDirSuffix(opts.getJobName());
PCollection<KV<ImmutableBytesWritable, Result>> readResult =
pipeline.apply(
"Read from HBase Snapshot",
HadoopFormatIO.<ImmutableBytesWritable, Result>read()
.withConfiguration(configurationBuilder.build()));
// Somehow create a new PCollection with the transformed Key - derived from the PCollection above.
PCollection<Mutation> copyResult = readResult.apply("Prefixes", ParDo.of(new TransformFn()));
在 PTransform 期间,我收到这样的数据:
public void processElement(ProcessContext context) throws Exception {
KV<ImmutableBytesWritable, Result> kv = context.element();
List<Cell> cells = checkEmptyRow(kv);
if (cells.isEmpty()) {
return;
}
// Preprocess delete markers
if (hasDeleteMarkers(cells)) {
cells = preprocessDeleteMarkers(cells);
}
// Split the row into multiple puts if it exceeds the maximum mutation limit
Iterator<Cell> cellIt = cells.iterator();
while (cellIt.hasNext()) {
Put put = new Put(kv.getKey().get());
for (int i = 0; i < MAX_CELLS && cellIt.hasNext(); i++) {
put.add(cellIt.next());
}
context.output(put);
}
}
在哪里可以更新 Hbase 密钥? HBase Put 对象需要与 context.element() 具有相同的键,否则转换失败:
InvalidArgumentException: Didn't receive a result for this mutation entry
似乎 PCollection -> ProcessContext -> HBase Put -> HBase Cell 在导入期间都需要维护相同的密钥 ...因此我无法在已创建 PCollection。
我如何着手创建一个新的 PCollection 来保存相同的 Hbase 数据但不同的 HBase 密钥?
如果我没记错的话,您不能只使用新键将单元格复制到 Put
中,因为该单元格已经包含旧行键。
来自 Cell
Javadoc:
唯一性由行、列族、列限定符、时间戳和类型的组合决定。
您需要使用新密钥创建一个新的 Cell
,将 CF 名称、限定符名称、时间戳、值等复制到其中,并将此新单元格添加到新的 Put
对象中。
PS:顺便说一句,Beam 中有一个专用的 HbaseIO
,比 HadopFormatIO
.
更容易使用
我想在 Apache Beam 导入作业期间将 prefix/suffix 添加到 HBase 密钥。下面是创建保存 HBase 数据的 PCollection 的代码:
Pipeline pipeline = Pipeline.create(Utils.tweakOptions(opts));
HBaseSnapshotInputConfigBuilder configurationBuilder =
new HBaseSnapshotInputConfigBuilder()
.setProjectId(opts.getProject())
.setHbaseSnapshotSourceDir(opts.getHbaseSnapshotSourceDir())
.setSnapshotName(opts.getSnapshotName())
.setRestoreDirSuffix(opts.getJobName());
PCollection<KV<ImmutableBytesWritable, Result>> readResult =
pipeline.apply(
"Read from HBase Snapshot",
HadoopFormatIO.<ImmutableBytesWritable, Result>read()
.withConfiguration(configurationBuilder.build()));
// Somehow create a new PCollection with the transformed Key - derived from the PCollection above.
PCollection<Mutation> copyResult = readResult.apply("Prefixes", ParDo.of(new TransformFn()));
在 PTransform 期间,我收到这样的数据:
public void processElement(ProcessContext context) throws Exception {
KV<ImmutableBytesWritable, Result> kv = context.element();
List<Cell> cells = checkEmptyRow(kv);
if (cells.isEmpty()) {
return;
}
// Preprocess delete markers
if (hasDeleteMarkers(cells)) {
cells = preprocessDeleteMarkers(cells);
}
// Split the row into multiple puts if it exceeds the maximum mutation limit
Iterator<Cell> cellIt = cells.iterator();
while (cellIt.hasNext()) {
Put put = new Put(kv.getKey().get());
for (int i = 0; i < MAX_CELLS && cellIt.hasNext(); i++) {
put.add(cellIt.next());
}
context.output(put);
}
}
在哪里可以更新 Hbase 密钥? HBase Put 对象需要与 context.element() 具有相同的键,否则转换失败:
InvalidArgumentException: Didn't receive a result for this mutation entry
似乎 PCollection -> ProcessContext -> HBase Put -> HBase Cell 在导入期间都需要维护相同的密钥 ...因此我无法在已创建 PCollection。
我如何着手创建一个新的 PCollection 来保存相同的 Hbase 数据但不同的 HBase 密钥?
如果我没记错的话,您不能只使用新键将单元格复制到 Put
中,因为该单元格已经包含旧行键。
来自 Cell
Javadoc:
唯一性由行、列族、列限定符、时间戳和类型的组合决定。
您需要使用新密钥创建一个新的 Cell
,将 CF 名称、限定符名称、时间戳、值等复制到其中,并将此新单元格添加到新的 Put
对象中。
PS:顺便说一句,Beam 中有一个专用的 HbaseIO
,比 HadopFormatIO
.