Apache Beam infer schema using NamedTuple (Python)

I'm new to Apache Beam and would like to know how to use a NamedTuple to infer a schema for a PCollection.

The Programming Guide gives this example:

class Transaction(typing.NamedTuple):
  bank: str
  purchase_amount: float

pc = input | beam.Map(lambda ...).with_output_types(Transaction)
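Beam derives the schema's field names and types from the NamedTuple's type annotations. The sketch below uses plain Python (no Beam required) to show the name/type pairs that a schema for the guide's `Transaction` class would be built from.

```python
import typing


class Transaction(typing.NamedTuple):
    bank: str
    purchase_amount: float


# Field names and their annotated types, in declaration order.
fields = list(typing.get_type_hints(Transaction).items())
print(fields)  # [('bank', <class 'str'>), ('purchase_amount', <class 'float'>)]
```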

I tried to implement something similar, but reading from a Parquet file first:

from apache_beam import coders
from typing import NamedTuple
import apache_beam as beam


class TestSchema(NamedTuple):
    company_id: int
    is_company: bool
    company_created_datetime: str
    company_values: str


if __name__ == '__main__':
    coders.registry.register_coder(TestSchema, coders.RowCoder)
    with beam.Pipeline() as pipeline:
        record = pipeline | "Read Parquet" >> beam.io.ReadFromParquet("test.parquet").with_output_types(TestSchema) \
                          | "Print" >> beam.Map(print)

        pipeline.run().wait_until_finish()

I get AttributeError: 'dict' object has no attribute 'company_id' [while running 'Read Parquet/ParDo(_ArrowTableToRowDictionaries)']

Without .with_output_types(TestSchema), the pipeline runs and I can see data that looks like this:

{'company_id': 3, 'is_company': True, 'company_created_datetime': datetime.datetime(2022, 3, 8, 13, 2, 26, 573511), 'company_values': 'test value'}

I'm using Python 3.8 and Beam 2.37.0.

Am I missing something? Any help would be greatly appreciated (stack trace below).

Traceback (most recent call last):
  File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 817, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/worker/operations.py", line 826, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
  File "apache_beam/runners/common.py", line 1206, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
  File "apache_beam/runners/common.py", line 698, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 836, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1281, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 718, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 841, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
  File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
  File "apache_beam/coders/coder_impl.py", line 1610, in apache_beam.coders.coder_impl.RowCoderImpl.encode_to_stream
AttributeError: 'dict' object has no attribute 'company_id' [while running 'Read Parquet/ParDo(_ArrowTableToRowDictionaries)']
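The traceback ends in `RowCoderImpl.encode_to_stream`: with a NamedTuple output type, the row coder reads each schema field from the element by attribute name, but `ReadFromParquet` emits plain dicts, which only support key lookup. The sketch below imitates that attribute-based lookup in plain Python; `read_fields` is a hypothetical helper, not part of Beam, and only approximates what the coder does.

```python
import datetime
from typing import NamedTuple


class TestSchema(NamedTuple):
    company_id: int
    is_company: bool
    company_created_datetime: str
    company_values: str


def read_fields(element, field_names):
    """Hypothetical helper: fetch each field by attribute name, roughly
    the way a row coder reads a schema'd element before encoding it."""
    return [getattr(element, name) for name in field_names]


# A dict shaped like the records ReadFromParquet produced above.
row = {
    'company_id': 3,
    'is_company': True,
    'company_created_datetime': datetime.datetime(2022, 3, 8, 13, 2, 26, 573511),
    'company_values': 'test value',
}

try:
    read_fields(row, TestSchema._fields)
except AttributeError as err:
    print(err)  # 'dict' object has no attribute 'company_id'

# Converting the dict to the NamedTuple first lets the same lookup succeed.
print(read_fields(TestSchema(**row), TestSchema._fields))
```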

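OK, after some research into Beam schemas and digging through the source code, I finally found a solution. It looks like you need to convert each element of the PCollection (the dicts produced by ReadFromParquet) into a NamedTuple first, and only then apply the type hint.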
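Note that `TestSchema(**x)` raises a `TypeError` if a record contains a key that the NamedTuple does not declare, for example if the Parquet file has more columns than the schema. A minimal pure-Python sketch of the conversion step that keeps only the declared fields; `from_row_dict` and the `extra_column` key are hypothetical, added for illustration.

```python
import datetime
from typing import Any, Dict, NamedTuple


class TestSchema(NamedTuple):
    company_id: int
    is_company: bool
    company_created_datetime: str
    company_values: str


def from_row_dict(cls, record: Dict[str, Any]):
    """Hypothetical helper: build a NamedTuple of type `cls` from a row
    dict, keeping only the keys that `cls` declares as fields."""
    return cls(**{name: record[name] for name in cls._fields})


# Same shape as the printed record, plus a hypothetical extra column.
row = {
    'company_id': 3,
    'is_company': True,
    'company_created_datetime': datetime.datetime(2022, 3, 8, 13, 2, 26, 573511),
    'company_values': 'test value',
    'extra_column': 'not in TestSchema',
}

element = from_row_dict(TestSchema, row)
print(element.company_id)  # 3
```

In a pipeline this would take the place of the lambda, e.g. `beam.Map(lambda x: from_row_dict(TestSchema, x)).with_output_types(TestSchema)`. Passing `columns=[...]` to `ReadFromParquet` to read only the schema's columns is another way to avoid extra keys.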

with beam.Pipeline() as pipeline:
    record = pipeline | "Read Parquet" >> beam.io.ReadFromParquet("test.parquet") \
                      | "Transform to NamedTuple" >> beam.Map(lambda x: TestSchema(**x)).with_output_types(TestSchema) \
                      | "Print" >> beam.Map(print)
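One detail to watch: the printed records show `company_created_datetime` as a `datetime.datetime`, while `TestSchema` declares it as `str`. `TestSchema(**x)` does not check types, so the mismatch may only surface later, when the row coder tries to encode that field as a string. A minimal sketch, assuming you want to keep the `str` annotation and store the timestamp as an ISO 8601 string (`to_test_schema` is a hypothetical name):

```python
import datetime
from typing import Any, Dict, NamedTuple


class TestSchema(NamedTuple):
    company_id: int
    is_company: bool
    company_created_datetime: str
    company_values: str


def to_test_schema(record: Dict[str, Any]) -> TestSchema:
    """Hypothetical helper for beam.Map: turn one Parquet row dict into a
    TestSchema, converting a datetime to an ISO 8601 string so the value
    matches the field's `str` annotation."""
    created = record['company_created_datetime']
    if isinstance(created, datetime.datetime):
        created = created.isoformat()
    return TestSchema(
        company_id=record['company_id'],
        is_company=record['is_company'],
        company_created_datetime=created,
        company_values=record['company_values'],
    )


row = {
    'company_id': 3,
    'is_company': True,
    'company_created_datetime': datetime.datetime(2022, 3, 8, 13, 2, 26, 573511),
    'company_values': 'test value',
}
print(to_test_schema(row))
# TestSchema(company_id=3, is_company=True, company_created_datetime='2022-03-08T13:02:26.573511', company_values='test value')
```

In the pipeline it would replace the lambda: `beam.Map(to_test_schema).with_output_types(TestSchema)`.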