对来自 CSV 的数据进行分区,这样我就可以处理大块而不是单独的行
Partition data coming from CSV so I can process larger patches rather then individual lines
我刚刚开始使用 Google 数据流,我编写了一个从云存储读取 CSV 文件的简单流程。其中一个步骤涉及调用 Web 服务来丰富结果。当批量发送 100 个请求时,相关 Web 服务的性能要好得多。
在查看 API 时,我没有看到将 PCollection 的 100 个元素聚合到单个 Par.do 执行中的好方法。然后需要拆分结果以处理流程的最后一步,即写入 BigQuery table.
不确定我是否需要使用窗口化是我想要的。我看到的大多数开窗示例更适合在给定时间段内进行计数。
您可以在 DoFn 的局部成员变量中缓冲元素,并在缓冲区足够大时调用您的 Web 服务,以及在 finishBundle 中。例如:
class CallServiceFn extends DoFn<String, String> {
private List<String> elements = new ArrayList<>();
public void processElement(ProcessContext c) {
elements.add(c.element());
if (elements.size() >= MAX_CALL_SIZE) {
for (String result : callServiceWithData(elements)) {
c.output(result);
}
elements.clear();
}
}
public void finishBundle(Context c) {
for (String result : callServiceWithData(elements)) {
c.output(result);
}
}
}
请注意,添加了 GroupIntoBatches 转换以简化此操作。
我刚刚开始使用 Google 数据流,我编写了一个从云存储读取 CSV 文件的简单流程。其中一个步骤涉及调用 Web 服务来丰富结果。当批量发送 100 个请求时,相关 Web 服务的性能要好得多。
在查看 API 时,我没有看到将 PCollection 的 100 个元素聚合到单个 Par.do 执行中的好方法。然后需要拆分结果以处理流程的最后一步,即写入 BigQuery table.
不确定我是否需要使用窗口化是我想要的。我看到的大多数开窗示例更适合在给定时间段内进行计数。
您可以在 DoFn 的局部成员变量中缓冲元素,并在缓冲区足够大时调用您的 Web 服务,以及在 finishBundle 中。例如:
class CallServiceFn extends DoFn<String, String> {
private List<String> elements = new ArrayList<>();
public void processElement(ProcessContext c) {
elements.add(c.element());
if (elements.size() >= MAX_CALL_SIZE) {
for (String result : callServiceWithData(elements)) {
c.output(result);
}
elements.clear();
}
}
public void finishBundle(Context c) {
for (String result : callServiceWithData(elements)) {
c.output(result);
}
}
}
请注意,添加了 GroupIntoBatches 转换以简化此操作。