Google 将记录写入 Google 数据存储时数据流模板作业不缩放
Google Dataflow template job not scaling when writing records to Google datastore
我有一个使用数据流模板从云函数触发的小型数据流作业。该作业基本上从 Bigquery 中的 table 读取数据,将结果表行转换为键值,并将键值写入数据存储区。
这是我的代码的样子:-
PCollection<TableRow> bigqueryResult = p.apply("BigQueryRead",
BigQueryIO.readTableRows().withTemplateCompatibility()
.fromQuery(options.getQuery()).usingStandardSql()
.withoutValidation());
bigqueryResult.apply("WriteFromBigqueryToDatastore", ParDo.of(new DoFn<TableRow, String>() {
@ProcessElement
public void processElement(ProcessContext pc) {
TableRow row = pc.element();
Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
KeyFactory keyFactoryCounts = datastore.newKeyFactory().setNamespace("MyNamespace")
.setKind("MyKind");
Key key = keyFactoryCounts.newKey("Key");
Builder builder = Entity.newBuilder(key);
builder.set("Key", BooleanValue.newBuilder("Value").setExcludeFromIndexes(true).build());
Entity entity= builder.build();
datastore.put(entity);
}
}));
当我尝试处理的记录数在 1 到 100 范围内时,此管道运行良好。但是,当我尝试在管道上施加更多负载时,即约 10000 条记录,管道不会scale(尽管自动缩放设置为基于 THROUGHPUT,并且 maximumWorkers 被指定为高达 50,使用 n1-standard-1 机器类型)。该作业使用一两个工人每秒处理 3 或 4 个元素。这会影响我的系统性能。
非常欢迎任何关于如何提高性能的建议。
提前致谢。
至少使用 python 的 ndb
客户端库,可以在单个 .put_multi()
数据存储调用中一次写入多达 500 个实体 - 比调用快得多.put()
一次针对一个实体(调用在底层 RPC 上阻塞)
我不是 java 用户,但似乎也可以使用类似的技术。来自 Using batch operations:
You can use the batch operations if you want to operate on multiple
entities in a single Cloud Datastore call.
Here is an example of a batch call:
Entity employee1 = new Entity("Employee");
Entity employee2 = new Entity("Employee");
Entity employee3 = new Entity("Employee");
// ...
List<Entity> employees = Arrays.asList(employee1, employee2, employee3);
datastore.put(employees);
找到了使用 DatastoreIO 而不是数据存储客户端的解决方案。
以下是我使用的片段,
PCollection<TableRow> row = p.apply("BigQueryRead",
BigQueryIO.readTableRows().withTemplateCompatibility()
.fromQuery(options.getQueryForSegmentedUsers()).usingStandardSql()
.withoutValidation());
PCollection<com.google.datastore.v1.Entity> userEntity = row.apply("ConvertTablerowToEntity", ParDo.of(new DoFn<TableRow, com.google.datastore.v1.Entity>() {
@SuppressWarnings("deprecation")
@ProcessElement
public void processElement(ProcessContext pc) {
final String namespace = "MyNamespace";
final String kind = "MyKind";
com.google.datastore.v1.Key.Builder keyBuilder = DatastoreHelper.makeKey(kind, "root");
if (namespace != null) {
keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
}
final com.google.datastore.v1.Key ancestorKey = keyBuilder.build();
TableRow row = pc.element();
String entityProperty = "sample";
String key = "key";
com.google.datastore.v1.Entity.Builder entityBuilder = com.google.datastore.v1.Entity.newBuilder();
com.google.datastore.v1.Key.Builder keyBuilder1 = DatastoreHelper.makeKey(ancestorKey, kind, key);
if (namespace != null) {
keyBuilder1.getPartitionIdBuilder().setNamespaceId(namespace);
}
entityBuilder.setKey(keyBuilder1.build());
entityBuilder.getMutableProperties().put(entityProperty, DatastoreHelper.makeValue("sampleValue").build());
pc.output(entityBuilder.build());
}
}));
userEntity.apply("WriteToDatastore", DatastoreIO.v1().write().withProjectId(options.getProject()));
此解决方案能够从 1 个工作人员每秒 3 个元素扩展到 20 个工作人员每秒约 1500 个元素。
我有一个使用数据流模板从云函数触发的小型数据流作业。该作业基本上从 Bigquery 中的 table 读取数据,将结果表行转换为键值,并将键值写入数据存储区。
这是我的代码的样子:-
PCollection<TableRow> bigqueryResult = p.apply("BigQueryRead",
BigQueryIO.readTableRows().withTemplateCompatibility()
.fromQuery(options.getQuery()).usingStandardSql()
.withoutValidation());
bigqueryResult.apply("WriteFromBigqueryToDatastore", ParDo.of(new DoFn<TableRow, String>() {
@ProcessElement
public void processElement(ProcessContext pc) {
TableRow row = pc.element();
Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
KeyFactory keyFactoryCounts = datastore.newKeyFactory().setNamespace("MyNamespace")
.setKind("MyKind");
Key key = keyFactoryCounts.newKey("Key");
Builder builder = Entity.newBuilder(key);
builder.set("Key", BooleanValue.newBuilder("Value").setExcludeFromIndexes(true).build());
Entity entity= builder.build();
datastore.put(entity);
}
}));
当我尝试处理的记录数在 1 到 100 范围内时,此管道运行良好。但是,当我尝试在管道上施加更多负载时,即约 10000 条记录,管道不会scale(尽管自动缩放设置为基于 THROUGHPUT,并且 maximumWorkers 被指定为高达 50,使用 n1-standard-1 机器类型)。该作业使用一两个工人每秒处理 3 或 4 个元素。这会影响我的系统性能。
非常欢迎任何关于如何提高性能的建议。 提前致谢。
至少使用 python 的 ndb
客户端库,可以在单个 .put_multi()
数据存储调用中一次写入多达 500 个实体 - 比调用快得多.put()
一次针对一个实体(调用在底层 RPC 上阻塞)
我不是 java 用户,但似乎也可以使用类似的技术。来自 Using batch operations:
You can use the batch operations if you want to operate on multiple entities in a single Cloud Datastore call.
Here is an example of a batch call:
Entity employee1 = new Entity("Employee"); Entity employee2 = new Entity("Employee"); Entity employee3 = new Entity("Employee"); // ... List<Entity> employees = Arrays.asList(employee1, employee2, employee3); datastore.put(employees);
找到了使用 DatastoreIO 而不是数据存储客户端的解决方案。 以下是我使用的片段,
PCollection<TableRow> row = p.apply("BigQueryRead",
BigQueryIO.readTableRows().withTemplateCompatibility()
.fromQuery(options.getQueryForSegmentedUsers()).usingStandardSql()
.withoutValidation());
PCollection<com.google.datastore.v1.Entity> userEntity = row.apply("ConvertTablerowToEntity", ParDo.of(new DoFn<TableRow, com.google.datastore.v1.Entity>() {
@SuppressWarnings("deprecation")
@ProcessElement
public void processElement(ProcessContext pc) {
final String namespace = "MyNamespace";
final String kind = "MyKind";
com.google.datastore.v1.Key.Builder keyBuilder = DatastoreHelper.makeKey(kind, "root");
if (namespace != null) {
keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
}
final com.google.datastore.v1.Key ancestorKey = keyBuilder.build();
TableRow row = pc.element();
String entityProperty = "sample";
String key = "key";
com.google.datastore.v1.Entity.Builder entityBuilder = com.google.datastore.v1.Entity.newBuilder();
com.google.datastore.v1.Key.Builder keyBuilder1 = DatastoreHelper.makeKey(ancestorKey, kind, key);
if (namespace != null) {
keyBuilder1.getPartitionIdBuilder().setNamespaceId(namespace);
}
entityBuilder.setKey(keyBuilder1.build());
entityBuilder.getMutableProperties().put(entityProperty, DatastoreHelper.makeValue("sampleValue").build());
pc.output(entityBuilder.build());
}
}));
userEntity.apply("WriteToDatastore", DatastoreIO.v1().write().withProjectId(options.getProject()));
此解决方案能够从 1 个工作人员每秒 3 个元素扩展到 20 个工作人员每秒约 1500 个元素。