具有本地数据流服务器的多语言处理器
Polyglot Processor with Local Dataflow Server
我一直在尝试使用多语言并构建一个简单的 python 处理器。我按照 polyglot recipe 进行操作,但无法部署流。我最初部署了示例中使用的相同处理器,但出现以下错误:
请求的未知命令行 arg:spring.cloud.stream.bindings.input.destination
请求的未知环境变量:SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
Traceback (most recent call last):
File "/processor/python_processor.py", line 10, in
consumer = KafkaConsumer(get_input_channel(), bootstrap_servers=[get_kafka_binder_brokers()])
File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 353, in init
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 203, in init
self.cluster = ClusterMetadata(**self.config)
File "/usr/local/lib/python2.7/dist-packages/kafka/cluster.py", line 67, in init
self._bootstrap_brokers = self._generate_bootstrap_brokers()
File "/usr/local/lib/python2.7/dist-packages/kafka/cluster.py", line 71, in _generate_bootstrap_brokers
bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 1336, in collect_hosts
host, port, afi = get_ip_port_afi(host_port)
File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 1289, in get_ip_port_afi
host_and_port_str = host_and_port_str.strip()
AttributeError: 'NoneType' object has no attribute 'strip'
Exception AttributeError: "'KafkaClient' object has no attribute '_closed'" in <bound method KafkaClient.del of <kafka.client_async.KafkaClient object at 0x7f8b7024cf10>> ignored
然后我尝试通过部署流传递环境和绑定参数,但这没有用。当我手动将 SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS 和 spring.cloud.stream.bindings.input.destination 参数插入 Kafka 的消费者时,我能够将流部署为解决方法。我不完全确定是什么导致了这个问题,在 Kubernetes 上部署它会有什么不同,或者这是 Polyglot 和 Dataflow 的问题吗?如有任何帮助,我们将不胜感激。
重现步骤:
尝试从本地数据流服务器上的多语言配方部署多语言处理器流。我还使用与示例中相同的流定义:http --server.port=32123 | python-处理器 --reversestring=true |日志。
附加上下文:
我正在尝试在 SPDF 和 Kafka 的本地安装上部署流,因为我在使用 Docker.
部署自定义 python 应用程序时遇到了一些问题
您在上面发布的配方需要 SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
环境变量作为服务器配置的一部分出现(因为流是通过 Skipper 服务器管理的,您需要在 Skipper 服务器配置中设置此环境变量).
您可以查看 this 文档,了解如何在 Skipper 服务器部署中将 SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
设置为环境 属性。
您还可以在部署 python-processor
流应用程序时将此 属性 作为部署程序 属性 传递。您可以参考 this 文档,了解如何通过部署 属性 在流部署时设置 Spring Cloud Stream 属性(此处为活页夹配置 属性)。
我一直在尝试使用多语言并构建一个简单的 python 处理器。我按照 polyglot recipe 进行操作,但无法部署流。我最初部署了示例中使用的相同处理器,但出现以下错误:
请求的未知命令行 arg:spring.cloud.stream.bindings.input.destination 请求的未知环境变量:SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
Traceback (most recent call last):
File "/processor/python_processor.py", line 10, in
consumer = KafkaConsumer(get_input_channel(), bootstrap_servers=[get_kafka_binder_brokers()])
File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 353, in init
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 203, in init
self.cluster = ClusterMetadata(**self.config)
File "/usr/local/lib/python2.7/dist-packages/kafka/cluster.py", line 67, in init
self._bootstrap_brokers = self._generate_bootstrap_brokers()
File "/usr/local/lib/python2.7/dist-packages/kafka/cluster.py", line 71, in _generate_bootstrap_brokers
bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 1336, in collect_hosts
host, port, afi = get_ip_port_afi(host_port)
File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 1289, in get_ip_port_afi
host_and_port_str = host_and_port_str.strip()
AttributeError: 'NoneType' object has no attribute 'strip'
Exception AttributeError: "'KafkaClient' object has no attribute '_closed'" in <bound method KafkaClient.del of <kafka.client_async.KafkaClient object at 0x7f8b7024cf10>> ignored
然后我尝试通过部署流传递环境和绑定参数,但这没有用。当我手动将 SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS 和 spring.cloud.stream.bindings.input.destination 参数插入 Kafka 的消费者时,我能够将流部署为解决方法。我不完全确定是什么导致了这个问题,在 Kubernetes 上部署它会有什么不同,或者这是 Polyglot 和 Dataflow 的问题吗?如有任何帮助,我们将不胜感激。
重现步骤: 尝试从本地数据流服务器上的多语言配方部署多语言处理器流。我还使用与示例中相同的流定义:http --server.port=32123 | python-处理器 --reversestring=true |日志。
附加上下文: 我正在尝试在 SPDF 和 Kafka 的本地安装上部署流,因为我在使用 Docker.
部署自定义 python 应用程序时遇到了一些问题您在上面发布的配方需要 SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
环境变量作为服务器配置的一部分出现(因为流是通过 Skipper 服务器管理的,您需要在 Skipper 服务器配置中设置此环境变量).
您可以查看 this 文档,了解如何在 Skipper 服务器部署中将 SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
设置为环境 属性。
您还可以在部署 python-processor
流应用程序时将此 属性 作为部署程序 属性 传递。您可以参考 this 文档,了解如何通过部署 属性 在流部署时设置 Spring Cloud Stream 属性(此处为活页夹配置 属性)。