将 HBase Scan 转换为 RowFilter

Transform HBase Scan to RowFilter

我正在使用来自 spotify 的 scio 来处理我的 Dataflow 作业。 在上一个 scio 版本中,使用了新的 bigtable java api (com.google.bigtable.v2)

现在 scio bigtable entry point required "RowFilter" to filter instead of Hbase "Scan". Is there a simple way to transform "Scan" to "RowFilter" ? I looked for adapters 在源代码中,但我不确定如何使用它。 我找不到可以轻松从 hbase api 迁移到 "new" api.

的文档

我在需要转换的代码中使用的简单扫描:

val scan = new Scan()
scan.setRowPrefixFilter("helloworld".getBytes)
scan.addColumn("family".getBytes, "qualifier".getBytes)
scan.setMaxVersions()

理论上,您可以将 bigtable-hbase dependency 添加到项目中并调用 com.google.cloud.bigtable.hbase.adapters.Adapters.SCAN_ADAPTER.adapt(scan)Scan 转换为 RowFilter,或者更具体地说 [=13] =] 其中包含一个 [RowFilter][4]。 (链接指向那些包含变量和大量注释的对象的 protobuf 定义)。

也就是说,bigtable-hbase 依赖项添加了相当多的传递依赖项。我会在独立项目中使用 bigtable-hbase SCAN_ADAPTER,然后打印 RowFilter 以查看其构造方式。

在您提到的具体情况下,RowFilter 非常简单,但可能还有其他复杂情况。您的扫描分为三个部分,所以我将详细说明如何实现它们:

  1. scan.setRowPrefixFilter("helloworld".getBytes)。这转换为 BigtableIO 上的开始键和结束键。 "helloworld"为开始键,结束键为RowKeyUtil. calculateTheClosestNextRowKeyForPrefix。默认的 BigtableIO 不公开 set start key 和 set end key,因此 scio 版本必须更改才能使这些设置器 public.

  2. scan.addColumn("family".getBytes, "qualifier".getBytes) 转换为两个 RowFilter 添加到带有 ChainRowFilter(主要类似于 AND)。第一个 RowFilter 将设置 familyNameRegexFilter,第二个 RowFilter 将设置 columnNameRegexFilter

  3. scan.setMaxVersions() 转换为设置了 cellsPerColumnLimitFilterRowFilter。它需要添加到 #2 的链中。警告:如果您使用 timestampRangeFilterRowFilter 的值过滤器来限制列的范围,请确保将 cellsPerColumnLimitFilter 放在链的末尾。