Google 将记录写入 Google 数据存储时数据流模板作业不缩放

Google Dataflow template job not scaling when writing records to Google datastore

我有一个使用数据流模板从云函数触发的小型数据流作业。该作业基本上从 Bigquery 中的 table 读取数据,将结果表行转换为键值,并将键值写入数据存储区。

这是我的代码的样子:-

PCollection<TableRow> bigqueryResult = p.apply("BigQueryRead",
                BigQueryIO.readTableRows().withTemplateCompatibility()
                        .fromQuery(options.getQuery()).usingStandardSql()
                        .withoutValidation());

bigqueryResult.apply("WriteFromBigqueryToDatastore", ParDo.of(new DoFn<TableRow, String>() {                
            @ProcessElement
            public void processElement(ProcessContext pc) {
                TableRow row = pc.element();

                Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
                KeyFactory keyFactoryCounts = datastore.newKeyFactory().setNamespace("MyNamespace")
                        .setKind("MyKind");

                Key key = keyFactoryCounts.newKey("Key");
                Builder builder =   Entity.newBuilder(key);
                builder.set("Key", BooleanValue.newBuilder("Value").setExcludeFromIndexes(true).build());   

                Entity entity= builder.build();
                datastore.put(entity);
            }
        }));

当我尝试处理的记录数在 1 到 100 范围内时,此管道运行良好。但是,当我尝试在管道上施加更多负载时,即约 10000 条记录,管道不会scale(尽管自动缩放​​设置为基于 THROUGHPUT,并且 maximumWorkers 被指定为高达 50,使用 n1-standard-1 机器类型)。该作业使用一两个工人每秒处理 3 或 4 个元素。这会影响我的系统性能。

非常欢迎任何关于如何提高性能的建议。 提前致谢。

至少使用 python 的 ndb 客户端库,可以在单个 .put_multi() 数据存储调用中一次写入多达 500 个实体 - 比调用快得多.put() 一次针对一个实体(调用在底层 RPC 上阻塞)

我不是 java 用户,但似乎也可以使用类似的技术。来自 Using batch operations:

You can use the batch operations if you want to operate on multiple entities in a single Cloud Datastore call.

Here is an example of a batch call:

Entity employee1 = new Entity("Employee");
Entity employee2 = new Entity("Employee");
Entity employee3 = new Entity("Employee");
// ...

List<Entity> employees = Arrays.asList(employee1, employee2, employee3);
datastore.put(employees);

找到了使用 DatastoreIO 而不是数据存储客户端的解决方案。 以下是我使用的片段,

    PCollection<TableRow> row = p.apply("BigQueryRead",
                BigQueryIO.readTableRows().withTemplateCompatibility()
                        .fromQuery(options.getQueryForSegmentedUsers()).usingStandardSql()
                        .withoutValidation());          

    PCollection<com.google.datastore.v1.Entity> userEntity = row.apply("ConvertTablerowToEntity", ParDo.of(new DoFn<TableRow, com.google.datastore.v1.Entity>() {

        @SuppressWarnings("deprecation")
        @ProcessElement
        public void processElement(ProcessContext pc) {
            final String namespace = "MyNamespace";
            final String kind = "MyKind";

            com.google.datastore.v1.Key.Builder keyBuilder = DatastoreHelper.makeKey(kind, "root");
            if (namespace != null) {
              keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
            }
            final com.google.datastore.v1.Key ancestorKey = keyBuilder.build();

            TableRow row = pc.element();
            String entityProperty = "sample";

            String key = "key";

            com.google.datastore.v1.Entity.Builder entityBuilder = com.google.datastore.v1.Entity.newBuilder();
            com.google.datastore.v1.Key.Builder keyBuilder1 = DatastoreHelper.makeKey(ancestorKey, kind, key);
            if (namespace != null) {
                keyBuilder1.getPartitionIdBuilder().setNamespaceId(namespace);
              }

              entityBuilder.setKey(keyBuilder1.build());
              entityBuilder.getMutableProperties().put(entityProperty, DatastoreHelper.makeValue("sampleValue").build());
              pc.output(entityBuilder.build());             
        }

    }));

    userEntity.apply("WriteToDatastore", DatastoreIO.v1().write().withProjectId(options.getProject()));

此解决方案能够从 1 个工作人员每秒 3 个元素扩展到 20 个工作人员每秒约 1500 个元素。