Create an Apache Beam pipeline that reads from Google Pub/Sub
I'm trying to create a streaming pipeline with apache-beam that reads sentences from Google Pub/Sub and writes the words to a BigQuery table.

I'm using apache-beam version 0.6.0.

Following the examples, I did this:
public class StreamingWordExtract {

    /**
     * A DoFn that tokenizes lines of text into individual words.
     */
    static class ExtractWords extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String[] words = c.element().split("[^a-zA-Z']+");
            for (String word : words) {
                if (!word.isEmpty()) {
                    c.output(word);
                }
            }
        }
    }

    /**
     * A DoFn that uppercases a word.
     */
    static class Uppercase extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(c.element().toUpperCase());
        }
    }

    /**
     * A DoFn that converts a word into a BigQuery TableRow.
     */
    static class StringToRowConverter extends DoFn<String, TableRow> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(new TableRow().set("string_field", c.element()));
        }

        static TableSchema getSchema() {
            return new TableSchema().setFields(new ArrayList<TableFieldSchema>() {
                // Compose the list of TableFieldSchema from tableSchema.
                {
                    add(new TableFieldSchema().setName("string_field").setType("STRING"));
                }
            });
        }
    }

    private interface StreamingWordExtractOptions
            extends ExampleBigQueryTableOptions, ExamplePubsubTopicOptions {
        @Description("Input file to inject to Pub/Sub topic")
        @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
        String getInputFile();
        void setInputFile(String value);
    }

    public static void main(String[] args) {
        StreamingWordExtractOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(StreamingWordExtractOptions.class);
        options.setBigQuerySchema(StringToRowConverter.getSchema());

        Pipeline p = Pipeline.create(options);

        String tableSpec = new StringBuilder()
                .append(options.getProject()).append(":")
                .append(options.getBigQueryDataset()).append(".")
                .append(options.getBigQueryTable())
                .toString();

        p.apply(PubsubIO.read().topic(options.getPubsubTopic()))
                .apply(ParDo.of(new ExtractWords()))
                .apply(ParDo.of(new StringToRowConverter()))
                .apply(BigQueryIO.Write.to(tableSpec)
                        .withSchema(StringToRowConverter.getSchema())
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        PipelineResult result = p.run();
    }
}
I get an error near:

    apply(ParDo.of(new ExtractWords()))

because the preceding apply does not return a String but an Object. I think the problem is the type returned by PubsubIO.read().topic(options.getPubsubTopic()): it is PTransform<PBegin, PCollection<T>> instead of PTransform<PBegin, PCollection<String>>.

What is the correct way to read from Google Pub/Sub with apache-beam?
You're hitting a recent backwards-incompatible change in Beam -- sorry about that!

Starting with Apache Beam version 0.5.0, PubsubIO.Read and PubsubIO.Write need to be instantiated using PubsubIO.<T>read() and PubsubIO.<T>write(), instead of the static factory methods such as PubsubIO.Read.topic(String).
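For instance, a minimal illustration of the new instantiation style, reusing the options object from the question (the local variable names here are just for illustration):

    // Pre-0.5.0 factory style, which no longer exists:
    //   PubsubIO.Read.topic(options.getPubsubTopic())
    // 0.5.0+ style: supply the element type explicitly at instantiation.
    PubsubIO.Read<String> read = PubsubIO.<String>read().topic(options.getPubsubTopic());
    PubsubIO.Write<String> write = PubsubIO.<String>write().topic(options.getPubsubTopic());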
A coder has to be specified for Read for the output type via .withCoder(Coder). A coder has to be specified for Write for the input type, or alternatively a format function via .withAttributes(SimpleFunction<T, PubsubMessage>).
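Putting it together for the pipeline in the question, a minimal sketch of the corrected read, assuming the topic carries UTF-8 text so that StringUtf8Coder (from org.apache.beam.sdk.coders) is the appropriate coder:

    // Declaring the coder fixes the element type, so the downstream ParDo
    // receives a PCollection<String> rather than a raw PCollection<T>.
    p.apply(PubsubIO.<String>read()
            .topic(options.getPubsubTopic())
            .withCoder(StringUtf8Coder.of()))
            .apply(ParDo.of(new ExtractWords()))
            .apply(ParDo.of(new StringToRowConverter()))
            .apply(BigQueryIO.Write.to(tableSpec)
                    .withSchema(StringToRowConverter.getSchema())
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

With the type parameter and coder in place, the rest of the pipeline compiles unchanged.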