google 数据流管道中的数据存储输入是否可以一次处理一批 N 个条目?
Can datastore input in google dataflow pipeline be processed in a batch of N entries at a time?
我正在尝试执行一项数据流管道作业,该作业将一次对数据存储中的 N 个条目 执行一个函数。在我的例子中,此函数将 100 个条目作为负载发送到某个 REST 服务。这意味着我想遍历一个数据存储实体中的所有条目,并一次将 100 个批处理条目 发送到某个外部 REST 服务。
我目前的解决方案
- 从数据存储读取输入
- 创建与管道选项中指定的工作人员一样多的密钥(1 个工作人员 = 1 个密钥)。
- 按键分组,以便我们得到迭代器作为输出(第4步中的迭代器输入)
- 以编程方式对临时列表中的用户进行批处理,并将它们作为批处理发送到 REST 端点。
以上伪代码描述的场景(忽略细节):
final int BATCH_SIZE = 100;
// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))
// 2. create keys to be used in group by so we get iterator in next task
.apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
String key = generateKey(c);
EntryPOJO entry = processEntity(c);
c.output(KV.of(key, entry));
}
}))
// 3. Group by key
.apply(GroupByKey.create())
// 4. Programatically batch users
.apply(ParDo.of(new DoFn<KV<String, Iterable<EntryPOJO>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
List<EntryPOJO> batchedEntries = new ArrayList<>();
for (EntryPOJO entry : c.element().getValue()) {
if (batchedEntries.size() >= BATCH_SIZE) {
sendToRESTEndpoint(batchedEntries);
batchedEntries = new ArrayList<>();
}
batchedEntries.add(entry);
}
sendToRESTEndpoint(batchedEntries);
}
}));
我当前解决方案的主要问题
GroupByKey 阻止最后一个 ParDo 的执行(阻止第 4 步),直到所有条目都分配给一个键。
解决方案通常有效,但我想并行执行所有操作(在从数据存储区加载后立即将 100 个条目的批次发送到 REST 端点),这不是我当前的解决方案可能,因为 GroupByKey 不会输出任何数据,直到从数据库中获取每个条目并将其插入到键值对中。所以执行实际上分为 2 个步骤:1. 从数据存储中获取所有数据并为其分配一个键,2. 将条目处理为 batch
问题
所以我想知道是否有一些现有功能可以做到这一点。或者至少在没有 GroupByKey 步骤的情况下获得 Iterable,这样批处理函数任务就不需要等待数据被转储。
您可以在 DoFn
中批量处理这些元素。例如:
final int BATCH_SIZE = 100;
pipeline
// 1. Read input from datastore
.apply(DatastoreIO.readFrom(datasetId, query))
// 2. Programatically batch users
.apply(ParDo.of(new DoFn<DatastoreV1.Entity, Iterable<EntryPOJO>>() {
private final List<EntryPOJO> accumulator = new ArrayList<>(BATCH_SIZE);
@Override
public void processElement(ProcessContext c) throws Exception {
EntryPOJO entry = processEntity(c);
accumulator.add(c);
if (accumulator.size() >= BATCH_SIZE) {
c.output(accumulator);
accumulator = new ArrayList<>(BATCH_SIZE);
}
}
@Override
public void finishBundle(Context c) throws Exception {
if (accumulator.size() > 0) {
c.output(accumulator);
}
}
});
// 3. Consume those bundles
.apply(ParDo.of(new DoFn<Iterable<EntryPOJO>, Object>() {
@Override
public void processElement(ProcessContext c) throws Exception {
sendToRESTEndpoint(batchedEntries);
}
}));
如果您不想要单独的 "batching" 步骤,您也可以将步骤 2 和步骤 3 合并为一个 DoFn
。
我正在尝试执行一项数据流管道作业,该作业将一次对数据存储中的 N 个条目 执行一个函数。在我的例子中,此函数将 100 个条目作为负载发送到某个 REST 服务。这意味着我想遍历一个数据存储实体中的所有条目,并一次将 100 个批处理条目 发送到某个外部 REST 服务。
我目前的解决方案
- 从数据存储读取输入
- 创建与管道选项中指定的工作人员一样多的密钥(1 个工作人员 = 1 个密钥)。
- 按键分组,以便我们得到迭代器作为输出(第4步中的迭代器输入)
- 以编程方式对临时列表中的用户进行批处理,并将它们作为批处理发送到 REST 端点。
以上伪代码描述的场景(忽略细节):
final int BATCH_SIZE = 100;
// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))
// 2. create keys to be used in group by so we get iterator in next task
.apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
String key = generateKey(c);
EntryPOJO entry = processEntity(c);
c.output(KV.of(key, entry));
}
}))
// 3. Group by key
.apply(GroupByKey.create())
// 4. Programatically batch users
.apply(ParDo.of(new DoFn<KV<String, Iterable<EntryPOJO>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
List<EntryPOJO> batchedEntries = new ArrayList<>();
for (EntryPOJO entry : c.element().getValue()) {
if (batchedEntries.size() >= BATCH_SIZE) {
sendToRESTEndpoint(batchedEntries);
batchedEntries = new ArrayList<>();
}
batchedEntries.add(entry);
}
sendToRESTEndpoint(batchedEntries);
}
}));
我当前解决方案的主要问题
GroupByKey 阻止最后一个 ParDo 的执行(阻止第 4 步),直到所有条目都分配给一个键。
解决方案通常有效,但我想并行执行所有操作(在从数据存储区加载后立即将 100 个条目的批次发送到 REST 端点),这不是我当前的解决方案可能,因为 GroupByKey 不会输出任何数据,直到从数据库中获取每个条目并将其插入到键值对中。所以执行实际上分为 2 个步骤:1. 从数据存储中获取所有数据并为其分配一个键,2. 将条目处理为 batch
问题
所以我想知道是否有一些现有功能可以做到这一点。或者至少在没有 GroupByKey 步骤的情况下获得 Iterable,这样批处理函数任务就不需要等待数据被转储。
您可以在 DoFn
中批量处理这些元素。例如:
final int BATCH_SIZE = 100;
pipeline
// 1. Read input from datastore
.apply(DatastoreIO.readFrom(datasetId, query))
// 2. Programatically batch users
.apply(ParDo.of(new DoFn<DatastoreV1.Entity, Iterable<EntryPOJO>>() {
private final List<EntryPOJO> accumulator = new ArrayList<>(BATCH_SIZE);
@Override
public void processElement(ProcessContext c) throws Exception {
EntryPOJO entry = processEntity(c);
accumulator.add(c);
if (accumulator.size() >= BATCH_SIZE) {
c.output(accumulator);
accumulator = new ArrayList<>(BATCH_SIZE);
}
}
@Override
public void finishBundle(Context c) throws Exception {
if (accumulator.size() > 0) {
c.output(accumulator);
}
}
});
// 3. Consume those bundles
.apply(ParDo.of(new DoFn<Iterable<EntryPOJO>, Object>() {
@Override
public void processElement(ProcessContext c) throws Exception {
sendToRESTEndpoint(batchedEntries);
}
}));
如果您不想要单独的 "batching" 步骤,您也可以将步骤 2 和步骤 3 合并为一个 DoFn
。