使用 Scio 将 SCollection 从 textFile 放入 BigQuery
Put SCollection from textFile to BigQuery with Scio
我用 textFile
阅读了一些文档,并对单个单词做了 flatMap
,为每个单词添加了一些额外信息:
val col = sc.textFile(args.getOrElse("input","documents/*"))
.flatMap(_.split("\s+").filter(_.nonEmpty))
val mapped = col.map(t => t + ": " + extraInformation())
我目前正在轻松地将其保存为文本
mapped.saveAsTextFile(args.getOrElse("output", "results"))
但我不知道如何将地图保存到 BigQuery 架构。我见过的所有示例都从 BigQuery 创建了初始 Scollection,然后将其保存到另一个 table,因此初始集合是 [TableRow]
而不是 [String]
。
这里正确的做法是什么?我是否应该研究如何将我的数据转换为 Big Query 可接受的一种集合?或者我应该尝试进一步研究如何将这个纯文本直接推入 table?
为了写入 BigQuery,您需要定义一个 TableSchema:
public static final TableSchema BQ_TABLE_SCHEMA = new TableSchema();
public static final List<TableFieldSchema> BQ_FIELDS;
static {
TableFieldSchema string_field = new TableFieldSchema()
.setName("string_field")
.setType(FieldType.STRING.toString())
.setMode(FieldMode.NULLABLE.toString());
BQ_FIELDS = Lists.newArrayList(
string_field
);
BQ_TABLE_SCHEMA.setFields(BQ_FIELDS);
}
然后您需要将 String 转换为 TableRow 对象:
.apply("ConvertToTableRow", ParDo.of(new DoFn<String, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(new TableRow().set("string_field", c.element()));
}
}))
.apply("InsertTableRowsToBigQuery",
BigQueryIO.writeTableRows().to("project_id:dataset_name.table_name")
.withSchema(BQ_TABLE_SCHEMA)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND))
.getFailedInserts();
你也可以看看Java中的这个例子,它与Scio中需要做的非常相似:https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java#L78
我建议在案例 class 上使用 @BigQueryType.toTable
注释,如下所示:
import com.spotify.scio.bigquery._
object MyScioJob {
@BigQueryType.toTable
case class WordAnnotated(word: String, extraInformation: String)
def main(args: Array[String]): Unit = {
// ...job setup logic
sc.textFile(args.getOrElse("input","documents/*"))
.flatMap(_.split("\s+").filter(_.nonEmpty))
.map(t => WordAnnotated(t, extraInformation())
.saveAsTypedBigQuery("myProject:myDataset.myTable")
}
}
Scio wiki 上有更多关于此的信息。
我用 textFile
阅读了一些文档,并对单个单词做了 flatMap
,为每个单词添加了一些额外信息:
val col = sc.textFile(args.getOrElse("input","documents/*"))
.flatMap(_.split("\s+").filter(_.nonEmpty))
val mapped = col.map(t => t + ": " + extraInformation())
我目前正在轻松地将其保存为文本
mapped.saveAsTextFile(args.getOrElse("output", "results"))
但我不知道如何将地图保存到 BigQuery 架构。我见过的所有示例都从 BigQuery 创建了初始 Scollection,然后将其保存到另一个 table,因此初始集合是 [TableRow]
而不是 [String]
。
这里正确的做法是什么?我是否应该研究如何将我的数据转换为 Big Query 可接受的一种集合?或者我应该尝试进一步研究如何将这个纯文本直接推入 table?
为了写入 BigQuery,您需要定义一个 TableSchema:
public static final TableSchema BQ_TABLE_SCHEMA = new TableSchema();
public static final List<TableFieldSchema> BQ_FIELDS;
static {
TableFieldSchema string_field = new TableFieldSchema()
.setName("string_field")
.setType(FieldType.STRING.toString())
.setMode(FieldMode.NULLABLE.toString());
BQ_FIELDS = Lists.newArrayList(
string_field
);
BQ_TABLE_SCHEMA.setFields(BQ_FIELDS);
}
然后您需要将 String 转换为 TableRow 对象:
.apply("ConvertToTableRow", ParDo.of(new DoFn<String, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(new TableRow().set("string_field", c.element()));
}
}))
.apply("InsertTableRowsToBigQuery",
BigQueryIO.writeTableRows().to("project_id:dataset_name.table_name")
.withSchema(BQ_TABLE_SCHEMA)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND))
.getFailedInserts();
你也可以看看Java中的这个例子,它与Scio中需要做的非常相似:https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java#L78
我建议在案例 class 上使用 @BigQueryType.toTable
注释,如下所示:
import com.spotify.scio.bigquery._
object MyScioJob {
@BigQueryType.toTable
case class WordAnnotated(word: String, extraInformation: String)
def main(args: Array[String]): Unit = {
// ...job setup logic
sc.textFile(args.getOrElse("input","documents/*"))
.flatMap(_.split("\s+").filter(_.nonEmpty))
.map(t => WordAnnotated(t, extraInformation())
.saveAsTypedBigQuery("myProject:myDataset.myTable")
}
}
Scio wiki 上有更多关于此的信息。