在 apache-beam 中使用 python 的 ReadFromKafka 不支持的信号:2
ReadFromKafka with python in apache-beam Unsupported signal: 2
我一直在努力完成这项工作,我知道这是一个跨语言转换和所有这些,我在我的电脑上安装了 Java jdk(当我写 java -cmd 上的版本我得到了正确的信息和所有这些)但是当我试图使一个简单的管道工作时:
import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS']='credentialsOld.json'
def main():
print('======================================================')
beam_options = PipelineOptions(runner='DataflowRunner',temp_location=temp_location,staging_location=staging_location,project=project,experiments=['use_runner_v2'],streaming=True)
with beam.Pipeline(options=beam_options) as p:
msgs = p | 'ReadKafka' >> ReadFromKafka(consumer_config={'bootstrap.servers':'xxxxx-xxxxx...','group_id':'testAB'},topics=['users'])
msgs | beam.FlatMap(print)
if __name__ == '__main__':
main()
我收到此错误:ValueError:不支持的信号:2
我尝试将参数 expansion_service= 'beam:external:java:kafka:read:v1' 添加到 ReadFromKafka 但随后我得到:
状态=StatusCode.UNAVAILABLE
details = "DNS 解析失败
beam:external:java:kafka:read:v1:未知:OS 错误“
我正在 venv python 环境中工作,如果此信息有用并且我的 kafka 集群在融合云上。
我也遇到了这个运行时错误:
运行时错误:java.lang.RuntimeException:无法从规范 urn 中获取 beam:transform:org.apache.beam:kafka_read_without_metadata:v1 的依赖项:“beam:transform:org.apache.beam:kafka_read_without_metadata:v1
编辑:我从这里得到 bootstrap 服务器选项
我的错误是我跳过了必须开始 expansion_service 的步骤,我是用这个命令做的
java -jar beam-sdks-java-io-expansion-service-2.37.0.jar 8088 --javaClassLookupAllowlistFile='*'
从 https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-expansion-service/2.36.0 下载 beam-sdks-java-io-expansion-service-2.37.0.jar 后
然后在 expansion_service='localhost:8088'
中指定端口
然后我犯了两个小错误,一个是我使用的是 JDK 18,我认为它不兼容 https://beam.apache.org/get-started/quickstart-java/ 所以我切换到 JDK 17 并使用了 python 3.8 而不是 python 3.10
我一直在努力完成这项工作,我知道这是一个跨语言转换和所有这些,我在我的电脑上安装了 Java jdk(当我写 java -cmd 上的版本我得到了正确的信息和所有这些)但是当我试图使一个简单的管道工作时:
import apache_beam as beam
from apache_beam.io.external.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions
import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS']='credentialsOld.json'
def main():
print('======================================================')
beam_options = PipelineOptions(runner='DataflowRunner',temp_location=temp_location,staging_location=staging_location,project=project,experiments=['use_runner_v2'],streaming=True)
with beam.Pipeline(options=beam_options) as p:
msgs = p | 'ReadKafka' >> ReadFromKafka(consumer_config={'bootstrap.servers':'xxxxx-xxxxx...','group_id':'testAB'},topics=['users'])
msgs | beam.FlatMap(print)
if __name__ == '__main__':
main()
我收到此错误:ValueError:不支持的信号:2
我尝试将参数 expansion_service= 'beam:external:java:kafka:read:v1' 添加到 ReadFromKafka 但随后我得到:
状态=StatusCode.UNAVAILABLE
details = "DNS 解析失败 beam:external:java:kafka:read:v1:未知:OS 错误“
我正在 venv python 环境中工作,如果此信息有用并且我的 kafka 集群在融合云上。
我也遇到了这个运行时错误: 运行时错误:java.lang.RuntimeException:无法从规范 urn 中获取 beam:transform:org.apache.beam:kafka_read_without_metadata:v1 的依赖项:“beam:transform:org.apache.beam:kafka_read_without_metadata:v1
编辑:我从这里得到 bootstrap 服务器选项
我的错误是我跳过了必须开始 expansion_service 的步骤,我是用这个命令做的
java -jar beam-sdks-java-io-expansion-service-2.37.0.jar 8088 --javaClassLookupAllowlistFile='*'
从 https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-expansion-service/2.36.0 下载 beam-sdks-java-io-expansion-service-2.37.0.jar 后
然后在 expansion_service='localhost:8088'
然后我犯了两个小错误,一个是我使用的是 JDK 18,我认为它不兼容 https://beam.apache.org/get-started/quickstart-java/ 所以我切换到 JDK 17 并使用了 python 3.8 而不是 python 3.10