从字典行定义 Python Apache Beam 模式
Defining a Python Apache Beam schema from dictionary rows
我想在 Apache Beam (Python) 中获取行架构,以便与 SQL transforms 一起使用。但是,我遇到了下面所述的问题。
架构定义如下:
class RowSchema(typing.NamedTuple):
colA: str
colB: typing.Optional[str]
coders.registry.register_coder(RowSchema, coders.RowCoder)
以下示例正确推断架构:
with beam.Pipeline(options=pipeline_options) as p:
pcol = (p
| "Create" >> beam.Create(
[
RowSchema(colA='a1', colB='b1'),
RowSchema(colA='a2', colB=None)])
.with_output_types(RowSchema)
| beam.Map(print)
)
但是,以下尝试引发“ValueError:类型名称和字段名称必须是有效标识符:'run.<locals>.RowSchema'”
with beam.Pipeline(options=pipeline_options) as p:
pcol = (p
| "Create" >> beam.Create(
[
{'colA': 'a1', 'colB': 'b1'},
{'colA': 'a2', 'colB': None}])
| 'ToRow' >> beam.Map(
lambda x: RowSchema(**x)) \
.with_output_types(RowSchema)
| beam.Map(print)
)
完整堆栈跟踪:
Traceback (most recent call last):
File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "home/src/main.py", line 326, in <module>
run()
File "home/src/main.py", line 267, in run
| 'ToRow' >> beam.Map(lambda x: RowSchema(**x)).with_output_types(RowSchema)
File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1661, in Map
pardo = FlatMap(wrapper, *args, **kwargs)
File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1606, in FlatMap
pardo = ParDo(CallableWrapperDoFn(fn), *args, **kwargs)
File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1217, in __init__
super().__init__(fn, *args, **kwargs)
File "home/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py", line 861, in __init__
self.fn = pickler.loads(pickler.dumps(self.fn))
File "home/lib/python3.9/site-packages/apache_beam/internal/pickler.py", line 51, in loads
return desired_pickle_lib.loads(
File "home/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py", line 289, in loads
return dill.loads(s)
File "home/lib/python3.9/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "home/lib/python3.9/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "home/lib/python3.9/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "home/lib/python3.9/site-packages/dill/_dill.py", line 788, in _create_namedtuple
t = collections.namedtuple(name, fieldnames)
File "/usr/lib/python3.9/collections/__init__.py", line 390, in namedtuple
raise ValueError('Type names and field names must be valid '
ValueError: Type names and field names must be valid identifiers: 'run.<locals>.RowSchema'
如果我将架构定义更改为如下形式,失败的尝试就会成功:
RowSchema = typing.NamedTuple('RowSchema', [('colA', str), ('colB', typing.Optional[str])])
根据下面的一些参考资料,错误片段的格式似乎正确。
参考文献:
- https://beam.apache.org/documentation/programming-guide/#inferring-schemas
- https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_xlang_sql.py
- https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/sql_taxi.py
在 Python 3.9、Beam 2.37.0 和多个运行器(包括 DirectRunner、DataflowRunner 和 PortableRunner)上进行了测试。
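只需将架构定义移到 run 函数之外,即可解决这个问题。从堆栈跟踪可以看出,在 run 函数内部定义的类,其名称 'run.<locals>.RowSchema' 在 dill 反序列化时被传给 collections.namedtuple,而它不是有效的标识符。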
class RowSchema(typing.NamedTuple):
colA: str
colB: typing.Optional[str]
coders.registry.register_coder(RowSchema, coders.RowCoder)
def run(argv=None, save_main_session=True):
...
with beam.Pipeline(options=pipeline_options) as p:
...
我想在 Apache Beam (Python) 中获取行架构,以便与 SQL transforms 一起使用。但是,我遇到了下面所述的问题。
架构定义如下:
class RowSchema(typing.NamedTuple):
colA: str
colB: typing.Optional[str]
coders.registry.register_coder(RowSchema, coders.RowCoder)
以下示例正确推断架构:
with beam.Pipeline(options=pipeline_options) as p:
pcol = (p
| "Create" >> beam.Create(
[
RowSchema(colA='a1', colB='b1'),
RowSchema(colA='a2', colB=None)])
.with_output_types(RowSchema)
| beam.Map(print)
)
但是,以下尝试引发“ValueError:类型名称和字段名称必须是有效标识符:'run.<locals>.RowSchema'”
with beam.Pipeline(options=pipeline_options) as p:
pcol = (p
| "Create" >> beam.Create(
[
{'colA': 'a1', 'colB': 'b1'},
{'colA': 'a2', 'colB': None}])
| 'ToRow' >> beam.Map(
lambda x: RowSchema(**x)) \
.with_output_types(RowSchema)
| beam.Map(print)
)
完整堆栈跟踪:
Traceback (most recent call last):
File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "home/src/main.py", line 326, in <module>
run()
File "home/src/main.py", line 267, in run
| 'ToRow' >> beam.Map(lambda x: RowSchema(**x)).with_output_types(RowSchema)
File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1661, in Map
pardo = FlatMap(wrapper, *args, **kwargs)
File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1606, in FlatMap
pardo = ParDo(CallableWrapperDoFn(fn), *args, **kwargs)
File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1217, in __init__
super().__init__(fn, *args, **kwargs)
File "home/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py", line 861, in __init__
self.fn = pickler.loads(pickler.dumps(self.fn))
File "home/lib/python3.9/site-packages/apache_beam/internal/pickler.py", line 51, in loads
return desired_pickle_lib.loads(
File "home/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py", line 289, in loads
return dill.loads(s)
File "home/lib/python3.9/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "home/lib/python3.9/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "home/lib/python3.9/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "home/lib/python3.9/site-packages/dill/_dill.py", line 788, in _create_namedtuple
t = collections.namedtuple(name, fieldnames)
File "/usr/lib/python3.9/collections/__init__.py", line 390, in namedtuple
raise ValueError('Type names and field names must be valid '
ValueError: Type names and field names must be valid identifiers: 'run.<locals>.RowSchema'
如果我将架构定义更改为如下形式,失败的尝试就会成功:
RowSchema = typing.NamedTuple('RowSchema', [('colA', str), ('colB', typing.Optional[str])])
根据下面的一些参考资料,错误片段的格式似乎正确。
参考文献:
- https://beam.apache.org/documentation/programming-guide/#inferring-schemas
- https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_xlang_sql.py
- https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/sql_taxi.py
在 Python 3.9、Beam 2.37.0 和多个运行器(包括 DirectRunner、DataflowRunner 和 PortableRunner)上进行了测试。
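只需将架构定义移到 run 函数之外,即可解决这个问题。从堆栈跟踪可以看出,在 run 函数内部定义的类,其名称 'run.<locals>.RowSchema' 在 dill 反序列化时被传给 collections.namedtuple,而它不是有效的标识符。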
class RowSchema(typing.NamedTuple):
colA: str
colB: typing.Optional[str]
coders.registry.register_coder(RowSchema, coders.RowCoder)
def run(argv=None, save_main_session=True):
...
with beam.Pipeline(options=pipeline_options) as p:
...