如何在 Apache Beam Python 中正确定义和使用模式?

How to properly define and use a schema in Apache Beam Python?

以下代码尝试从源读取数据、定义架构并执行 SQLTransform。

...

class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]
    
beam.coders.registry.register_coder(RowSchema, beam.coders.RowCoder)

def run(argv=None):
  ...
  with beam.Pipeline(options=pipeline_options) as p:
    query = '''
    SELECT
      colA, colB
    FROM `{}`
    ''' \
      .format(
        known_args.table
      )

    pcol = (p 
    | 'read from BQ' >>
     beam.io.ReadFromBigQuery(
      gcs_location=known_args.execution_gcs_location,
      query=query,
      use_standard_sql=True,
      )
    | 'ToRow' >> beam.Map(
      lambda x: RowSchema(**x)).with_output_types(RowSchema)
    | SqlTransform(
        """
        ...
        """)
    | beam.Map(print)
    )

...

但是,它会导致以下错误:

  File "/home/lib/python3.9/site-packages/apache_beam/coders/coders.py", line 423, in encode
    return value.encode('utf-8')
AttributeError: 'int' object has no attribute 'encode' [while running 'ToRow']

使用相同的结构,以下管道可以正常工作:

pcol = (p
    | "Create" >> beam.Create(
        [{'colA': 'a1', 'colB': 'b1'}, {'colA': 'a2', 'colB': None}])
    | 'ToRow' >> beam.Map(lambda x: RowSchema(**x)).with_output_types(RowSchema)
    | SqlTransform(
        """
        ...
        """)
    | beam.Map(print)
    )

示例 1 中似乎缺少的是将输入数据转换为 beam.pvalue.Row 的实例,示例 2 中是这种情况,但示例 1 中不是。

如何将输入转换为 Row 对象以用于静态模式,假设这确实是问题所在?

所使用的结构基于以下参考资料:1 2

bigquery module 也有内置架构,但仅适用于 BigQuery 写入。

我还检查了示例,包括 this one,它使用不适用于此用例的动态模式。

原来 colA 在 BigQuery 中实际上是一个 INTEGER,因此只需在模式定义中将其类型更改为 int 即可解决问题。