Defining a Python Apache Beam schema from dictionary rows

I want to get a row schema in Apache Beam (Python) so that I can use it with SQL transforms. However, I run into the problem explained below.

The schema is defined as follows:

class RowSchema(typing.NamedTuple):
  colA: str
  colB: typing.Optional[str]

coders.registry.register_coder(RowSchema, coders.RowCoder)

The following example infers the schema correctly:

  with beam.Pipeline(options=pipeline_options) as p:
    
    pcol = (p
    | "Create" >> beam.Create(
        [
          RowSchema(colA='a1', colB='b1'),
          RowSchema(colA='a2', colB=None)])
          .with_output_types(RowSchema)
    | beam.Map(print)
    )

However, the following attempt raises "ValueError: Type names and field names must be valid identifiers: 'run.<locals>.RowSchema'":

  with beam.Pipeline(options=pipeline_options) as p:

    pcol = (p
    | "Create" >> beam.Create(
        [
            {'colA': 'a1', 'colB': 'b1'},
            {'colA': 'a2', 'colB': None}])
    | 'ToRow' >> beam.Map(
        lambda x: RowSchema(**x)) \
          .with_output_types(RowSchema)
    | beam.Map(print)
    )

Full stack trace:

Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "home/src/main.py", line 326, in <module>
    run()
  File "home/src/main.py", line 267, in run
    | 'ToRow' >> beam.Map(lambda x: RowSchema(**x)).with_output_types(RowSchema)
  File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1661, in Map
    pardo = FlatMap(wrapper, *args, **kwargs)
  File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1606, in FlatMap
    pardo = ParDo(CallableWrapperDoFn(fn), *args, **kwargs)
  File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1217, in __init__
    super().__init__(fn, *args, **kwargs)
  File "home/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py", line 861, in __init__
    self.fn = pickler.loads(pickler.dumps(self.fn))
  File "home/lib/python3.9/site-packages/apache_beam/internal/pickler.py", line 51, in loads
    return desired_pickle_lib.loads(
  File "home/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py", line 289, in loads
    return dill.loads(s)
  File "home/lib/python3.9/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "home/lib/python3.9/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "home/lib/python3.9/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "home/lib/python3.9/site-packages/dill/_dill.py", line 788, in _create_namedtuple
    t = collections.namedtuple(name, fieldnames)
  File "/usr/lib/python3.9/collections/__init__.py", line 390, in namedtuple
    raise ValueError('Type names and field names must be valid '
ValueError: Type names and field names must be valid identifiers: 'run.<locals>.RowSchema'
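The last frames show dill rebuilding the type with collections.namedtuple(name, fieldnames), and the rejected name is the class's qualified name. The sketch below reproduces only that final step with the standard library, without Beam or dill. It assumes, based on the error message, that the name dill passes is the class's __qualname__; run() here is just a stand-in for the pipeline's run() function.

```python
import collections
import typing


def run():
    # Class-syntax NamedTuple defined inside a function, as in the question.
    class RowSchema(typing.NamedTuple):
        colA: str
        colB: typing.Optional[str]

    return RowSchema


RowSchema = run()
print(RowSchema.__qualname__)  # run.<locals>.RowSchema

# Rebuild the type the way the traceback's last frame does:
# collections.namedtuple(name, fieldnames). A dotted name containing
# '<locals>' is not a valid identifier, so namedtuple rejects it.
try:
    collections.namedtuple(RowSchema.__qualname__, RowSchema._fields)
except ValueError as exc:
    print(exc)
```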

The failing attempt succeeds if I change the schema definition to:

RowSchema = typing.NamedTuple('RowSchema', [('colA', str), ('colB', typing.Optional[str])])
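A plausible explanation for why the functional form works (inferred from the error, not confirmed by the Beam documentation): typing.NamedTuple('RowSchema', ...) creates the type through collections.namedtuple, so its __qualname__ is just the type name even when defined inside a function, whereas the class syntax records the dotted 'run.<locals>...' path. A quick check, using the hypothetical names ClassRow and FuncRow for illustration:

```python
import typing


def run():
    # Class syntax: the class body carries a dotted __qualname__.
    class ClassRow(typing.NamedTuple):
        colA: str
        colB: typing.Optional[str]

    # Functional syntax from the question: __qualname__ is the type name.
    FuncRow = typing.NamedTuple(
        'RowSchema', [('colA', str), ('colB', typing.Optional[str])])
    return ClassRow, FuncRow


ClassRow, FuncRow = run()
print(ClassRow.__qualname__)                # run.<locals>.ClassRow
print(FuncRow.__qualname__)                 # RowSchema
print(FuncRow.__qualname__.isidentifier())  # True
```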

Based on some of the references below, the failing snippet appears to be correctly formatted.

References:

Tested with Python 3.9 and Beam 2.37.0 on several runners, including DirectRunner, DataflowRunner and PortableRunner.

The problem was solved by simply moving the schema definition outside the run() function:

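import typing

import apache_beam as beam
from apache_beam import coders


# Defined at module level (outside run()), so its qualified name is plain 'RowSchema'.
class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]

coders.registry.register_coder(RowSchema, coders.RowCoder)

def run(argv=None, save_main_session=True):
    ...
    with beam.Pipeline(options=pipeline_options) as p:
        ...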
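A minimal sketch of the same module-level layout without Beam, assuming only the standard library: the class's qualified name is plain 'RowSchema', and the 'ToRow' conversion RowSchema(**x) turns each dictionary into a row. The to_row helper is hypothetical and stands in for the lambda used in the pipeline.

```python
import typing


# Module-level definition, mirroring the fix: no '<locals>' in the qualified name.
class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]


def to_row(record: dict) -> RowSchema:
    # Hypothetical helper equivalent to `lambda x: RowSchema(**x)`;
    # the dictionary keys must match the field names exactly.
    return RowSchema(**record)


rows = [to_row(r) for r in [{'colA': 'a1', 'colB': 'b1'},
                            {'colA': 'a2', 'colB': None}]]
print(RowSchema.__qualname__)  # RowSchema
print(rows)  # [RowSchema(colA='a1', colB='b1'), RowSchema(colA='a2', colB=None)]
```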