在使用 Beam 导入 HBase 期间,有没有办法 modify/create PCollection 中的新密钥?

Is there a way to modify/create new Keys in a PCollection during HBase Import using Beam?

我想在 Apache Beam 导入作业期间将 prefix/suffix 添加到 HBase 密钥。下面是创建保存 HBase 数据的 PCollection 的代码:

    Pipeline pipeline = Pipeline.create(Utils.tweakOptions(opts));
    HBaseSnapshotInputConfigBuilder configurationBuilder =
        new HBaseSnapshotInputConfigBuilder()
            .setProjectId(opts.getProject())
            .setHbaseSnapshotSourceDir(opts.getHbaseSnapshotSourceDir())
            .setSnapshotName(opts.getSnapshotName())
            .setRestoreDirSuffix(opts.getJobName());

    PCollection<KV<ImmutableBytesWritable, Result>> readResult =
        pipeline.apply(
            "Read from HBase Snapshot",
            HadoopFormatIO.<ImmutableBytesWritable, Result>read()
                .withConfiguration(configurationBuilder.build()));

    // Somehow create a new PCollection with the transformed Key - derived from the PCollection above.
    PCollection<Mutation> copyResult = readResult.apply("Prefixes", ParDo.of(new TransformFn()));

在 PTransform 期间,我收到这样的数据:

  public void processElement(ProcessContext context) throws Exception {
    KV<ImmutableBytesWritable, Result> kv = context.element();
    List<Cell> cells = checkEmptyRow(kv);
    if (cells.isEmpty()) {
      return;
    }

    // Preprocess delete markers
    if (hasDeleteMarkers(cells)) {
      cells = preprocessDeleteMarkers(cells);
    }

    // Split the row into multiple puts if it exceeds the maximum mutation limit
    Iterator<Cell> cellIt = cells.iterator();

    while (cellIt.hasNext()) {
      Put put = new Put(kv.getKey().get());

      for (int i = 0; i < MAX_CELLS && cellIt.hasNext(); i++) {
        put.add(cellIt.next());
      }

      context.output(put);
    }
  }

在哪里可以更新 Hbase 密钥? HBase Put 对象需要与 context.element() 具有相同的键,否则转换失败:

InvalidArgumentException: Didn't receive a result for this mutation entry

似乎 PCollection -> ProcessContext -> HBase Put -> HBase Cell 在导入期间都需要维护相同的密钥 ...因此我无法在已创建 PCollection。

我如何着手创建一个新的 PCollection 来保存相同的 Hbase 数据但不同的 HBase 密钥?

如果我没记错的话,您不能只使用新键将单元格复制到 Put 中,因为该单元格已经包含旧行键。

来自 Cell Javadoc:

唯一性由行、列族、列限定符、时间戳和类型的组合决定。

您需要使用新密钥创建一个新的 Cell,将 CF 名称、限定符名称、时间戳、值等复制到其中,并将此新单元格添加到新的 Put 对象中。

PS:顺便说一句,Beam 中有一个专用的 HbaseIO,比 HadopFormatIO.

更容易使用