如何获取 Apache Beam SQL 查询的输出模式?

How do I get an output schema for an Apache Beam SQL query?

我一直在使用 Beam SQL DSL,如果不提供手动了解输出模式的编码器,我将无法使用查询的输出。我可以推断输出模式而不是对其进行硬编码吗?

walkthrough or the examples actually use the output from a query. I'm using Scio 而不是普通的 Java API 来保持代码的相对可读性和简洁性,我认为这对这个问题没有影响。

这是我的意思的一个例子。

给定一个输入模式 inSchema 和一些映射到 Row 的数据源,如下所示:(在此示例中,基于 Avro,但同样,我认为这并不重要):

sc.avroFile[Foo](args("input"))
   .map(fooToRow)
   .setCoder(inSchema.getRowCoder)
   .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
   .saveAsTextFile(args("output"))

运行 此管道导致 KryoException 如下:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
fieldIndices (org.apache.beam.sdk.schemas.Schema)
schema (org.apache.beam.sdk.values.RowWithStorage)
org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException

但是,插入一个 RowCoder 匹配 SQL 输出,在本例中是单个计数 int 列:

   ...snip...
   .applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
   .setCoder(Schema.builder().addInt64Field("count").build().getRowCoder)
   .saveAsTextFile(args("output"))

现在管道运行正常。

必须手动告诉管道如何编码 SQL 输出似乎没有必要,因为我们指定了输入 schema/coder(s) 和查询。在我看来,我们应该能够从中推断出输出模式 - 但我看不出如何,除了直接使用方解石?

在 Beam Jira 上提出问题之前,我想我会检查一下我是否遗漏了一些明显的东西!

输出架构推理 should work, your expectation is correct. This seems like a bug (either in Beam or Scio), filed BEAM-5335 以供调查。