将通过 PubSub 收到的每一行写入 Cloud Storage 上它自己的文件
Write each row received over PubSub to its own file on Cloud Storage
我正在通过 pubsub 接收消息。每条消息都应该作为粗数据存储在GCS中自己的文件中,对数据进行一些处理,然后将其保存到大查询-数据中有文件名。
数据收到后应该立即在BQ中看到。
示例:
data published to pubsub : {a:1, b:2}
data saved to GCS file UUID: A1F432
data processing : {a:1, b:2} ->
{a:11, b: 22} ->
{fileName: A1F432, data: {a:11, b: 22}}
data in BQ : {fileName: A1F432, data: {a:11, b: 22}}
想法是处理后的数据存储在 BQ 中,与存储在 GCS 中的粗略数据具有 link。
这是我的代码。
public class BotPipline {
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject(MY_PROJECT);
options.setStagingLocation(MY_STAGING_LOCATION);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> input = pipeline.apply(PubsubIO.Read.subscription(MY_SUBSCRIBTION));
String uuid = ...;
input.apply(TextIO.Write.to(MY_STORAGE_LOCATION + uuid));
input
.apply(ParDo.of(new DoFn<String,String>(){..}).named("updateJsonAndInsertUUID"))
.apply(convertToTableRow(...)).named("convertJsonStringToTableRow"))
.apply(BigQueryIO.Write.to(MY_BQ_TABLE).withSchema(tableSchema)
);
pipeline.run();
}
我的代码没有 运行,因为不支持在 TextIO.Write 中编写无限集合。
经过一些研究,我发现我有几个选项可以解决这个问题:
- 在数据流中创建自定义接收器
- 将写入 GCS 作为我自己的 DoFn
- 使用可选的 BoundedWindow
访问数据的 window
我不知道如何开始。
任何人都可以为我提供以下解决方案之一的代码,或者给我一个符合我的情况的不同解决方案。 (提供代码)
最好的选择是 #2 - 一个简单的 DoFn
,它根据您的数据创建文件。像这样:
class CreateFileFn extends DoFn<String, Void> {
@ProcessElement
public void process(ProcessContext c) throws IOException {
String filename = ...generate filename from element...;
try (WritableByteChannel channel = FileSystems.create(
FileSystems.matchNewResource(filename, false),
"application/text-plain")) {
OutputStream out = Channels.newOutputStream(channel);
...write the element to out...
}
}
}
我正在通过 pubsub 接收消息。每条消息都应该作为粗数据存储在GCS中自己的文件中,对数据进行一些处理,然后将其保存到大查询-数据中有文件名。
数据收到后应该立即在BQ中看到。
示例:
data published to pubsub : {a:1, b:2}
data saved to GCS file UUID: A1F432
data processing : {a:1, b:2} ->
{a:11, b: 22} ->
{fileName: A1F432, data: {a:11, b: 22}}
data in BQ : {fileName: A1F432, data: {a:11, b: 22}}
想法是处理后的数据存储在 BQ 中,与存储在 GCS 中的粗略数据具有 link。
这是我的代码。
public class BotPipline {
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject(MY_PROJECT);
options.setStagingLocation(MY_STAGING_LOCATION);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> input = pipeline.apply(PubsubIO.Read.subscription(MY_SUBSCRIBTION));
String uuid = ...;
input.apply(TextIO.Write.to(MY_STORAGE_LOCATION + uuid));
input
.apply(ParDo.of(new DoFn<String,String>(){..}).named("updateJsonAndInsertUUID"))
.apply(convertToTableRow(...)).named("convertJsonStringToTableRow"))
.apply(BigQueryIO.Write.to(MY_BQ_TABLE).withSchema(tableSchema)
);
pipeline.run();
}
我的代码没有 运行,因为不支持在 TextIO.Write 中编写无限集合。 经过一些研究,我发现我有几个选项可以解决这个问题:
- 在数据流中创建自定义接收器
- 将写入 GCS 作为我自己的 DoFn
- 使用可选的 BoundedWindow 访问数据的 window
我不知道如何开始。 任何人都可以为我提供以下解决方案之一的代码,或者给我一个符合我的情况的不同解决方案。 (提供代码)
最好的选择是 #2 - 一个简单的 DoFn
,它根据您的数据创建文件。像这样:
class CreateFileFn extends DoFn<String, Void> {
@ProcessElement
public void process(ProcessContext c) throws IOException {
String filename = ...generate filename from element...;
try (WritableByteChannel channel = FileSystems.create(
FileSystems.matchNewResource(filename, false),
"application/text-plain")) {
OutputStream out = Channels.newOutputStream(channel);
...write the element to out...
}
}
}