使用 beam-nuggets 库部署管道时出现 GCP Dataflow runner 错误 - "Failed to read inputs in the data_plane."
GCP Dataflow runner error when deploying pipeline using beam-nuggets library - "Failed to read inputs in the data_plane."
我一直在 GCP 提供的 Apache Beam 笔记本中测试 Apache Beam 管道,使用 Kafka 实例作为输入,Bigquery 作为输出。我已经能够通过 Interactive runner 成功使用管道,但是当我将相同的管道部署到 Dataflow runner 时,它似乎从未真正从已定义的 Kafka 主题中读取。查看日志给我错误:
Failed to read inputs in the data plane. Traceback (most recent call
last): File
/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py,
基于此posthere
的实现
有什么想法吗?代码如下:
from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio
kafka_config = {"topic": kafka_topic, "bootstrap_servers": ip_addr}
# p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=options) # <- use for test
p = beam.Pipeline(DataflowRunner(), options=options) # <- use for dataflow implementation
notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(kafka_config)
preprocess = notifications | "Pre-process for model" >> beam.ParDo(preprocess())
model = preprocess | "format & predict" >> beam.ParDo(model())
newWrite = model | beam.io.WriteToBigQuery(
table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)
来自日志的错误消息:
Failed to read inputs in the data plane. Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 528, in _read_inputs for elements in elements_iterator: File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__ return self._next() File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 689, in _next raise self grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "DNS resolution failed" debug_error_string = "{"created":"@1595595923.509682344","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3948,"referenced_errors":[{"created":"@1595595923.509650517","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":216,"referenced_errors":[{"created":"@1595595923.509649070","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":375,"grpc_status":14,"referenced_errors":[{"created":"@1595595923.509645878","description":"unparseable host:port","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":417,"target_address":""}]}]}]}" >
还有
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "DNS resolution failed" debug_error_string = "{"created":"@1594205651.745381243","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3948,"referenced_errors":[{"created":"@1594205651.745371624","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":216,"referenced_errors":[{"created":"@1594205651.745370349","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":375,"grpc_status":14,"referenced_errors":[{"created":"@1594205651.745367499","description":"unparseable host:port","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":417,"target_address":""}]}]}]}" >
管道设置:
Python sdk harness started with pipeline_options: {'streaming': True, 'project': 'example-project', 'job_name': 'beamapp-root-0727105627-001796', 'staging_location': 'example-staging-location', 'temp_location': 'example-staging-location', 'region': 'europe-west1', 'labels': ['goog-dataflow-notebook=2_23_0_dev'], 'subnetwork': 'example-subnetwork', 'experiments': ['use_fastavro', 'use_multiple_sdk_containers'], 'setup_file': '/root/notebook/workspace/setup.py', 'sdk_location': '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-2.23.0.dev0.tar.gz', 'sdk_worker_parallelism': '1', 'environment_cache_millis': '0', 'job_port': '0', 'artifact_port': '0', 'expansion_port': '0'}
据我所知 Failed to read inputs in the data plane ... status = StatusCode.UNAVAILABLE details = "DNS resolution failed"
可能是 Python Beam SDK 的问题,建议更新至 Python Beam SDK 2.23.0。
在我的实施计划中这似乎是不可能的,但是对于多语言管道来说它似乎更可行。我在 google 支持下开了一张票,经过一段时间调查后得到以下回复:
“… at this moment Python doesn't have any KafkaIO that works with
DataflowRunner. You can use Java as a workaround. In case you need
Python for something in particular (TensorFlow or similar), a
possibility is to send the message from Kafka to a PubSub topic (via
another pipeline that only reads from Kafka and publish to PS or an
external application).”
所以请随时听取他们的建议,或者你们可以一起破解一些东西。我刚刚修改了我的体系结构以使用 pubsub 而不是 kafka。
我一直在 GCP 提供的 Apache Beam 笔记本中测试 Apache Beam 管道,使用 Kafka 实例作为输入,Bigquery 作为输出。我已经能够通过 Interactive runner 成功使用管道,但是当我将相同的管道部署到 Dataflow runner 时,它似乎从未真正从已定义的 Kafka 主题中读取。查看日志给我错误:
Failed to read inputs in the data plane. Traceback (most recent call last): File /usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py,
基于此posthere
的实现有什么想法吗?代码如下:
from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio
kafka_config = {"topic": kafka_topic, "bootstrap_servers": ip_addr}
# p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=options) # <- use for test
p = beam.Pipeline(DataflowRunner(), options=options) # <- use for dataflow implementation
notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(kafka_config)
preprocess = notifications | "Pre-process for model" >> beam.ParDo(preprocess())
model = preprocess | "format & predict" >> beam.ParDo(model())
newWrite = model | beam.io.WriteToBigQuery(
table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)
来自日志的错误消息:
Failed to read inputs in the data plane. Traceback (most recent call last): File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 528, in _read_inputs for elements in elements_iterator: File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__ return self._next() File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 689, in _next raise self grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "DNS resolution failed" debug_error_string = "{"created":"@1595595923.509682344","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3948,"referenced_errors":[{"created":"@1595595923.509650517","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":216,"referenced_errors":[{"created":"@1595595923.509649070","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":375,"grpc_status":14,"referenced_errors":[{"created":"@1595595923.509645878","description":"unparseable host:port","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":417,"target_address":""}]}]}]}" >
还有
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "DNS resolution failed" debug_error_string = "{"created":"@1594205651.745381243","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3948,"referenced_errors":[{"created":"@1594205651.745371624","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":216,"referenced_errors":[{"created":"@1594205651.745370349","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":375,"grpc_status":14,"referenced_errors":[{"created":"@1594205651.745367499","description":"unparseable host:port","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":417,"target_address":""}]}]}]}" >
管道设置:
Python sdk harness started with pipeline_options: {'streaming': True, 'project': 'example-project', 'job_name': 'beamapp-root-0727105627-001796', 'staging_location': 'example-staging-location', 'temp_location': 'example-staging-location', 'region': 'europe-west1', 'labels': ['goog-dataflow-notebook=2_23_0_dev'], 'subnetwork': 'example-subnetwork', 'experiments': ['use_fastavro', 'use_multiple_sdk_containers'], 'setup_file': '/root/notebook/workspace/setup.py', 'sdk_location': '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-2.23.0.dev0.tar.gz', 'sdk_worker_parallelism': '1', 'environment_cache_millis': '0', 'job_port': '0', 'artifact_port': '0', 'expansion_port': '0'}
据我所知 Failed to read inputs in the data plane ... status = StatusCode.UNAVAILABLE details = "DNS resolution failed"
可能是 Python Beam SDK 的问题,建议更新至 Python Beam SDK 2.23.0。
在我的实施计划中这似乎是不可能的,但是对于多语言管道来说它似乎更可行。我在 google 支持下开了一张票,经过一段时间调查后得到以下回复:
“… at this moment Python doesn't have any KafkaIO that works with DataflowRunner. You can use Java as a workaround. In case you need Python for something in particular (TensorFlow or similar), a possibility is to send the message from Kafka to a PubSub topic (via another pipeline that only reads from Kafka and publish to PS or an external application).”
所以请随时听取他们的建议,或者你们可以一起破解一些东西。我刚刚修改了我的体系结构以使用 pubsub 而不是 kafka。