Element value based writing to Google Cloud Storage using Dataflow
I am trying to build a Dataflow process to help archive data by storing it in Google Cloud Storage. I have a PubSub stream of event data which contains a client_id and some metadata. This process should archive all incoming events, so it needs to be a streaming pipeline.
I would like to handle archiving by putting each event I receive inside a bucket that looks like gs://archive/client_id/eventdata.json. Is that possible to do inside of Dataflow/Apache Beam, specifically assigning a different filename to each event in the PCollection?
EDIT:
So my code currently looks like:
public static class PerWindowFiles extends FileBasedSink.FilenamePolicy {

  private final String customerId;

  public PerWindowFiles(String customerId) {
    this.customerId = customerId;
  }

  @Override
  public ResourceId windowedFilename(ResourceId outputDirectory,
                                     WindowedContext context,
                                     String extension) {
    // The sink's output directory is already gs://archive/, so the
    // filename only needs the per-customer part plus the extension.
    String filename = customerId + extension;
    return outputDirectory.resolve(filename,
        ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
  }

  @Override
  public ResourceId unwindowedFilename(ResourceId outputDirectory,
                                       Context context,
                                       String extension) {
    throw new UnsupportedOperationException("Unsupported.");
  }
}
public static void main(String[] args) throws IOException {
  DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
      .withValidation()
      .as(DataflowPipelineOptions.class);
  options.setRunner(DataflowRunner.class);
  options.setStreaming(true);

  Pipeline p = Pipeline.create(options);

  PCollection<Event> set = p
      .apply(PubsubIO.readStrings().fromTopic("topic"))
      .apply(new ConvertToEvent());

  PCollection<KV<String, Event>> events = labelEvents(set);
  PCollection<KV<String, EventGroup>> sessions = groupEvents(events);

  // One TextIO sink per known customer -- the part that forces a redeploy
  // whenever a new customer appears.
  String customers = System.getProperty("CUSTOMERS");
  JSONArray custList = new JSONArray(customers);
  for (Object cust : custList) {
    if (cust instanceof String) {
      String customerId = (String) cust;
      PCollection<KV<String, EventGroup>> custCol =
          sessions.apply(new FilterByCustomer(customerId));
      stringifyEvents(custCol)
          .apply(TextIO.write()
              .to("gs://archive/")
              .withFilenamePolicy(new PerWindowFiles(customerId))
              .withWindowedWrites()
              .withNumShards(3));
    } else {
      LOG.info("Failed to create TextIO: customerId was not String");
    }
  }

  p.run().waitUntilFinish();
}
This code is ugly because I need to redeploy every time a new client shows up in order to be able to save their data. I would prefer to be able to assign customer data to an appropriate bucket dynamically.
"Dynamic destinations" - 根据写入的元素选择文件名 - 将是 Beam 2.1.0 中可用的新功能,尚未发布。