How to use Twitter Heron with Storm Flux
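I am trying to migrate a project from apache-storm to twitter-heron. After some effort I was able to get rid of most of the errors, for example by using className: "org.apache.storm.kafka.ZkHosts" instead of className: "storm.kafka.ZkHosts". However, I am stuck on submitting the topology. I was using Flux to submit the topology to Storm.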
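A NullPointerException is thrown when it creates the CuratorFramework object in ZkState. Digging further, I found an issue on GitHub which says that this happens when the ZooKeeper-related configuration is not set.
Debugging further, I found that the problem is that I am missing the following configuration, which is required at ZkState.java:46: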
storm.zookeeper.session.timeout
storm.zookeeper.connection.timeout
storm.zookeeper.retry.times
storm.zookeeper.retry.interval
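Although I have managed to identify the problem, I am not sure where to add these settings in my configuration. Could someone tell me where the configuration above should go? Thanks.
My Flux config: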
name: "My_Topology"
components:
  - id: "zkHosts"
    className: "org.apache.storm.kafka.ZkHosts"
    constructorArgs:
      - "localhost:2181"
  - id: "SpoutConfig"
    className: "org.apache.storm.kafka.SpoutConfig"
    constructorArgs:
      - ref: "zkHosts" # brokerHosts
      - "my-topic" # topic
      - "/my-zkRoot" # zkRoot
      - "my-id" # spoutId
    properties:
      - name: "zkServers"
        value: ["localhost"]
      - name: "zkPort"
        value: 2181
      - name: "zkRoot"
        value: "/my-zkRoot"
      - name: "retryInitialDelayMs"
        value: 2000
      - name: "retryDelayMultiplier"
        value: 2
config:
  topology.workers: 5
  topology.testing.always.try.serialize: true
spouts:
  - id: "kafka-spout"
    className: "org.apache.storm.kafka.KafkaSpout"
    parallelism: 1
    constructorArgs:
      - ref: "SpoutConfig"
bolts:
  - id: "my-bolt"
    className: "com.example.sample.MyBolt"
    parallelism: 1
streams:
  - name: "kafka_spout --> my_bolt"
    from: "kafka-spout"
    to: "my-bolt"
    grouping:
      type: SHUFFLE
You can add these to the config section of your Flux YAML file:
config:
  topology.workers: 5
  topology.testing.always.try.serialize: true
  storm.zookeeper.session.timeout: 30000
  storm.zookeeper.connection.timeout: 30000
  storm.zookeeper.retry.times: 5
  storm.zookeeper.retry.interval: 2000
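If it helps to catch a missing key before submitting, a small pre-flight check along the following lines could be run against the topology config map. This is only a sketch: `ZkConfigCheck` and `missingKeys` are hypothetical names that are not part of Storm, Flux, or Heron, and the map is built by hand here rather than loaded from the Flux YAML file. The four key names and the values come from the question and the config section above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical pre-flight check; not part of Storm, Flux, or Heron. */
public class ZkConfigCheck {

    // The four keys the question reports as required by ZkState.
    public static final String[] REQUIRED_KEYS = {
        "storm.zookeeper.session.timeout",
        "storm.zookeeper.connection.timeout",
        "storm.zookeeper.retry.times",
        "storm.zookeeper.retry.interval"
    };

    /** Returns the required keys that are absent (or null) in the given config map. */
    public static List<String> missingKeys(Map<String, Object> conf) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (conf.get(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Mirrors the config section from the question, before the fix.
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.workers", 5);
        conf.put("topology.testing.always.try.serialize", true);
        System.out.println("Missing before fix: " + missingKeys(conf));

        // Values taken from the config section shown above.
        conf.put("storm.zookeeper.session.timeout", 30000);
        conf.put("storm.zookeeper.connection.timeout", 30000);
        conf.put("storm.zookeeper.retry.times", 5);
        conf.put("storm.zookeeper.retry.interval", 2000);
        System.out.println("Missing after fix: " + missingKeys(conf));
    }
}
```

A check like this reports every absent key at once, which is easier to act on than a NullPointerException raised while the spout is opening.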
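We have natively integrated Storm Flux into Heron to make this simpler. With Heron ECO you can write topologies for two different APIs:
- Native Heron API
- Storm API
For additional information, see the ECO documentation here: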
https://apache.github.io/incubator-heron/docs/developers/java/eco-api/