GCP Dataflow runner error when deploying pipeline using beam-nuggets library - "Failed to read inputs in the data_plane."

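I have been testing an Apache Beam pipeline in the Apache Beam notebooks provided by GCP, using a Kafka instance as input and BigQuery as output. The pipeline works with the Interactive runner, but when I deploy the same pipeline to the Dataflow runner, it never seems to actually read from the defined Kafka topic. The logs show this error: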

Failed to read inputs in the data plane. Traceback (most recent call last): File /usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py,

I based my implementation on this post here.

Any ideas? The code is below:

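from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
# from apache_beam.runners.interactive import interactive_runner  # <- needed only for the test runner
from beam_nuggets.io import kafkaio

# kafka_topic, ip_addr, options, table_spec, table_schema and the DoFn classes
# preprocess and model are defined elsewhere in the notebook.
kafka_config = {"topic": kafka_topic, "bootstrap_servers": ip_addr}

# p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=options)  # <- use for test
p = beam.Pipeline(DataflowRunner(), options=options)  # <- use for Dataflow implementation

# Distinct names so the PCollections do not shadow the preprocess/model DoFn classes
# (re-running a notebook cell would otherwise call ParDo on a PCollection).
notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(kafka_config)
preprocessed = notifications | "Pre-process for model" >> beam.ParDo(preprocess())
predictions = preprocessed | "format & predict" >> beam.ParDo(model())

predictions | "Write to BigQuery" >> beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)

result = p.run()  # submit the job to the Dataflow service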

Error message from the logs:

Failed to read inputs in the data plane.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 528, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 689, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "DNS resolution failed"
    debug_error_string = "{"created":"@1595595923.509682344","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3948,"referenced_errors":[{"created":"@1595595923.509650517","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":216,"referenced_errors":[{"created":"@1595595923.509649070","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":375,"grpc_status":14,"referenced_errors":[{"created":"@1595595923.509645878","description":"unparseable host:port","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":417,"target_address":""}]}]}]}"
>

Also:

grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.UNAVAILABLE
    details = "DNS resolution failed"
    debug_error_string = "{"created":"@1594205651.745381243","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3948,"referenced_errors":[{"created":"@1594205651.745371624","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/resolving_lb_policy.cc","file_line":216,"referenced_errors":[{"created":"@1594205651.745370349","description":"DNS resolution failed","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":375,"grpc_status":14,"referenced_errors":[{"created":"@1594205651.745367499","description":"unparseable host:port","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":417,"target_address":""}]}]}]}"
>

Pipeline settings:

Python sdk harness started with pipeline_options: {'streaming': True, 'project': 'example-project', 'job_name': 'beamapp-root-0727105627-001796', 'staging_location': 'example-staging-location', 'temp_location': 'example-staging-location', 'region': 'europe-west1', 'labels': ['goog-dataflow-notebook=2_23_0_dev'], 'subnetwork': 'example-subnetwork', 'experiments': ['use_fastavro', 'use_multiple_sdk_containers'], 'setup_file': '/root/notebook/workspace/setup.py', 'sdk_location': '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-2.23.0.dev0.tar.gz', 'sdk_worker_parallelism': '1', 'environment_cache_millis': '0', 'job_port': '0', 'artifact_port': '0', 'expansion_port': '0'}

As far as I know, this error (Failed to read inputs in the data plane ... status = StatusCode.UNAVAILABLE details = "DNS resolution failed") may be caused by a problem in the Python Beam SDK, and the suggested fix is to update to Python Beam SDK 2.23.0.
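One way to check whether an environment actually meets that recommendation is to compare version strings. The sketch below uses a hypothetical helper, is_at_least (not part of Beam), and only handles the simple X.Y.Z and X.Y.Z.devN forms; it follows the PEP 440 rule that a .devN build sorts before the matching final release, so the 2.23.0.dev0 SDK listed in the pipeline settings above would not count as 2.23.0.

```python
import re

# Only "X.Y.Z" and "X.Y.Z.devN" are supported (an assumption for this sketch;
# packaging.version handles the full PEP 440 grammar).
_VERSION_RE = re.compile(r"(\d+)\.(\d+)\.(\d+)(?:\.dev(\d+))?")


def version_key(version: str) -> tuple:
    """Turn a version string into a tuple that sorts in release order."""
    match = _VERSION_RE.fullmatch(version)
    if match is None:
        raise ValueError(f"unsupported version string: {version!r}")
    major, minor, patch, dev = match.groups()
    # A final release sorts after every .devN build of the same X.Y.Z.
    dev_rank = (1, 0) if dev is None else (0, int(dev))
    return (int(major), int(minor), int(patch)) + dev_rank


def is_at_least(installed: str, minimum: str = "2.23.0") -> bool:
    """Return True if the installed version is the minimum release or newer."""
    return version_key(installed) >= version_key(minimum)


print(is_at_least("2.23.0.dev0"))  # False: a dev build precedes the 2.23.0 release
print(is_at_least("2.23.0"))       # True
print(is_at_least("2.24.0"))       # True
```

Inside the notebook, is_at_least(apache_beam.__version__) would apply the same check to the SDK that is actually installed.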

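That did not seem possible within my implementation plan, although it looks more feasible for multi-language pipelines. I opened a ticket with Google support and, after some investigation on their side, received the following reply: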

“… at this moment Python doesn't have any KafkaIO that works with DataflowRunner. You can use Java as a workaround. In case you need Python for something in particular (TensorFlow or similar), a possibility is to send the message from Kafka to a PubSub topic (via another pipeline that only reads from Kafka and publish to PS or an external application).”
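The external-application option from that reply (forward Kafka messages to a Pub/Sub topic outside of Beam) could look roughly like the sketch below. It assumes the kafka-python and google-cloud-pubsub client libraries, neither of which is mentioned in the original, and the consumer group name is a hypothetical placeholder. The forwarding loop is kept separate from the client setup so it can be exercised without a broker.

```python
from typing import Callable, Iterable, Optional


def forward_messages(messages: Iterable[Optional[bytes]],
                     publish: Callable[[bytes], None]) -> int:
    """Publish each Kafka message payload; return how many were forwarded."""
    count = 0
    for payload in messages:
        if payload is None:
            # Skip tombstone records, which carry no value.
            continue
        publish(payload)
        count += 1
    return count


def run_bridge(kafka_topic: str, bootstrap_servers: str,
               project_id: str, pubsub_topic: str) -> None:
    # Imported here so forward_messages can be used without these packages.
    from google.cloud import pubsub_v1
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        kafka_topic,
        bootstrap_servers=bootstrap_servers,
        group_id="kafka-to-pubsub-bridge",  # hypothetical consumer group name
    )
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, pubsub_topic)

    def publish(payload: bytes) -> None:
        # .result() blocks until Pub/Sub has accepted the message.
        publisher.publish(topic_path, data=payload).result()

    # Iterating a KafkaConsumer blocks and yields records indefinitely.
    forward_messages((record.value for record in consumer), publish)
```

Blocking on each publish keeps the sketch simple but limits throughput; batching futures would be the usual next step for a real bridge.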

So feel free to take their advice, or you could try to hack something together. I simply changed my architecture to use Pub/Sub instead of Kafka.
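With Pub/Sub in front of the pipeline, the Kafka source can be replaced by Beam's built-in ReadFromPubSub, which Dataflow supports in streaming mode. The sketch below assumes a subscription already attached to the topic being fed, takes the runner (for example DataflowRunner) from the supplied options, and uses preprocess_fn and model_fn as stand-ins for the question's DoFns; the project and subscription names are placeholders. Note that ReadFromPubSub emits each payload as bytes, so the pre-processing DoFn may need adjusting if it expected the element shape produced by KafkaConsume.

```python
def subscription_path(project_id: str, subscription: str) -> str:
    """Build the fully qualified subscription name ReadFromPubSub expects."""
    return f"projects/{project_id}/subscriptions/{subscription}"


def build_pipeline(options, project_id: str, subscription: str,
                   preprocess_fn, model_fn, table_spec, table_schema):
    # Imported here so subscription_path can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import StandardOptions

    # Pub/Sub is an unbounded source, so the pipeline must run in streaming mode.
    options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=options)
    (
        p
        | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(
            subscription=subscription_path(project_id, subscription))
        | "Pre-process for model" >> beam.ParDo(preprocess_fn)
        | "format & predict" >> beam.ParDo(model_fn)
        | "Write to BigQuery" >> beam.io.WriteToBigQuery(
            table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)
    )
    return p
```

A caller would then do something like build_pipeline(options, "example-project", "notifications-sub", preprocess(), model(), table_spec, table_schema).run(), with "notifications-sub" being a placeholder subscription name.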