Apache Beam 数据流作业的类型错误:'unable to deterministically encode <TableReference>, provide a type hint'
TypeError for Apache Beam Dataflow job: 'unable to deterministically encode <TableReference>, provide a type hint'
我可以使用 Direct Runner 在本地运行我的管道,没有任何问题,但是当我部署到 Dataflow 时,我收到以下错误:
"Error message from worker: generic::unknown: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 762, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 219, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1311, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1322, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 919, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 200, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1401, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
File "apache_beam/coders/coder_impl.py", line 239, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
File "apache_beam/coders/coder_impl.py", line 393, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 403, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 443, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_special_deterministic
TypeError: Unable to deterministically encode '<TableReference
datasetId: 'my-dataset-id'
projectId: 'my-project-id'
tableId: 'my-table'>' of type '<class 'apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableReference'>', please provide a type hint for the input of 'Write good logs to BigQuery/_StreamToBigQuery/CommitInsertIds/GroupByKey'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
/---/
File "apache_beam/coders/coder_impl.py", line 403, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 443, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_special_deterministic
TypeError: Unable to deterministically encode '<TableReference
datasetId: 'string1'
projectId: 'string2'
tableId: 'string3'>' of type '<class 'apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableReference'>', please provide a type hint for the input of 'Writing logs to BigQuery/_StreamToBigQuery/CommitInsertIds/GroupByKey' [while running 'Writing logs to BigQuery/_StreamToBigQuery/CommitInsertIds/Map(reify_timestamps)-ptransform-9454']
这是管道的代码:
# Streaming pipeline: read JSON messages from Pub/Sub, decode them,
# attach processing-time timestamps, and group into 10-second fixed windows.
logs = (p | beam.io.ReadFromPubSub(topic=topic)
| "Decode Json" >> beam.ParDo(ListProcessor())
# Timestamp each element with the current wall-clock (processing) time.
| 'Add Timestamp' >> beam.Map(lambda log: beam.window.TimestampedValue(log, time.time()))
| 'Window into Fixed Intervals' >> beam.WindowInto(beam.window.FixedWindows(10))
| 'Calculate Pipeline Duration' >> beam.ParDo(DurationProcessor())
)
# Extra table options passed to BigQuery: day-partition the destination
# table on the 'created_at' field.
additional_bq_parameters = {
'timePartitioning': {'type': 'DAY', 'field': 'created_at'}}
# Stream the windowed rows into BigQuery. Transient insert failures are
# retried; rows that still fail are emitted into `errors`.
# NOTE(review): the destination is given as a "project:dataset.table" string;
# internally _StreamToBigQuery converts it to a TableReference, which is the
# object the deterministic-coder error above complains about.
errors = (logs | 'Write logs to BigQuery' >> beam.io.WriteToBigQuery(
'{0}:{1}.{2}'.format(project, collection, table_name),
schema=schema,
additional_bq_parameters=additional_bq_parameters,
insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
create_disposition=beam.io.gcp.bigquery.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.gcp.bigquery.BigQueryDisposition.WRITE_APPEND))
你能帮我吗?我已经检查了源代码但没有成功。谢谢!
为防止出现此问题中提到的错误,解决方法是使用 Apache Beam 2.30.0 版及更高版本,如 https://issues.apache.org/jira/browse/BEAM-12079 中所述。
我可以使用 Direct Runner 在本地运行我的管道,没有任何问题,但是当我部署到 Dataflow 时,我收到以下错误:
"Error message from worker: generic::unknown: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 762, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1395, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 219, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 183, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
File "apache_beam/runners/worker/opcounters.py", line 217, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
File "apache_beam/runners/worker/opcounters.py", line 255, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
File "apache_beam/coders/coder_impl.py", line 1311, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1322, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 919, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 200, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
File "apache_beam/coders/coder_impl.py", line 1401, in apache_beam.coders.coder_impl.LengthPrefixCoderImpl.estimate_size
File "apache_beam/coders/coder_impl.py", line 239, in apache_beam.coders.coder_impl.StreamCoderImpl.estimate_size
File "apache_beam/coders/coder_impl.py", line 393, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 403, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 443, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_special_deterministic
TypeError: Unable to deterministically encode '<TableReference
datasetId: 'my-dataset-id'
projectId: 'my-project-id'
tableId: 'my-table'>' of type '<class 'apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableReference'>', please provide a type hint for the input of 'Write good logs to BigQuery/_StreamToBigQuery/CommitInsertIds/GroupByKey'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
/---/
File "apache_beam/coders/coder_impl.py", line 403, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
File "apache_beam/coders/coder_impl.py", line 443, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_special_deterministic
TypeError: Unable to deterministically encode '<TableReference
datasetId: 'string1'
projectId: 'string2'
tableId: 'string3'>' of type '<class 'apache_beam.io.gcp.internal.clients.bigquery.bigquery_v2_messages.TableReference'>', please provide a type hint for the input of 'Writing logs to BigQuery/_StreamToBigQuery/CommitInsertIds/GroupByKey' [while running 'Writing logs to BigQuery/_StreamToBigQuery/CommitInsertIds/Map(reify_timestamps)-ptransform-9454']
这是管道的代码:
# Streaming pipeline: read JSON messages from Pub/Sub, decode them,
# attach processing-time timestamps, and group into 10-second fixed windows.
logs = (p | beam.io.ReadFromPubSub(topic=topic)
| "Decode Json" >> beam.ParDo(ListProcessor())
# Timestamp each element with the current wall-clock (processing) time.
| 'Add Timestamp' >> beam.Map(lambda log: beam.window.TimestampedValue(log, time.time()))
| 'Window into Fixed Intervals' >> beam.WindowInto(beam.window.FixedWindows(10))
| 'Calculate Pipeline Duration' >> beam.ParDo(DurationProcessor())
)
# Extra table options passed to BigQuery: day-partition the destination
# table on the 'created_at' field.
additional_bq_parameters = {
'timePartitioning': {'type': 'DAY', 'field': 'created_at'}}
# Stream the windowed rows into BigQuery. Transient insert failures are
# retried; rows that still fail are emitted into `errors`.
# NOTE(review): the destination is given as a "project:dataset.table" string;
# internally _StreamToBigQuery converts it to a TableReference, which is the
# object the deterministic-coder error above complains about.
errors = (logs | 'Write logs to BigQuery' >> beam.io.WriteToBigQuery(
'{0}:{1}.{2}'.format(project, collection, table_name),
schema=schema,
additional_bq_parameters=additional_bq_parameters,
insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
create_disposition=beam.io.gcp.bigquery.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.gcp.bigquery.BigQueryDisposition.WRITE_APPEND))
你能帮我吗?我已经检查了源代码但没有成功。谢谢!
为防止出现此问题中提到的错误,解决方法是使用 Apache Beam 2.30.0 版及更高版本,如 https://issues.apache.org/jira/browse/BEAM-12079 中所述。