使用 Dataflow 的 Bigtable bulkload 太慢
Bigtable bulkload using Dataflow is too slow
每 3 小时将 20GB 数据文件等模式批量加载到 Bigtable 的最佳方法是什么? Dataflow 是正确的方法吗?
我们使用 Dataflow 批量加载 Bigtable 的问题是..
看起来 Dataflow QPS 与 Bigtable(5 个节点)的 QPS 不匹配。我正在尝试使用 Dataflow 将 20GB 文件加载到 bigtable。摄取到 bigtable 需要 4 小时。此外,我在 运行..
期间不断收到此警告
{
"code" : 429,
"errors" : [ {
"domain" : "global",
"message" : "Request throttled due to project QPS limit being reached.",
"reason" : "rateLimitExceeded"
} ],
"message" : "Request throttled due to project QPS limit being reached.",
"status" : "RESOURCE_EXHAUSTED"
}.
代码:
// CloudBigtableOptions is one way to retrieve the options. It's not
// required.
CloudBigtableOptions options = PipelineOptionsFactory.fromArgs(btargs.toArray(new String[btargs.size()]))
.withValidation().as(CloudBigtableOptions.class);
// CloudBigtableTableConfiguration contains the project, zone, cluster
// and table to connect to.
CloudBigtableTableConfiguration config = CloudBigtableTableConfiguration.fromCBTOptions(options);
Pipeline p = Pipeline.create(options);
// This sets up serialization for Puts and Deletes so that Dataflow can
// potentially move them through the network.
CloudBigtableIO.initializeForWrite(p);
p.apply(TextIO.Read.from(inpath)).apply(ParDo.of(new CreatePutsFn(columns, delim)))
.apply(CloudBigtableIO.writeToTable(config));
p.run();
CreatePutsFn:
@Override
public void processElement(DoFn<String, Mutation>.ProcessContext c) throws Exception {
String[] vals = c.element().split(this.delim);
for (int i = 0; i < columns.length; i++) {
if (i != keyPos && vals[i].trim() != "") {
c.output(new Put(vals[keyPos].getBytes()).addColumn(FAMILY, Bytes.toBytes(columns[i].toLowerCase()),
Bytes.toBytes(vals[i])));
}
}
}
非常感谢这里的任何帮助。谢谢
我可以解决这个问题。我做了以下三件事来达到预期的结果。现在,此作业运行并在大约 15 分钟内为一个 (20 Gb) 文件摄取数据。之前 运行 需要 4-5 小时。
- 此作业使用数据流在 3 分钟内创建了 20 亿个放置请求,现在通过对一行的所有列进行批处理减少到 4000 万个请求。
public void processElement(DoFn<String, Mutation>.ProcessContext c) throws Exception {
String[] vals = c.element().split(this.delim);
Put put = new Put(vals[keyPos].getBytes());
for (int i = 0; i < columns.length; i++) {
if (i != keyPos && vals[i].trim() != "") {
put.addColumn(FAMILY, Bytes.toBytes(columns[i].toLowerCase()), Bytes.toBytes(vals[i]));
}
}
c.output(put);
}
我为客户端写入缓冲区添加了 属性
config.toHBaseConfig().set("hbase.client.write.buffer", "200971520”);
您关于 bigtable 达到 QPS 限制的说法是正确的。因此,在批量加载操作期间,我暂时将集群大小增加到 10 个节点(从 3 个)。
每 3 小时将 20GB 数据文件等模式批量加载到 Bigtable 的最佳方法是什么? Dataflow 是正确的方法吗?
我们使用 Dataflow 批量加载 Bigtable 的问题是..
看起来 Dataflow QPS 与 Bigtable(5 个节点)的 QPS 不匹配。我正在尝试使用 Dataflow 将 20GB 文件加载到 bigtable。摄取到 bigtable 需要 4 小时。此外,我在 运行..
期间不断收到此警告{
"code" : 429,
"errors" : [ {
"domain" : "global",
"message" : "Request throttled due to project QPS limit being reached.",
"reason" : "rateLimitExceeded"
} ],
"message" : "Request throttled due to project QPS limit being reached.",
"status" : "RESOURCE_EXHAUSTED"
}.
代码:
// CloudBigtableOptions is one way to retrieve the options. It's not
// required.
CloudBigtableOptions options = PipelineOptionsFactory.fromArgs(btargs.toArray(new String[btargs.size()]))
.withValidation().as(CloudBigtableOptions.class);
// CloudBigtableTableConfiguration contains the project, zone, cluster
// and table to connect to.
CloudBigtableTableConfiguration config = CloudBigtableTableConfiguration.fromCBTOptions(options);
Pipeline p = Pipeline.create(options);
// This sets up serialization for Puts and Deletes so that Dataflow can
// potentially move them through the network.
CloudBigtableIO.initializeForWrite(p);
p.apply(TextIO.Read.from(inpath)).apply(ParDo.of(new CreatePutsFn(columns, delim)))
.apply(CloudBigtableIO.writeToTable(config));
p.run();
CreatePutsFn:
@Override
public void processElement(DoFn<String, Mutation>.ProcessContext c) throws Exception {
String[] vals = c.element().split(this.delim);
for (int i = 0; i < columns.length; i++) {
if (i != keyPos && vals[i].trim() != "") {
c.output(new Put(vals[keyPos].getBytes()).addColumn(FAMILY, Bytes.toBytes(columns[i].toLowerCase()),
Bytes.toBytes(vals[i])));
}
}
}
非常感谢这里的任何帮助。谢谢
我可以解决这个问题。我做了以下三件事来达到预期的结果。现在,此作业运行并在大约 15 分钟内为一个 (20 Gb) 文件摄取数据。之前 运行 需要 4-5 小时。
- 此作业使用数据流在 3 分钟内创建了 20 亿个放置请求,现在通过对一行的所有列进行批处理减少到 4000 万个请求。
public void processElement(DoFn<String, Mutation>.ProcessContext c) throws Exception {
String[] vals = c.element().split(this.delim);
Put put = new Put(vals[keyPos].getBytes());
for (int i = 0; i < columns.length; i++) {
if (i != keyPos && vals[i].trim() != "") {
put.addColumn(FAMILY, Bytes.toBytes(columns[i].toLowerCase()), Bytes.toBytes(vals[i]));
}
}
c.output(put);
}
我为客户端写入缓冲区添加了 属性
config.toHBaseConfig().set("hbase.client.write.buffer", "200971520”);
您关于 bigtable 达到 QPS 限制的说法是正确的。因此,在批量加载操作期间,我暂时将集群大小增加到 10 个节点(从 3 个)。