Google 数据流 Apache Beam
Google DataFlow Apache Beam
我正在尝试使用 Apache Beam 创建数据流管道,但我无法按照文档进行操作,也找不到任何示例。
管道很简单。
- 创建管道
- 阅读 pub/sub 个主题
- 写信给扳手。
目前,我卡在了第 2 步。我找不到任何有关如何从 pub/sub 读取并使用它的示例。
这是我目前的代码,我想
class ExtractFlowInfoFn extends DoFn<PubsubMessage, KV<String, String>> {
public void processElement(ProcessContext c) {
KV.of("key", "value");
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create(
PipelineOptionsFactory.fromArgs(args).withValidation().create());
p.apply("ReadFromPubSub", PubsubIO.readMessages().fromSubscription("test"))
.apply("ConvertToKeyValuePair", ParDo.of(new ExtractFlowInfoFn()))
.apply("WriteToLog", ));
};
我能够通过遵循多个示例来想出代码。老实说,我不知道我在这里做什么。
请帮助我理解这一点,或者 link 我找到正确的文档。
从 Pub/Sub 中提取消息并写入 Cloud Spanner 的示例:
import com.google.cloud.spanner.Mutation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
class MessageToMutationDoFn extends DoFn<PubsubMessage, Mutation> {
@ProcessElement
public void processElement(ProcessContext c) {
// TODO: create Mutation object from PubsubMessage
Mutation mutation = Mutation.newInsertBuilder("users_backup2")
.set("column_1").to("value_1")
.set("column_2").to("value_2")
.set("column_3").to("value_3")
.build();
c.output(mutation);
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create();
p.apply("ReadFromPubSub", PubsubIO.readMessages().fromSubscription("test"))
.apply("MessageToMutation", ParDo.of(new MessageToMutationDoFn()))
.apply("WriteToSpanner", SpannerIO.write()
.withProjectId("projectId")
.withInstanceId("spannerInstanceId")
.withDatabaseId("spannerDatabaseId"));
p.run();
}
我正在尝试使用 Apache Beam 创建数据流管道,但我无法按照文档进行操作,也找不到任何示例。
管道很简单。
- 创建管道
- 阅读 pub/sub 个主题
- 写信给扳手。
目前,我卡在了第 2 步。我找不到任何有关如何从 pub/sub 读取并使用它的示例。
这是我目前的代码,我想
class ExtractFlowInfoFn extends DoFn<PubsubMessage, KV<String, String>> {
public void processElement(ProcessContext c) {
KV.of("key", "value");
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create(
PipelineOptionsFactory.fromArgs(args).withValidation().create());
p.apply("ReadFromPubSub", PubsubIO.readMessages().fromSubscription("test"))
.apply("ConvertToKeyValuePair", ParDo.of(new ExtractFlowInfoFn()))
.apply("WriteToLog", ));
};
我能够通过遵循多个示例来想出代码。老实说,我不知道我在这里做什么。
请帮助我理解这一点,或者 link 我找到正确的文档。
从 Pub/Sub 中提取消息并写入 Cloud Spanner 的示例:
import com.google.cloud.spanner.Mutation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
class MessageToMutationDoFn extends DoFn<PubsubMessage, Mutation> {
@ProcessElement
public void processElement(ProcessContext c) {
// TODO: create Mutation object from PubsubMessage
Mutation mutation = Mutation.newInsertBuilder("users_backup2")
.set("column_1").to("value_1")
.set("column_2").to("value_2")
.set("column_3").to("value_3")
.build();
c.output(mutation);
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create();
p.apply("ReadFromPubSub", PubsubIO.readMessages().fromSubscription("test"))
.apply("MessageToMutation", ParDo.of(new MessageToMutationDoFn()))
.apply("WriteToSpanner", SpannerIO.write()
.withProjectId("projectId")
.withInstanceId("spannerInstanceId")
.withDatabaseId("spannerDatabaseId"));
p.run();
}