Apache Beam 使用 NamedTuple 推断模式 (Python)
Apache Beam infer schema using NamedTuple (Python)
我对 Apache Beam 很陌生,我想知道如何使用 NamedTuple 为 PCollection 推断模式(schema)。
文档 Programming Guide 中的示例指出:
class Transaction(typing.NamedTuple):
bank: str
purchase_amount: float
pc = input | beam.Map(lambda ...).with_output_types(Transaction)
我尝试实现类似的东西,但首先从 Parquet 文件中读取
from apache_beam import coders
from typing import NamedTuple
import apache_beam as beam
class TestSchema(NamedTuple):
company_id: int
is_company: bool
company_created_datetime: str
company_values: str
if __name__ == '__main__':
coders.registry.register_coder(TestSchema, coders.RowCoder)
with beam.Pipeline() as pipeline:
record = pipeline | "Read Parquet" >> beam.io.ReadFromParquet("test.parquet").with_output_types(TestSchema) \
| "Print" >> beam.Map(print)
pipeline.run().wait_until_finish()
我得到 AttributeError: 'dict' object has no attribute 'company_id' [while running 'Read Parquet/ParDo(_ArrowTableToRowDictionaries)']
同样没有 .with_output_types(TestSchema)
我可以看到看起来像这样的数据
{'company_id': 3, 'is_company': True, 'company_created_datetime': datetime.datetime(2022, 3, 8, 13, 2, 26, 573511), 'company_values': 'test value'}
我正在使用 python 3.8 和 beam 2.37.0
我错过了什么吗?任何帮助将不胜感激(下面的堆栈跟踪)。
Traceback (most recent call last):
File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 817, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/worker/operations.py", line 826, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/common.py", line 1206, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
File "apache_beam/runners/common.py", line 698, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 836, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1281, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 718, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 841, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
File "apache_beam/coders/coder_impl.py", line 1610, in apache_beam.coders.coder_impl.RowCoderImpl.encode_to_stream
AttributeError: 'dict' object has no attribute 'company_id' [while running 'Read Parquet/ParDo(_ArrowTableToRowDictionaries)']
好的,在对 beam 模式进行一些研究并深入研究源代码之后,我终于找到了解决方案。看起来您需要将 pcollection 中的每个值都转换为 NamedTuple,然后应用类型提示。
with beam.Pipeline() as pipeline:
record = pipeline | "Read Parquet" >> beam.io.ReadFromParquet("test.parquet") \
| "Transform to NamedTuple" >> beam.Map(lambda x: TestSchema(**x)).with_output_types(TestSchema) \
| "Print" >> beam.Map(print)
我对 Apache Beam 很陌生,我想知道如何使用 NamedTuple 为 PCollection 推断模式(schema)。
文档 Programming Guide 中的示例指出:
class Transaction(typing.NamedTuple):
bank: str
purchase_amount: float
pc = input | beam.Map(lambda ...).with_output_types(Transaction)
我尝试实现类似的东西,但首先从 Parquet 文件中读取
from apache_beam import coders
from typing import NamedTuple
import apache_beam as beam
class TestSchema(NamedTuple):
company_id: int
is_company: bool
company_created_datetime: str
company_values: str
if __name__ == '__main__':
coders.registry.register_coder(TestSchema, coders.RowCoder)
with beam.Pipeline() as pipeline:
record = pipeline | "Read Parquet" >> beam.io.ReadFromParquet("test.parquet").with_output_types(TestSchema) \
| "Print" >> beam.Map(print)
pipeline.run().wait_until_finish()
我得到 AttributeError: 'dict' object has no attribute 'company_id' [while running 'Read Parquet/ParDo(_ArrowTableToRowDictionaries)']
同样没有 .with_output_types(TestSchema)
我可以看到看起来像这样的数据
{'company_id': 3, 'is_company': True, 'company_created_datetime': datetime.datetime(2022, 3, 8, 13, 2, 26, 573511), 'company_values': 'test value'}
我正在使用 python 3.8 和 beam 2.37.0
我错过了什么吗?任何帮助将不胜感激(下面的堆栈跟踪)。
Traceback (most recent call last):
File "apache_beam/runners/worker/operations.py", line 346, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 348, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 817, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/worker/operations.py", line 826, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
File "apache_beam/runners/common.py", line 1206, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
File "apache_beam/runners/common.py", line 698, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 836, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 215, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 707, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 708, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1200, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1281, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1198, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 718, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 841, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1361, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 214, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 211, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 250, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1425, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1436, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 207, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 246, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
File "apache_beam/coders/coder_impl.py", line 1610, in apache_beam.coders.coder_impl.RowCoderImpl.encode_to_stream
AttributeError: 'dict' object has no attribute 'company_id' [while running 'Read Parquet/ParDo(_ArrowTableToRowDictionaries)']
好的,在对 beam 模式进行一些研究并深入研究源代码之后,我终于找到了解决方案。看起来您需要将 pcollection 中的每个值都转换为 NamedTuple,然后应用类型提示。
with beam.Pipeline() as pipeline:
record = pipeline | "Read Parquet" >> beam.io.ReadFromParquet("test.parquet") \
| "Transform to NamedTuple" >> beam.Map(lambda x: TestSchema(**x)).with_output_types(TestSchema) \
| "Print" >> beam.Map(print)