如何在 Apache Beam Python 中正确定义和使用模式?
How to properly define and use a schema in Apache Beam Python?
以下代码尝试从源读取数据、定义架构并执行 SQLTransform。
...
class RowSchema(typing.NamedTuple):
colA: str
colB: typing.Optional[str]
beam.coders.registry.register_coder(RowSchema, beam.coders.RowCoder)
def run(argv=None):
...
with beam.Pipeline(options=pipeline_options) as p:
query = '''
SELECT
colA, colB
FROM `{}`
''' \
.format(
known_args.table
)
pcol = (p
| 'read from BQ' >>
beam.io.ReadFromBigQuery(
gcs_location=known_args.execution_gcs_location,
query=query,
use_standard_sql=True,
)
| 'ToRow' >> beam.Map(
lambda x: RowSchema(**x)).with_output_types(RowSchema)
| SqlTransform(
"""
...
""")
| beam.Map(print)
)
...
但是,它会导致以下错误:
File "/home/lib/python3.9/site-packages/apache_beam/coders/coders.py", line 423, in encode
return value.encode('utf-8')
AttributeError: 'int' object has no attribute 'encode' [while running 'ToRow']
使用相同的结构,以下管道可以正常工作:
pcol = (p
| "Create" >> beam.Create(
[{'colA': 'a1', 'colB': 'b1'}, {'colA': 'a2', 'colB': None}])
| 'ToRow' >> beam.Map(lambda x: RowSchema(**x)).with_output_types(RowSchema)
| SqlTransform(
"""
...
""")
| beam.Map(print)
)
示例 1 中似乎缺少的是将输入数据转换为 beam.pvalue.Row 的实例,示例 2 中是这种情况,但示例 1 中不是。
如何将输入转换为 Row 对象以用于静态模式,假设这确实是问题所在?
bigquery module 也有内置架构,但仅适用于 BigQuery 写入。
我还检查了示例,包括 this one,它使用不适用于此用例的动态模式。
原来 colA 在 BigQuery 中实际上是一个 INTEGER,因此只需在模式定义中将其类型更改为 int 即可解决问题。
以下代码尝试从源读取数据、定义架构并执行 SQLTransform。
...
class RowSchema(typing.NamedTuple):
colA: str
colB: typing.Optional[str]
beam.coders.registry.register_coder(RowSchema, beam.coders.RowCoder)
def run(argv=None):
...
with beam.Pipeline(options=pipeline_options) as p:
query = '''
SELECT
colA, colB
FROM `{}`
''' \
.format(
known_args.table
)
pcol = (p
| 'read from BQ' >>
beam.io.ReadFromBigQuery(
gcs_location=known_args.execution_gcs_location,
query=query,
use_standard_sql=True,
)
| 'ToRow' >> beam.Map(
lambda x: RowSchema(**x)).with_output_types(RowSchema)
| SqlTransform(
"""
...
""")
| beam.Map(print)
)
...
但是,它会导致以下错误:
File "/home/lib/python3.9/site-packages/apache_beam/coders/coders.py", line 423, in encode
return value.encode('utf-8')
AttributeError: 'int' object has no attribute 'encode' [while running 'ToRow']
使用相同的结构,以下管道可以正常工作:
pcol = (p
| "Create" >> beam.Create(
[{'colA': 'a1', 'colB': 'b1'}, {'colA': 'a2', 'colB': None}])
| 'ToRow' >> beam.Map(lambda x: RowSchema(**x)).with_output_types(RowSchema)
| SqlTransform(
"""
...
""")
| beam.Map(print)
)
示例 1 中似乎缺少的是将输入数据转换为 beam.pvalue.Row 的实例,示例 2 中是这种情况,但示例 1 中不是。
如何将输入转换为 Row 对象以用于静态模式,假设这确实是问题所在?
bigquery module 也有内置架构,但仅适用于 BigQuery 写入。
我还检查了示例,包括 this one,它使用不适用于此用例的动态模式。
原来 colA 在 BigQuery 中实际上是一个 INTEGER,因此只需在模式定义中将其类型更改为 int 即可解决问题。